import zmq
from .constants import *
[docs]class SendReceiveMixin:
"""
Provides send or receive functionality for the sockets
"""
[docs] def send(self,message,message_type,topic=''):
"""
Send the message on the socket.
Args:
- message: the message to publish
- message_type: the type of message being sent
- topic: the topic on which to send the message. Defaults to ''.
"""
if message_type == RAW:
self._sock.send(message)
elif message_type == PYOBJ:
self._sock.send_pyobj(message)
elif message_type == JSON:
self._sock.send_json(message)
elif message_type == MULTIPART:
self._sock.send_multipart([topic, message])
elif message_type == STRING:
self._sock.send_string(message)
elif message_type == UNICODE:
self._sock.send_unicode(message)
else:
raise Exception("Unknown message type %s"%(message_type,))
[docs] def receive(self,message_type):
"""
Receive the message of the specified type and retun
Args:
- message_type: the type of the message to receive
Returns:
- the topic of the message
- the message received from the socket
"""
topic = None
message = None
if message_type == RAW:
message = self._sock.recv(flags=zmq.NOBLOCK)
elif message_type == PYOBJ:
message = self._sock.recv_pyobj(flags=zmq.NOBLOCK)
elif message_type == JSON:
message = self._sock.recv_json(flags=zmq.NOBLOCK)
elif message_type == MULTIPART:
data = self._sock.recv_multipart(flags=zmq.NOBLOCK)
message = data[1]
topic = data[0]
elif message_type == STRING:
message = self._sock.recv_string(flags=zmq.NOBLOCK)
elif message_type == UNICODE:
message = self._sock.recv_unicode(flags=zmq.NOBLOCK)
else:
raise Exception("Unknown message type %s"%(self._message_type,))
return (topic, message)
[docs]class ServerConnection(SendReceiveMixin,object):
"""
Creates a server side socket of given type.
Args:
- address: the address to use
- socket_type: the tyoe of socket to open
"""
_address = None
_ctx = None
_sock = None
def __init__(self, address,socket_type):
self._address = address
self._ctx = zmq.Context()
self._sock = self._ctx.socket(socket_type)
self._sock.bind(address)
[docs] def sock(self):
"""
Returns the zmq socket object being used for the connection. This can be used
to send messages with additional flags etc.
Returns:
- The underlying zmq socket
"""
return self._sock
[docs] def close(self):
"""
Close the socket connection.
"""
self._sock.close()
[docs]class ClientConnection(SendReceiveMixin,object):
"""
Creates a client side socket of given type.
Args:
- address: the address to use
- socket_type: the tyoe of socket to open
"""
_address = None
_ctx = None
_sock = None
def __init__(self, address,socket_type):
self._address = address
self._ctx = zmq.Context()
self._sock = self._ctx.socket(socket_type)
self._sock.connect(address)
[docs] def sock(self):
"""
Returns the zmq socket object being used for the connection. This can be used
to receive messages with additional flags etc.
Returns:
- The underlying zmq socket
"""
return self._sock
[docs] def close(self):
"""
Close the socket connection.
"""
self._sock.close()