Source code for zmqwrapper.subscribers

import zmq
from .sockets import ClientConnection
from .constants import *
import threading

def subscriber(address, topics, callback, message_type):
    """
    Build a Subscriber for the given address and topics.

    Convenience factory: every argument is forwarded unchanged to the
    Subscriber constructor and the new instance is returned. Consumption
    does not begin until the caller invokes ``start()`` on the result.

    Args:
        - address: the endpoint address handed to the Subscriber
        - topics: the topics to subscribe to
        - callback: the callback to invoke for every message. Must accept
          2 variables - topic and message
        - message_type: the type of message to receive

    Returns:
        The newly created Subscriber instance.
    """
    new_subscriber = Subscriber(
        address=address,
        topics=topics,
        callback=callback,
        message_type=message_type,
    )
    return new_subscriber
class Subscriber(ClientConnection):
    """
    Subscriber that reads messages from a ZMQ SUB socket.

    Messages are consumed on a background thread created by ``start()``;
    each one is handed to the user-supplied callback.

    Args:
        - address: the endpoint address, forwarded to ClientConnection
        - topics: iterable of topic strings to subscribe to
        - callback: the callback to invoke for every message; called as
          ``callback(topic, message)``
        - message_type: the type of message to receive, forwarded to
          ``ClientConnection.receive``
    """

    def __init__(self, address, topics, callback, message_type):
        self._active = True
        # Consumer thread; created lazily by start() so it can be tracked.
        self._thread = None
        self._topics = topics
        self._callback = callback
        self._message_type = message_type
        super(Subscriber, self).__init__(address, zmq.SUB)
        # NOTE(review): self._sock is presumably created by
        # ClientConnection.__init__ -- confirm against sockets.py.
        for topic in self._topics:
            self._sock.setsockopt_string(zmq.SUBSCRIBE, topic)

    def _consume(self):
        """
        Receive messages and dispatch them to the callback until stopped.

        The loop ends when ``stop()`` clears the active flag, or when the
        socket can no longer be used (context terminated / socket closed).
        Any other ZMQError is ignored and the loop retries, as before.
        """
        while self._active:
            try:
                topic, message = super(Subscriber, self).receive(
                    self._message_type
                )
                # process the message
                self._callback(topic, message)
            except zmq.ZMQError as err:
                # A terminated context or closed socket never recovers;
                # retrying would only spin the CPU forever.
                if err.errno in (zmq.ETERM, zmq.ENOTSOCK):
                    break
                # Every other ZMQ error stays best-effort: skip and retry.

    def start(self):
        """
        Start a thread that consumes the messages and invokes the callback.

        Calling this while a consumer thread is still alive is a no-op, so
        at most one thread reads from the socket at a time. After a
        previous consumer has finished (e.g. following ``stop()``), calling
        this again starts a fresh consumer thread.
        """
        running = self._thread
        if running is not None and running.is_alive():
            return
        # Re-arm the flag so a subscriber that was stopped can be restarted.
        self._active = True
        self._thread = threading.Thread(target=self._consume)
        self._thread.start()

    def stop(self):
        """
        Stop the consumer thread.

        Only clears the active flag; it does not wait for the thread to
        finish. The thread exits the next time it checks the flag.
        """
        self._active = False