Source code for zmqwrapper.consumers

import zmq
from .sockets import ClientConnection
from .constants import *
import threading

[docs]def consumer(address,callback,message_type): """ Creates a consumer binding to the given address pull messages. The callback is invoked for every reply received. Args: - address: the address to bind the PULL socket to. - callback: the callback to invoke for every message. Must accept 1 variables - the message - message_type: the type of message to receive """ return Consumer(address,callback,message_type)
[docs]class Consumer(ClientConnection): """ Requestor that that can send requests of given type Args: - address: the address to bind to - callback: the callback to invoke for every reply - message_type: the type of request to send """ def __init__(self,address,callback,message_type): self._active = True self._callback = callback self._message_type = message_type super(Consumer,self).__init__(address,zmq.PULL) def _consume(self): while self._active: try: topic, message=super(Consumer,self).receive(self._message_type) #process the message self._callback(message) except zmq.ZMQError: pass
[docs] def start(self): """ Start a thread that consumes the replies and invokes the callback """ t=threading.Thread(target=self._consume) t.start()
[docs] def stop(self): """ Stop the consumer thread """ self._active = False