Claws Garden

Python异步和事件循环探究

概念辨析

Python中的异步编程由事件循环驱动,在单线程的基础上提高运行效率,不存在并发问题。原理是让cpu不等待io、网络请求等耗时操作,而是直接执行其他任务代码。

Python中和异步有关的几个核心概念:

整个异步调用过程解析

示例内容

借用一下代码分析整个异步调用时到底发生了什么。

 1import asyncio
 2
 3async def main():
 4    res = await foo()
 5    print(res)
 6
 7async def foo():
 8    await asyncio.sleep(1)
 9    return 1
10
11asyncio.run(main())

进入事件循环

main() 访问一个Coroutine对象,可以放入asyncio.run()方法中。这个方法内部精髓部分如下:

 1# asyncio.run():
 2
 3# 如果有已经在进行的事件循环,报错
 4if events._get_running_loop() is not None:
 5    raise RuntimeError("asyncio.run() cannot be called from a running event loop")
 6# 如果传入的不是协程对象,报错
 7if not coroutines.iscoroutine(main):
 8    raise ValueError("a coroutine was expected, got {!r}".format(main))
 9# 创建一个新事件循环
10loop = events.new_event_loop()
11try:
12    events.set_event_loop(loop)
13    loop.set_debug(debug)
14    # 调用loop.run_until_complete方法
15    return loop.run_until_complete(main)
16finally:
17    try:
18        _cancel_all_tasks(loop)
19        loop.run_until_complete(loop.shutdown_asyncgens())
20    finally:
21        events.set_event_loop(None)
22        loop.close()

这里的重点就是调用loop.run_until_complete方法处理传入的协程,下面来看run_until_complete方法(以base_events为例):

 1# base_events.run_until_complete():
 2
 3new_task = not futures.isfuture(future)
 4# 保证接下来操作的是Future。如果传入的是协程,会被包装成Future
 5future = tasks.ensure_future(future, loop=self)
 6if new_task:
 7    ...
 8# 在该future执行完毕的时候,将“结束循环”回调函数加入事件循环的ready准备结束
 9future.add_done_callback(_run_until_complete_cb)
10try:
11	self.run_forever()
12except:
13    ...
14...
15
16return future.result()

看到重点是调用了自身循环的run_forever()方法,查看run_forever核心代码:

1# base_events.run_until_complete():
2
3events._set_running_loop(self)
4# 无限循环,直到停止flag被执行
5while True:
6    self._run_once()
7    if self._stopping:
8        break

run_forever将一个运行一轮循环的run_once循环执行,来看这个函数:

 1# base_events.run_once():
 2
 3# This calls all currently ready callbacks, polls for I/O,
 4# schedules the resulting callbacks, and finally schedules
 5#'call_later' callbacks.
 6
 7...
 8# 上面根据条件处理好回调函数堆,得到当前可以执行的ready函数列表。
 9
10# 执行所有ready函数
11ntodo = len(self._ready)
12for i in range(ntodo):
13    handle = self._ready.popleft()
14    if handle._cancelled:
15        continue
16    if self._debug:
17        ...
18    else:
19        handle._run()
20handle = None  # Needed to break cycles when an exception occurs.

最终事件循环被一轮一轮的循环驱动起来。

回到运行内容

在这个例子中,handle.run()运行了什么?想要解答这个问题,就要回到run_until_complete方法,看看协程被包装成Task(Future)的时候到底做了什么。

 1# tasks.ensure_future():
 2
 3if coroutines.iscoroutine(coro_or_future):
 4    if loop is None:
 5        loop = events.get_event_loop()
 6    # 如果传入的是协程,就创建一个任务,将其包装起来
 7    task = loop.create_task(coro_or_future)
 8    if task._source_traceback:
 9        del task._source_traceback[-1]
10    return task
11elif futures.isfuture(coro_or_future):
12    ...
13    return coro_or_future
14...

重点是中间通过create_task把协程创建成了任务,create就是实例化了一个新的task返回。这里来看看Task的构造函数:

1# tasks.__init__():
2
3...
4self._loop.call_soon(self.__step, context=self._context)
5...

其中做的最关键的就是将一个step函数加入到了事件循环中,loop.call_soon方法会直接将函数加入到ready队列里,在下次循环马上执行。然而这里的step函数是什么还不清楚,继续探究这里的step函数:

 1...
 2coro = self._coro
 3...
 4try:
 5    if exc is None:
 6        result = coro.send(None)
 7    else:
 8        result = coro.throw(exc)
 9except StopIteration as exc:
10    if self._must_cancel:
11        # Task is cancelled right before coro stops.
12        self._must_cancel = False
13        super().set_exception(futures.CancelledError())
14    else:
15        super().set_result(exc.value)
16else:
17    blocking = getattr(result, '_asyncio_future_blocking', None)
18    if blocking is not None:
19        if blocking:
20            ...
21            result._asyncio_future_blocking = False
22            result.add_done_callback(
23                self.__wakeup, context=self._context)
24            ...
25        else:
26            ...
27            self._loop.call_soon(
28                self.__step, new_exc, context=self._context)
29
30    elif result is None:
31        # Bare yield relinquishes control for one event loop iteration.
32        self._loop.call_soon(self.__step, context=self._context)
33    ...

在这个函数的核心部分,可以看到每次都会用coro.send(None)来驱动协程运行一次。这一次运行协程到什么时候为止呢?主要有三种情况:

  1. 遇到Future创建,把Future返回出来。后面会继续详细解释这种情况。
  2. 遇到yield没有返回值,中断然后返回出来。这种情况对应最后elif result is None的判断,会直接将下一步加入到事件循环ready中。
  3. 协程里的函数执行到头了,会抛出StopIteration异常,表示执行完毕。调用super.set_result(注意super就是Future),将返回值设置好,结束Future的执行。

也就是说,每一步并不是以当前协程中函数内await进行步骤划分的,而是以最里层真正碰到yield的地方划分的。

进入await和Future底层

重新来看今天的例子:

 1import asyncio
 2
 3async def main():
 4    res = await foo()
 5    print(res)
 6
 7async def foo():
 8    await asyncio.sleep(1)
 9    return 1
10
11asyncio.run(main())

在这个例子里面,main第一步开始执行后,并不是在await foo()停止,而是一直运行到asyncio.sleep(1)内部的yield语句。await如果后面是一个协程对象,则会进入协程的函数中继续执行。那来看一下sleep函数的构成:

 1async def sleep(delay, result=None, *, loop=None):
 2    """Coroutine that completes after a given time (in seconds)."""
 3    if delay <= 0:
 4        await __sleep0()
 5        return result
 6
 7    if loop is None:
 8        loop = events.get_event_loop()
 9    future = loop.create_future()
10    h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
11    try:
12        return await future
13    finally:
14        h.cancel()

看到,这里创建了一个future,并且将它的set_result函数注册到事件循环中,然后返回了await future。在asyncio中,Future对象是awaitable的,换句话说,他内部定义了叫做__await__函数。凡是定义了这个函数的,都可以被await,然后在被await的时候调用此函数。那么Future对象在被await的时候会返回什么呢?来到futures.__await__()函数:

1def __await__(self):
2    if not self.done():
3        self._asyncio_future_blocking = True
4        yield self  # This tells Task to wait for completion.
5    if not self.done():
6        raise RuntimeError("await wasn't used with future")
7    return self.result()  # May raise too.

这里要注意,return才是返回值,yield返回的可不是返回值,而是可以理解为“挂起信息”。可以通过下面的程序理解这句话的含义:

 1import asyncio
 2
 3class Foo():
 4    def __await__(self):
 5        print("foo")
 6        yield 1
 7        print("ok")
 8        return 2
 9
10async def main():
11    print("haha")
12    a = await Foo() #a 接受到的是return出来的返回值。
13    print(a)
14    # 协程函数执行到结束时,会抛出异常
15
16coro = main()
17print(coro.send(None))# 这里接受到的是挂起信息
18print(coro.send(None))
19'''Output
20haha
21foo
221
23ok
242
25Traceback (most recent call last):
26  File "c:\Users\claws\learn\python\async_test\awaitable_test.py", line 17, in <module>
27    print(coro.send(None))
28StopIteration
29'''

await后面需要是定义了await函数的对象,并且会将挂起信息不断向外传播。底层实现还是generator那一套。

所以在future中,这里暂时挂起,并且将自己作为挂起信息返回出去了,这个挂起信息会不断传播到step的coro.send那里,变成step中coro.send的返回值被接收到,就可以实现在Future中添加回调了。等到下个step运行coro.send的时候,会将result真正return回到await的地方,继续执行后面的代码。

所以sleep的逻辑就是,注册一个1s后会触发设置result的Future,这个Future返回到step中被挂上执行下一个step的回调,达到代码至少1s之后才会再次继续执行的效果。

结论

事件循环是个调度函数运行的循环,驱动着Future、Task中各个函数的触发,让异步系统有效运转。而最最底层的支持还是要看yield这个关键字,这个关键字让函数可以挂起,执行原先的代码,在下次请求执行的时候恢复。

扩展思考

思考以下代码的输出:

 1import asyncio
 2
 3async def shoot():
 4    print("ping")
 5    await asyncio.sleep(1)
 6    print("pong")
 7
 8async def main():
 9    print("start shoot")
10    shoot_task = asyncio.create_task(shoot())
11    print("shoot over")
12    # leave
13
14asyncio.run(main())

输出是

1start shoot
2shoot over
3ping

为什么会有这样的输出呢?来一步一步分析整个程序执行的过程:

  1. asyncio.run(main()),后面调用loop.run_until_complete(main())
  2. loop.run_until_complete(main())中将main()协程包装成Task,然后将_run_until_complete_cb加入到该Task的回调函数中。这个cb的意思是call_back,而其中所做的事情就是停止循环。task的第一个step被放入循环的ready中。此时ready:[mainTask的step函数]
  3. 循环从ready中取出mainTask的step运行,在asyncio.create_task(shoot())时创建了一个新的任务。创建任务的过程中,也将该shoot任务的step加入循环ready,此时ready:[shootTask的step]
  4. main函数直接结束,触发StopIteration异常,让该Task触发set_result然后返回,这个时候mainTask因为被循环绑定了回调_run_until_complete_cb,会将_run_until_complete_cb放入循环的ready。此时ready:[shootTask的step,_run_until_complete_cb]
  5. 执行shootTask的step之后,停止在asyncio.sleep中,这里将一个Future的set_result放入循环的later堆里面。然后马上取出**_run_until_complete_cb**执行,循环不等sleep兑现就提前结束.

循环结束后,会重置循环到新的状态,如果后来又通过asyncio.run或其他方式重启循环,不会受到之前循环中残留工作的影响。

#Python