Source code for zmqwrapper.repliers

import zmq
from .sockets import ServerConnection
from .constants import *
import threading

[docs]def replier(address,callback,message_type): """ Creates a replier binding to the given address send replies. The callback is invoked for every request received. Args: - address: the address to bind the REP socket to. - callback: the callback to invoke for every message. Must accept 2 variables - message and the replier - message_type: the type of message to receive """ return Replier(address,callback,message_type)
[docs]class Replier(ServerConnection): """ Requestor that that can respond to requests of given type Args: - address: the address to bind to - callback: the callback to invoke for every request - message_type: the type of reply to send """ def __init__(self,address,callback,message_type): self._active = True self._callback = callback self._message_type = message_type super(Replier,self).__init__(address,zmq.REP) def _consume(self): while self._active: try: topic, message=super(Replier,self).receive(self._message_type) #process the message self._callback(message,self) except zmq.ZMQError: pass
[docs] def start(self): """ Start a thread that consumes the requests and invokes the callback """ t=threading.Thread(target=self._consume) t.start()
[docs] def stop(self): """ Stop the consumer thread """ self._active = False
[docs] def reply(self,message,message_type): """ Send a reply message of the given type Args: - message: the message to publish - message_type: the type of message being sent """ if message_type == MULTIPART: raise Exception("Unsupported reply type") super(Replier,self).send(message,message_type)