asynchronous - Python Asyncio blocked coroutine


I'm trying to code a simple program based on asyncio and a publish/subscribe design pattern implemented with ZeroMQ. The publisher has 2 coroutines: one listens for incoming subscriptions, and the other publishes the value (obtained via an HTTP request) to the subscriber. The subscriber subscribes to a specific parameter (the name of a city in this case), and waits for the value (the temperature in that city).

Here is the code:

publisher.py

    #!/usr/bin/env python

    import json
    import aiohttp
    import aiozmq
    import asyncio
    import zmq


    class publisher:
        bind_address = 'tcp://*:10000'

        def __init__(self):
            self.stream = None
            self.parameter = ""

        @asyncio.coroutine
        def main(self):
            self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=publisher.bind_address)
            tasks = [
                asyncio.async(self.subscriptions()),
                asyncio.async(self.publish())]
            print("before wait")
            yield from asyncio.wait(tasks)
            print("after wait")

        @asyncio.coroutine
        def subscriptions(self):
            print("entered subscriptions coroutine")
            while True:
                print("new iteration of subscriptions loop")
                received = yield from self.stream.read()
                first_byte = received[0][0]
                self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
                # subscribe request
                if first_byte == 1:
                    print("subscription request received parameter "+self.parameter)
                # unsubscribe request
                elif first_byte == 0:
                    print("unsubscription request received parameter "+self.parameter)

        @asyncio.coroutine
        def publish(self):
            print("entered publish coroutine")
            while True:
                if self.parameter:
                    print("new iteration of publish loop")

                    # make the HTTP request
                    url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
                    response = yield from aiohttp.request('GET', url)
                    assert response.status == 200
                    content = yield from response.read()

                    # decode the JSON string
                    decoded_json = json.loads(content.decode())

                    # get the parameter value
                    value = decoded_json["main"]["temp"]

                    # publish the fetched value to subscribers
                    message = bytearray(self.parameter+":"+str(value),"utf-8")
                    print(message)
                    pack = [message]

                    print("before write")
                    yield from self.stream.write(pack)
                    print("after write")

                yield from asyncio.sleep(10)

    test = publisher()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test.main())

subscriber.py

    #!/usr/bin/env python

    import zmq

    class subscriber:
        xsub_connect = 'tcp://localhost:10000'

        def __init__(self):
            self.context = zmq.Context()
            self.socket = self.context.socket(zmq.XSUB)
            self.socket.connect(subscriber.xsub_connect)

        def loop(self):
            print(self.socket.recv())
            self.socket.close()

        def subscribe(self, parameter):
            self.socket.send_string('\x01'+parameter)
            print("subscribed parameter "+parameter)

        def unsubscribe(self, parameter):
            self.socket.send_string('\x00'+parameter)
            print("unsubscribed parameter "+parameter)

    test = subscriber()
    test.subscribe("london")
    while True:
        print(test.socket.recv())

And here is the output:

Subscriber side:

    $ python3 subscriber.py
    subscribed parameter london
    b'london:288.15'

Publisher side:

    $ python3 publisher.py
    before wait
    entered subscriptions coroutine
    new iteration of subscriptions loop
    entered publish coroutine
    subscription request received parameter london
    new iteration of subscriptions loop
    new iteration of publish loop
    bytearray(b'london:288.15')
    before write

And the program is stuck there.

As you can see, "before write" appears in the output and the message is sent, but "after write" doesn't appear. So I figured that an exception was probably raised and caught somewhere in the self.stream.write(pack) call stack.

If I send a KeyboardInterrupt to the publisher, here is what I get:

    Traceback (most recent call last):
      File "publisher.py", line 73, in <module>
        loop.run_until_complete(test.main())
      File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
        self.run_forever()
      File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
        self._run_once()
      File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
        event_list = self._selector.select(timeout)
      File "/usr/lib/python3.4/selectors.py", line 432, in select
        fd_event_list = self._epoll.poll(timeout, max_ev)
    KeyboardInterrupt
    Task exception was never retrieved
    future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
    Traceback (most recent call last):
      File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
        result = coro.send(value)
      File "publisher.py", line 66, in publish
        yield from self.stream.write(pack)
    TypeError: 'NoneType' object is not iterable
    Task was destroyed but it is pending!
    task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>

So I guess my problem is actually this error: TypeError: 'NoneType' object is not iterable, but I have no clue what's causing it.

What is going wrong here?

The issue is that you're trying to yield from the call to self.stream.write(), but stream.write isn't a coroutine. When you call yield from on an item, Python internally calls iter(item). In this case, the call to write() returns None, so Python is trying to do iter(None), hence the exception you see.
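The iter() behaviour can be reproduced without asyncio or ZeroMQ at all. The following is a minimal sketch using a plain generator; the names delegate_to and first_step_error are made up for this illustration and do not come from the question's code.

```python
def delegate_to(value):
    # `yield from value` starts by calling iter(value).
    yield from value


def first_step_error(value):
    """Advance the generator once and return the TypeError message, if any."""
    gen = delegate_to(value)
    try:
        next(gen)
    except TypeError as exc:
        return str(exc)
    return None


# A list is iterable, so delegation works and no error is reported.
print(first_step_error([1, 2, 3]))
# None stands in for the return value of a plain function such as write().
print(first_step_error(None))
```

The second call reports the same kind of TypeError that shows up in the publish task's traceback.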

To fix it, you should just call write() like a normal function. If you want to wait until the write is flushed and sent to the reader, use yield from stream.drain() after you make the call to write():

    print("before write")
    self.stream.write(pack)
    yield from self.stream.drain()
    print("after write")
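asyncio's own built-in streams follow the same split between a plain write() and a drain() coroutine, so the pattern can be tried without aiozmq. The sketch below is only an analogy under a few assumptions: it uses a loopback TCP echo server instead of a ZeroMQ stream, the modern async/await syntax (Python 3.7+) instead of @asyncio.coroutine, and the helper names echo_handler and round_trip are invented for the example.

```python
import asyncio


async def echo_handler(reader, writer):
    # Server side: read one line and send it straight back.
    line = await reader.readline()
    writer.write(line)      # plain call, not awaited
    await writer.drain()    # coroutine: waits until the buffer is flushed
    writer.close()


async def round_trip(payload):
    # Port 0 asks the OS for any free port (a choice made for this demo only).
    server = await asyncio.start_server(echo_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    write_result = writer.write(payload)  # write() returns None
    await writer.drain()
    echoed = await reader.readline()

    writer.close()
    await writer.wait_closed()
    server.close()
    return write_result, echoed


write_result, echoed = asyncio.run(round_trip(b"london:288.15\n"))
print(write_result, echoed)
```

Here write() hands back None immediately, and only drain() is something that can be awaited.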

Also, to make sure that an exception in publish gets raised without needing Ctrl+C, use asyncio.gather instead of asyncio.wait:

    yield from asyncio.gather(*tasks)

With asyncio.gather, any exception thrown by a task inside tasks is re-raised.
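The difference between the two calls can be seen in a small stand-alone sketch. It assumes Python 3.7+ and async/await syntax rather than the Python 3.4 style used in the question, and failing and long_running are hypothetical stand-ins for publish and subscriptions. Unlike the real subscriptions loop, long_running finishes quickly so that the demo terminates.

```python
import asyncio


async def failing():
    # Stand-in for publish(): fails right away.
    raise TypeError("'NoneType' object is not iterable")


async def long_running():
    # Stand-in for subscriptions(): sleeps briefly so the demo can end.
    await asyncio.sleep(0.05)


async def run_with_wait():
    tasks = [asyncio.ensure_future(failing()),
             asyncio.ensure_future(long_running())]
    await asyncio.wait(tasks)
    # wait() returned normally; the error is only stored on the task object.
    return [task.exception() for task in tasks]


async def run_with_gather():
    tasks = [asyncio.ensure_future(failing()),
             asyncio.ensure_future(long_running())]
    caught = None
    try:
        await asyncio.gather(*tasks)
    except TypeError as exc:
        # gather() re-raises the first exception in the caller.
        caught = exc
    # Let the surviving task finish so nothing is left pending.
    await tasks[1]
    return caught


stored = asyncio.run(run_with_wait())
raised = asyncio.run(run_with_gather())
print(stored)
print(repr(raised))
```

With wait, the TypeError has to be fetched from the finished task by hand; with gather, it surfaces in the awaiting coroutine as soon as the task fails.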

