Python异步和事件循环探究
概念辨析
Python中的异步编程由事件循环驱动,在单线程的基础上提高运行效率,不存在并发问题。原理是让cpu不等待io、网络请求等耗时操作,而是直接执行其他任务代码。
Python中和异步有关的几个核心概念:
- Event Loop:事件循环。可以把函数注册到时间循环中,注册时被包装成Handler,Handler可以追踪函数执行的状态。事件循环中维护了ready队列,每个循环中,会执行当前ready队列中的所有函数。事件循环还可以根据条件将注册到“等待”队列中的任务检查并放入ready中准备执行,例如可以将设置1分钟后执行的任务在1分钟后的检查中放入ready。Event中维护的本质是函数的队列,调度函数的执行。
- Handler:函数在Event Loop中被包装成Handler,方便追踪其运行状态。
- Future:表示一个异步操作的最终结果,是awaitable的,被await的时候会返回自身。Future对象创建时需要注册到一个时间循环中,在条件达成的时候被set_result,最终执行完成返回结果。Future可以绑定一个回调函数,表示Future完成(即set_result)的时候,该回调函数会被放在事件循环的ready列表中,等待被马上的一轮循环执行掉。Future本身不能不会放入事件循环中,而是用户可以将future的set_result函数注册到时间循环中,来表示将来会被set_result完成掉。以某个Future开始的事件循环,Future会被注册一个完成的回调,让当前Future执行后循环就自动结束。连接底层回调和高层async/await异步的桥梁
- Task:Future的子类,里面可以包装一个Coroutine,然后通过注册step函数到事件循环中,来每次执行到产生下一个Future之间这一“步”,将协程对象真正以可暂停的方式在事件循环中执行。
- Coroutine:协程,一个把函数包装起来的特殊对象。直接调用async声明的异步函数,就会产生一个Coroutine对象。如果协程对象被await,就会执行里面函数的代码。可以被包装成Task之后在事件循环中可暂停地执行。时间循环中的不同协程看起来就像可以互相切换一样。
整个异步调用过程解析
示例内容
借用一下代码分析整个异步调用时到底发生了什么。
1import asyncio
2
3async def main():
4 res = await foo()
5 print(res)
6
7async def foo():
8 await asyncio.sleep(1)
9 return 1
10
11asyncio.run(main())
进入事件循环
main() 访问一个Coroutine对象,可以放入asyncio.run()方法中。这个方法内部精髓部分如下:
1# asyncio.run():
2
3# 如果有已经在进行的事件循环,报错
4if events._get_running_loop() is not None:
5 raise RuntimeError("asyncio.run() cannot be called from a running event loop")
6# 如果传入的不是协程对象,报错
7if not coroutines.iscoroutine(main):
8 raise ValueError("a coroutine was expected, got {!r}".format(main))
9# 创建一个新事件循环
10loop = events.new_event_loop()
11try:
12 events.set_event_loop(loop)
13 loop.set_debug(debug)
14 # 调用loop.run_until_complete方法
15 return loop.run_until_complete(main)
16finally:
17 try:
18 _cancel_all_tasks(loop)
19 loop.run_until_complete(loop.shutdown_asyncgens())
20 finally:
21 events.set_event_loop(None)
22 loop.close()
这里的重点就是调用loop.run_until_complete方法处理传入的协程,下面来看run_until_complete方法(以base_events为例):
1# base_events.run_until_complete():
2
3new_task = not futures.isfuture(future)
4# 保证接下来操作的是Future。如果传入的是协程,会被包装成Future
5future = tasks.ensure_future(future, loop=self)
6if new_task:
7 ...
8# 在该future执行完毕的时候,将“结束循环”回调函数加入事件循环的ready准备结束
9future.add_done_callback(_run_until_complete_cb)
10try:
11 self.run_forever()
12except:
13 ...
14...
15
16return future.result()
看到重点是调用了自身循环的run_forever()方法,查看run_forever核心代码:
1# base_events.run_until_complete():
2
3events._set_running_loop(self)
4# 无限循环,直到停止flag被执行
5while True:
6 self._run_once()
7 if self._stopping:
8 break
run_forever将一个运行一轮循环的run_once循环执行,来看这个函数:
1# base_events.run_once():
2
3# This calls all currently ready callbacks, polls for I/O,
4# schedules the resulting callbacks, and finally schedules
5#'call_later' callbacks.
6
7...
8# 上面根据条件处理好回调函数堆,得到当前可以执行的ready函数列表。
9
10# 执行所有ready函数
11ntodo = len(self._ready)
12for i in range(ntodo):
13 handle = self._ready.popleft()
14 if handle._cancelled:
15 continue
16 if self._debug:
17 ...
18 else:
19 handle._run()
20handle = None # Needed to break cycles when an exception occurs.
最终事件循环被一轮一轮的循环驱动起来。
回到运行内容
在这个例子中,handle.run()运行了什么?想要解答这个问题,就要回到run_until_complete方法,看看协程被包装成Task(Future)的时候到底做了什么。
1# tasks.ensure_future():
2
3if coroutines.iscoroutine(coro_or_future):
4 if loop is None:
5 loop = events.get_event_loop()
6 # 如果传入的是协程,就创建一个任务,将其包装起来
7 task = loop.create_task(coro_or_future)
8 if task._source_traceback:
9 del task._source_traceback[-1]
10 return task
11elif futures.isfuture(coro_or_future):
12 ...
13 return coro_or_future
14...
重点是中间通过create_task把协程创建成了任务,create就是实例化了一个新的task返回。这里来看看Task的构造函数:
1# tasks.__init__():
2
3...
4self._loop.call_soon(self.__step, context=self._context)
5...
其中做的最关键的就是将一个step函数加入到了事件循环中,loop.call_soon方法会直接将函数加入到ready队列里,在下次循环马上执行。然而这里的step函数是什么还不清楚,继续探究这里的step函数:
1...
2coro = self._coro
3...
4try:
5 if exc is None:
6 result = coro.send(None)
7 else:
8 result = coro.throw(exc)
9except StopIteration as exc:
10 if self._must_cancel:
11 # Task is cancelled right before coro stops.
12 self._must_cancel = False
13 super().set_exception(futures.CancelledError())
14 else:
15 super().set_result(exc.value)
16else:
17 blocking = getattr(result, '_asyncio_future_blocking', None)
18 if blocking is not None:
19 if blocking:
20 ...
21 result._asyncio_future_blocking = False
22 result.add_done_callback(
23 self.__wakeup, context=self._context)
24 ...
25 else:
26 ...
27 self._loop.call_soon(
28 self.__step, new_exc, context=self._context)
29
30 elif result is None:
31 # Bare yield relinquishes control for one event loop iteration.
32 self._loop.call_soon(self.__step, context=self._context)
33 ...
在这个函数的核心部分,可以看到每次都会用coro.send(None)来驱动协程运行一次。这一次运行协程到什么时候为止呢?主要有三种情况:
- 遇到Future创建,把Future返回出来。后面会继续详细解释这种情况。
- 遇到yield没有返回值,中断然后返回出来。这种情况对应最后elif result is None的判断,会直接将下一步加入到事件循环ready中。
- 协程里的函数执行到头了,会抛出StopIteration异常,表示执行完毕。调用super.set_result(注意super就是Future),将返回值设置好,结束Future的执行。
也就是说,每一步并不是以当前协程中函数内await进行步骤划分的,而是以最里层真正碰到yield的地方划分的。
进入await和Future底层
重新来看今天的例子:
1import asyncio
2
3async def main():
4 res = await foo()
5 print(res)
6
7async def foo():
8 await asyncio.sleep(1)
9 return 1
10
11asyncio.run(main())
在这个例子里面,main第一步开始执行后,并不是在await foo()停止,而是一直运行到asyncio.sleep(1)内部的yield语句。await如果后面是一个协程对象,则会进入协程的函数中继续执行。那来看一下sleep函数的构成:
1async def sleep(delay, result=None, *, loop=None):
2 """Coroutine that completes after a given time (in seconds)."""
3 if delay <= 0:
4 await __sleep0()
5 return result
6
7 if loop is None:
8 loop = events.get_event_loop()
9 future = loop.create_future()
10 h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
11 try:
12 return await future
13 finally:
14 h.cancel()
看到,这里创建了一个future,并且将它的set_result函数注册到事件循环中,然后返回了await future。在asyncio中,Future对象是awaitable的,换句话说,他内部定义了叫做__await__函数。凡是定义了这个函数的,都可以被await,然后在被await的时候调用此函数。那么Future对象在被await的时候会返回什么呢?来到futures.__await__()函数:
1def __await__(self):
2 if not self.done():
3 self._asyncio_future_blocking = True
4 yield self # This tells Task to wait for completion.
5 if not self.done():
6 raise RuntimeError("await wasn't used with future")
7 return self.result() # May raise too.
这里要注意,return才是返回值,yield返回的可不是返回值,而是可以理解为“挂起信息”。可以通过下面的程序理解这句话的含义:
1import asyncio
2
3class Foo():
4 def __await__(self):
5 print("foo")
6 yield 1
7 print("ok")
8 return 2
9
10async def main():
11 print("haha")
12 a = await Foo() #a 接受到的是return出来的返回值。
13 print(a)
14 # 协程函数执行到结束时,会抛出异常
15
16coro = main()
17print(coro.send(None))# 这里接受到的是挂起信息
18print(coro.send(None))
19'''Output
20haha
21foo
221
23ok
242
25Traceback (most recent call last):
26 File "c:\Users\claws\learn\python\async_test\awaitable_test.py", line 17, in <module>
27 print(coro.send(None))
28StopIteration
29'''
await后面需要是定义了await函数的对象,并且会将挂起信息不断向外传播。底层实现还是generator那一套。
所以在future中,这里暂时挂起,并且将自己作为挂起信息返回出去了,这个挂起信息会不断传播到step的coro.send那里,变成step中coro.send的返回值被接收到,就可以实现在Future中添加回调了。等到下个step运行coro.send的时候,会将result真正return回到await的地方,继续执行后面的代码。
所以sleep的逻辑就是,注册一个1s后会触发设置result的Future,这个Future返回到step中被挂上执行下一个step的回调,达到代码至少1s之后才会再次继续执行的效果。
结论
事件循环是个调度函数运行的循环,驱动着Future、Task中各个函数的触发,让异步系统有效运转。而最最底层的支持还是要看yield这个关键字,这个关键字让函数可以挂起,执行原先的代码,在下次请求执行的时候恢复。
扩展思考
思考以下代码的输出:
1import asyncio
2
3async def shoot():
4 print("ping")
5 await asyncio.sleep(1)
6 print("pong")
7
8async def main():
9 print("start shoot")
10 shoot_task = asyncio.create_task(shoot())
11 print("shoot over")
12 # leave
13
14asyncio.run(main())
输出是
1start shoot
2shoot over
3ping
为什么会有这样的输出呢?来一步一步分析整个程序执行的过程:
asyncio.run(main())
,后面调用loop.run_until_complete(main())
loop.run_until_complete(main())
中将main()协程包装成Task,然后将_run_until_complete_cb加入到该Task的回调函数中。这个cb的意思是call_back,而其中所做的事情就是停止循环。task的第一个step被放入循环的ready中。此时ready:[mainTask的step函数]- 循环从ready中取出mainTask的step运行,在
asyncio.create_task(shoot())
时创建了一个新的任务。创建任务的过程中,也将该shoot任务的step加入循环ready,此时ready:[shootTask的step] - main函数直接结束,触发StopIteration异常,让该Task触发set_result然后返回,这个时候mainTask因为被循环绑定了回调
_run_until_complete_cb
,会将_run_until_complete_cb
放入循环的ready。此时ready:[shootTask的step,_run_until_complete_cb] - 执行shootTask的step之后,停止在asyncio.sleep中,这里将一个Future的set_result放入循环的later堆里面。然后马上取出**_run_until_complete_cb**执行,循环不等sleep兑现就提前结束.
循环结束后,会重置循环到新的状态,如果后来又通过asyncio.run或其他方式重启循环,不会受到之前循环中残留工作的影响。