asynchronous - Python Asyncio blocked coroutine -
i'm trying code simple program based on asyncio , publish/subscribe design pattern implemented zeromq. publisher has 2 coroutines; 1 listens incoming subscriptions, , 1 publishes value (obtained via http request) subscriber. subscriber subscribes specific parameter (the name of city in case), , waits value (the temperature in city).
here code:
publisher.py
#!/usr/bin/env python import json import aiohttp import aiozmq import asyncio import zmq class publisher: bind_address = 'tcp://*:10000' def __init__(self): self.stream = none self.parameter = "" @asyncio.coroutine def main(self): self.stream = yield aiozmq.create_zmq_stream(zmq.xpub, bind=publisher.bind_address) tasks = [ asyncio.async(self.subscriptions()), asyncio.async(self.publish())] print("before wait") yield asyncio.wait(tasks) print("after wait") @asyncio.coroutine def subscriptions(self): print("entered subscriptions coroutine") while true: print("new iteration of subscriptions loop") received = yield self.stream.read() first_byte = received[0][0] self.parameter = received[0][-len(received[0])+1:].decode("utf-8") # subscribe request if first_byte == 1: print("subscription request received parameter "+self.parameter) # unsubscribe request elif first_byte == 0: print("unsubscription request received parameter "+self.parameter) @asyncio.coroutine def publish(self): print("entered publish coroutine") while true: if self.parameter: print("new iteration of publish loop") # make http request url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter response = yield aiohttp.request('get', url) assert response.status == 200 content = yield response.read() # decode json string decoded_json = json.loads(content.decode()) # parameter value value = decoded_json["main"]["temp"] # publish fetched values subscribers message = bytearray(self.parameter+":"+str(value),"utf-8") print(message) pack = [message] print("before write") yield self.stream.write(pack) print("after write") yield asyncio.sleep(10) test = publisher() loop = asyncio.get_event_loop() loop.run_until_complete(test.main())
subscriber.py
#!/usr/bin/env python import zmq class subscriber: xsub_connect = 'tcp://localhost:10000' def __init__(self): self.context = zmq.context() self.socket = self.context.socket(zmq.xsub) self.socket.connect(subscriber.xsub_connect) def loop(self): print(self.socket.recv()) self.socket.close() def subscribe(self, parameter): self.socket.send_string('\x01'+parameter) print("subscribed parameter "+parameter) def unsubscribe(self, parameter): self.socket.send_string('\x00'+parameter) print("unsubscribed parameter "+parameter) test = subscriber() test.subscribe("london") while true: print(test.socket.recv())
and here output :
subscriber side :
$ python3 subscriber.py subscribed parameter london b'london:288.15'
publisher side :
$ python3 publisher.py before wait entered subscriptions coroutine new iteration of subscriptions loop entered publish coroutine subscription request received parameter london new iteration of subscriptions loop new iteration of publish loop bytearray(b'london:288.15') before write
and program stuck there.
as can see, "before write"
appears in output , message sent, "after write"
doesn't appear. so, figured exception raised , caught somewhere in self.stream.write(pack)
call stack.
if send keyboardinterrupt
publisher, here get:
traceback (most recent call last): file "publisher.py", line 73, in <module> loop.run_until_complete(test.main()) file "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete self.run_forever() file "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever self._run_once() file "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once event_list = self._selector.select(timeout) file "/usr/lib/python3.4/selectors.py", line 432, in select fd_event_list = self._epoll.poll(timeout, max_ev) keyboardinterrupt task exception never retrieved future: <task finished coro=<publish() done, defined @ publisher.py:43> exception=typeerror("'nonetype' object not iterable",)> traceback (most recent call last): file "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step result = coro.send(value) file "publisher.py", line 66, in publish yield self.stream.write(pack) typeerror: 'nonetype' object not iterable task destroyed pending! task: <task pending coro=<subscriptions() running @ publisher.py:32> wait_for=<future pending cb=[task._wakeup()]> cb=[_wait.<locals>._on_completion() @ /usr/lib/python3.4/asyncio/tasks.py:399]>
so guess problem error: typeerror: 'nonetype' object not iterable
, have no clue what's causing it.
what going wrong here?
the issue you're trying yield from
call self.stream.write()
, stream.write
isn't coroutine. when call yield from
on item, python internally calls iter(item)
. in case, call write()
returning none
, python trying iter(none)
- hence exception see.
to fix it, should call write()
normal function. if want wait until write
flushed , sent reader, use yield stream.drain()
after make call write()
:
print("before write") self.stream.write(pack) yield self.stream.drain() print("after write")
also, make sure exception in publish
raised without needing ctrl+c, use asyncio.gather
instead of asyncio.wait
:
yield asyncio.gather(*tasks)
with asyncio.gather
, exception thrown task inside tasks
re-raised.
Comments
Post a Comment