python - Attaching ZMQStream with existing tornado ioloop -


i have application every websocket connection (within tornado open callback) creates zmq.sub socket existing zmq.forwarder device. idea receive data zmq callbacks, can relayed frontend clients on websocket connection.

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq zmq.eventloop import ioloop zmq.eventloop.zmqstream import zmqstream ioloop.install()  tornado.websocket import websockethandler tornado.web import application tornado.ioloop import ioloop ioloop = ioloop.instance()  class zmqpubsub(object):      def __init__(self, callback):         self.callback = callback      def connect(self):         self.context = zmq.context()         self.socket = self.context.socket(zmq.sub)         self.socket.connect('tcp://127.0.0.1:5560')         self.stream = zmqstream(self.socket)         self.stream.on_recv(self.callback)      def subscribe(self, channel_id):         self.socket.setsockopt(zmq.subscribe, channel_id)  class mywebsocket(websockethandler):      def open(self):         self.pubsub = zmqpubsub(self.on_data)         self.pubsub.connect()         self.pubsub.subscribe("session_id")         print 'ws opened'      def on_message(self, message):         print message      def on_close(self):         print 'ws closed'      def on_data(self, data):         print data  def main():     application = application([(r'/channel', mywebsocket)])     application.listen(10001)     print 'starting ws on port 10001'     ioloop.start()  if __name__ == '__main__':     main() 

forwarder.py

import zmq  def main():     try:         context = zmq.context(1)          frontend = context.socket(zmq.sub)         frontend.bind('tcp://*:5559')         frontend.setsockopt(zmq.subscribe, '')          backend = context.socket(zmq.pub)         backend.bind('tcp://*:5560')          print 'starting zmq forwarder'         zmq.device(zmq.forwarder, frontend, backend)     except keyboardinterrupt:         pass     except exception e:         logger.exception(e)     finally:         frontend.close()         backend.close()         context.term()  if __name__ == '__main__':     main() 

publish.py

import zmq  if __name__ == '__main__':     context = zmq.context()     socket = context.socket(zmq.pub)     socket.connect('tcp://127.0.0.1:5559')     socket.send('session_id helloworld')     print 'sent data channel session_id' 

however, zmqpubsub class doesn't seem receiving data @ all.

i further experimented , realized need call ioloop.ioloop.instance().start() after registering on_recv callback within zmqpubsub. but, block execution.

i tried passing main.ioloop instance zmqstream constructor doesn't either.

is there way can bind zmqstream existing main.ioloop instance without blocking flow within mywebsocket.open?

in complete example, change frontend in forwarder pull socket , publisher socket push, , should behave expect.

the general principles of socket choice relevant here:

  • use pub/sub when want send message ready receive (may no one)
  • use push/pull when want send message 1 peer, waiting them ready

it may appear want pub-sub, once start looking @ each socket pair, realize different. frontend-websocket connection pub-sub - may have zero-to-many receivers, , want send messages happens available when message comes through. backend side different - there 1 receiver, , wants every message publishers.

so there have - backend should pull , frontend pub. sockets:

push -> [pull-pub] -> sub 

publisher.py: socket push, connected backend in device.py

forwarder.py: backend pull, frontend pub ws.py: sub connects , subscribes forwarder.frontend.

the relevant behavior makes pub/sub fail on backend in case slow joiner syndrome, described in guide. essentially, subscribers take finite time tell publishers there subscriptions, if send message after opening pub socket, odds hasn't been told has subscribers yet, it's discarding messages.


Comments

Popular posts from this blog

Unable to remove the www from url on https using .htaccess -