python - Attaching ZMQStream with existing tornado ioloop
I have an application where every websocket connection (within the tornado open callback) creates a zmq.SUB socket connected to an existing zmq.FORWARDER device. The idea is to receive data from zmq through callbacks, which can then be relayed to frontend clients over the websocket connection.
https://gist.github.com/abhinavsingh/6378134
ws.py
    import zmq
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream
    ioloop.install()  # make tornado use pyzmq's IOLoop

    from tornado.websocket import WebSocketHandler
    from tornado.web import Application
    from tornado.ioloop import IOLoop

    # Note: this rebinds the name `ioloop` from the module to the loop instance.
    ioloop = IOLoop.instance()


    class ZMQPubSub(object):

        def __init__(self, callback):
            self.callback = callback

        def connect(self):
            self.context = zmq.Context()
            self.socket = self.context.socket(zmq.SUB)
            self.socket.connect('tcp://127.0.0.1:5560')
            self.stream = ZMQStream(self.socket)
            self.stream.on_recv(self.callback)

        def subscribe(self, channel_id):
            self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)


    class MyWebSocket(WebSocketHandler):

        def open(self):
            self.pubsub = ZMQPubSub(self.on_data)
            self.pubsub.connect()
            self.pubsub.subscribe(b'session_id')
            print('ws opened')

        def on_message(self, message):
            print(message)

        def on_close(self):
            print('ws closed')

        def on_data(self, data):
            print(data)


    def main():
        application = Application([(r'/channel', MyWebSocket)])
        application.listen(10001)
        print('starting ws on port 10001')
        ioloop.start()


    if __name__ == '__main__':
        main()
forwarder.py
    import logging

    import zmq

    logger = logging.getLogger(__name__)


    def main():
        try:
            context = zmq.Context(1)

            # Publishers connect to this side.
            frontend = context.socket(zmq.SUB)
            frontend.bind('tcp://*:5559')
            frontend.setsockopt(zmq.SUBSCRIBE, b'')

            # Websocket handlers connect to this side.
            backend = context.socket(zmq.PUB)
            backend.bind('tcp://*:5560')

            print('starting zmq forwarder')
            zmq.device(zmq.FORWARDER, frontend, backend)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            logger.exception(e)
        finally:
            frontend.close()
            backend.close()
            context.term()


    if __name__ == '__main__':
        main()
publish.py
    import zmq

    if __name__ == '__main__':
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.connect('tcp://127.0.0.1:5559')
        socket.send(b'session_id helloworld')
        print('sent data for channel session_id')
However, the ZMQPubSub class doesn't seem to be receiving any data at all.
I experimented further and realized that I need to call ioloop.IOLoop.instance().start() after registering the on_recv callback within ZMQPubSub. But that will just block the execution.
I also tried passing the main.ioloop instance to the ZMQStream constructor, but that doesn't help either.
Is there a way I can bind ZMQStream to the existing main.ioloop instance without blocking the flow within MyWebSocket.open?
In your now complete example, simply change the frontend in the forwarder to a PULL socket and your publisher socket to PUSH, and it should behave as you expect.
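The change can be tried out in a single process. What follows is only a rough sketch, not the original forwarder.py: the function names `forward` and `demo` and the `inproc://` endpoints are made up for illustration, and a small polling loop stands in for the device so that the thread can be stopped cleanly.

```python
import threading
import time

import zmq


def forward(ctx, pull_addr, pub_addr, ready, stop):
    """Relay every message from a PULL socket to a PUB socket."""
    pull = ctx.socket(zmq.PULL)   # publishers PUSH into this side
    pull.bind(pull_addr)
    pub = ctx.socket(zmq.PUB)     # websocket handlers SUB to this side
    pub.bind(pub_addr)
    ready.set()
    while not stop.is_set():
        if pull.poll(50):         # wait up to 50 ms for a message
            pub.send(pull.recv())
    pull.close(linger=0)
    pub.close(linger=0)


def demo():
    ctx = zmq.Context()
    ready, stop = threading.Event(), threading.Event()
    worker = threading.Thread(
        target=forward,
        args=(ctx, "inproc://in", "inproc://out", ready, stop),
    )
    worker.start()
    ready.wait()                  # both endpoints are bound from here on

    # Plays the role of ws.py: subscribe to a channel prefix.
    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://out")
    sub.setsockopt(zmq.SUBSCRIBE, b"session_id")
    time.sleep(0.2)               # let the subscription reach the PUB side

    # Plays the role of publish.py: PUSH instead of PUB.
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://in")
    push.send(b"session_id helloworld")

    received = sub.recv() if sub.poll(2000) else None

    stop.set()
    worker.join()
    push.close(linger=0)
    sub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(demo())
```

Only the subscriber needs a head start here. The PUSH socket sends right after connecting, because PUSH holds the message until its peer is ready.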
The general principles of socket choice that are relevant here:
- use PUB/SUB when you want to send a message to everyone who is ready to receive it (which may be no one)
- use PUSH/PULL when you want to send a message to exactly one peer, waiting for them to be ready
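It may appear at first that you just want pub-sub, but once you start looking at each socket pair, you realize that they are different. The frontend-websocket connection is definitely PUB-SUB - you may have zero-to-many receivers, and you just want to send messages to everyone who happens to be available when a message comes through. But the backend side is different - there is only one receiver, and it definitely wants every message from the publishers.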
So there you have it - the backend should be PULL and the frontend PUB. All your sockets:
PUSH -> [PULL-PUB] -> SUB
- publisher.py: socket is PUSH, connected to backend in device.py
- forwarder.py: backend is PULL, frontend is PUB
- ws.py: SUB connects and subscribes to forwarder.frontend
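For the ws.py end of that chain, here is a hedged sketch of how the SUB socket can sit on the loop tornado is already running. It assumes a recent pyzmq, where ZMQStream attaches to tornado's current IOLoop by default. The port numbers come from the question, while `FORWARDER_PUB` and `make_app` are names invented here.

```python
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tornado.ioloop import IOLoop
from tornado.web import Application
from tornado.websocket import WebSocketHandler

# PUB side of the forwarder, using the port from the question.
FORWARDER_PUB = "tcp://127.0.0.1:5560"


class MyWebSocket(WebSocketHandler):

    def open(self):
        # One SUB socket per websocket connection.
        self.socket = zmq.Context.instance().socket(zmq.SUB)
        self.socket.connect(FORWARDER_PUB)
        self.socket.setsockopt(zmq.SUBSCRIBE, b"session_id")
        # Attaches to the current IOLoop; nothing here blocks.
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.on_data)

    def on_data(self, frames):
        # on_recv hands the callback a list of message frames (bytes).
        for frame in frames:
            self.write_message(frame.decode("utf-8"))

    def on_close(self):
        # Closing the stream also closes the underlying socket.
        self.stream.close()


def make_app():
    return Application([(r"/channel", MyWebSocket)])


if __name__ == "__main__":
    make_app().listen(10001)
    IOLoop.current().start()   # the only place the loop is started
```

The loop is started once, in the main block. Registering on_recv inside open only adds a callback to that loop, so no second start() call is needed.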
The relevant behavior that makes PUB/SUB fail on the backend in your case is the slow joiner syndrome, which is described in the Guide. Essentially, subscribers take a finite time to tell publishers that there are subscriptions, so if you send a message right after opening a PUB socket, the odds are that it hasn't been told it has any subscribers yet, so it's just discarding messages.