概念辨析

Python中的异步编程由事件循环驱动,在单线程的基础上提高运行效率,不存在并发问题。原理是让cpu不等待io、网络请求等耗时操作,而是直接执行其他任务代码。

Python中和异步有关的几个核心概念:

  • Event Loop:事件循环。可以把函数注册到时间循环中,注册时被包装成Handler,Handler可以追踪函数执行的状态。事件循环中维护了ready队列,每个循环中,会执行当前ready队列中的所有函数。事件循环还可以根据条件将注册到“等待”队列中的任务检查并放入ready中准备执行,例如可以将设置1分钟后执行的任务在1分钟后的检查中放入ready。Event中维护的本质是函数的队列,调度函数的执行。
  • Handler:函数在Event Loop中被包装成Handler,方便追踪其运行状态。
  • Future:表示一个异步操作的最终结果,是awaitable的,被await的时候会返回自身。Future对象创建时需要注册到一个时间循环中,在条件达成的时候被set_result,最终执行完成返回结果。Future可以绑定一个回调函数,表示Future完成(即set_result)的时候,该回调函数会被放在事件循环的ready列表中,等待被马上的一轮循环执行掉。Future本身不能不会放入事件循环中,而是用户可以将future的set_result函数注册到时间循环中,来表示将来会被set_result完成掉。以某个Future开始的事件循环,Future会被注册一个完成的回调,让当前Future执行后循环就自动结束。连接底层回调和高层async/await异步的桥梁
  • Task:Future的子类,里面可以包装一个Coroutine,然后通过注册step函数到事件循环中,来每次执行到产生下一个Future之间这一“步”,将协程对象真正以可暂停的方式在事件循环中执行。
  • Coroutine:协程,一个把函数包装起来的特殊对象。直接调用async声明的异步函数,就会产生一个Coroutine对象。如果协程对象被await,就会执行里面函数的代码。可以被包装成Task之后在事件循环中可暂停地执行。时间循环中的不同协程看起来就像可以互相切换一样。

整个异步调用过程解析

示例内容

借用一下代码分析整个异步调用时到底发生了什么。

import asyncio

async def main():
    res = await foo()
    print(res)

async def foo():
    await asyncio.sleep(1)
    return 1

asyncio.run(main())

进入事件循环

main() 访问一个Coroutine对象,可以放入asyncio.run()方法中。这个方法内部精髓部分如下:

# asyncio.run():

# 如果有已经在进行的事件循环,报错
if events._get_running_loop() is not None:
    raise RuntimeError("asyncio.run() cannot be called from a running event loop")
# 如果传入的不是协程对象,报错
if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))
# 创建一个新事件循环
loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    # 调用loop.run_until_complete方法
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()

这里的重点就是调用loop.run_until_complete方法处理传入的协程,下面来看run_until_complete方法(以base_events为例):

# base_events.run_until_complete():

new_task = not futures.isfuture(future)
# 保证接下来操作的是Future。如果传入的是协程,会被包装成Future
future = tasks.ensure_future(future, loop=self)
if new_task:
    ...
# 在该future执行完毕的时候,将“结束循环”回调函数加入事件循环的ready准备结束
future.add_done_callback(_run_until_complete_cb)
try:
    self.run_forever()
except:
    ...
...

return future.result()

看到重点是调用了自身循环的run_forever()方法,查看run_forever核心代码:

# base_events.run_until_complete():

events._set_running_loop(self)
# 无限循环,直到停止flag被执行
while True:
    self._run_once()
    if self._stopping:
        break

run_forever将一个运行一轮循环的run_once循环执行,来看这个函数:

# base_events.run_once():

# This calls all currently ready callbacks, polls for I/O,
# schedules the resulting callbacks, and finally schedules
#'call_later' callbacks.

...
# 上面根据条件处理好回调函数堆,得到当前可以执行的ready函数列表。

# 执行所有ready函数
ntodo = len(self._ready)
for i in range(ntodo):
    handle = self._ready.popleft()
    if handle._cancelled:
        continue
    if self._debug:
        ...
    else:
        handle._run()
handle = None  # Needed to break cycles when an exception occurs.

最终事件循环被一轮一轮的循环驱动起来。

回到运行内容

在这个例子中,handle.run()运行了什么?想要解答这个问题,就要回到run_until_complete方法,看看协程被包装成Task(Future)的时候到底做了什么。

# tasks.ensure_future():

if coroutines.iscoroutine(coro_or_future):
    if loop is None:
        loop = events.get_event_loop()
    # 如果传入的是协程,就创建一个任务,将其包装起来
    task = loop.create_task(coro_or_future)
    if task._source_traceback:
        del task._source_traceback[-1]
    return task
elif futures.isfuture(coro_or_future):
    ...
    return coro_or_future
...

重点是中间通过create_task把协程创建成了任务,create就是实例化了一个新的task返回。这里来看看Task的构造函数:

# tasks.__init__():

...
self._loop.call_soon(self.__step, context=self._context)
...

其中做的最关键的就是将一个step函数加入到了事件循环中,loop.call_soon方法会直接将函数加入到ready队列里,在下次循环马上执行。然而这里的step函数是什么还不清楚,继续探究这里的step函数:

...
coro = self._coro
...
try:
    if exc is None:
        result = coro.send(None)
    else:
        result = coro.throw(exc)
except StopIteration as exc:
    if self._must_cancel:
        # Task is cancelled right before coro stops.
        self._must_cancel = False
        super().set_exception(futures.CancelledError())
    else:
        super().set_result(exc.value)
else:
    blocking = getattr(result, '_asyncio_future_blocking', None)
    if blocking is not None:
        if blocking:
            ...
            result._asyncio_future_blocking = False
            result.add_done_callback(
                self.__wakeup, context=self._context)
            ...
        else:
            ...
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)

    elif result is None:
        # Bare yield relinquishes control for one event loop iteration.
        self._loop.call_soon(self.__step, context=self._context)
    ...

在这个函数的核心部分,可以看到每次都会用coro.send(None)来驱动协程运行一次。这一次运行协程到什么时候为止呢?主要有三种情况:

  1. 遇到Future创建,把Future返回出来。后面会继续详细解释这种情况。
  2. 遇到yield没有返回值,中断然后返回出来。这种情况对应最后elif result is None的判断,会直接将下一步加入到事件循环ready中。
  3. 协程里的函数执行到头了,会抛出StopIteration异常,表示执行完毕。调用super.set_result(注意super就是Future),将返回值设置好,结束Future的执行。

也就是说,每一步并不是以当前协程中函数内await进行步骤划分的,而是以最里层真正碰到yield的地方划分的。

进入await和Future底层

重新来看今天的例子:

import asyncio

async def main():
    res = await foo()
    print(res)

async def foo():
    await asyncio.sleep(1)
    return 1

asyncio.run(main())

在这个例子里面,main第一步开始执行后,并不是在await foo()停止,而是一直运行到asyncio.sleep(1)内部的yield语句。await如果后面是一个协程对象,则会进入协程的函数中继续执行。那来看一下sleep函数的构成:

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        h.cancel()

看到,这里创建了一个future,并且将它的set_result函数注册到事件循环中,然后返回了await future。在asyncio中,Future对象是awaitable的,换句话说,他内部定义了叫做__await__函数。凡是定义了这个函数的,都可以被await,然后在被await的时候调用此函数。那么Future对象在被await的时候会返回什么呢?来到futures.__await__()函数:

def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.

这里要注意,return才是返回值,yield返回的可不是返回值,而是可以理解为“挂起信息”。可以通过下面的程序理解这句话的含义:

import asyncio

class Foo():
    def __await__(self):
        print("foo")
        yield 1
        print("ok")
        return 2

async def main():
    print("haha")
    a = await Foo() #a 接受到的是return出来的返回值。
    print(a)
    # 协程函数执行到结束时,会抛出异常

coro = main()
print(coro.send(None))# 这里接受到的是挂起信息
print(coro.send(None))
'''Output
haha
foo
1
ok
2
Traceback (most recent call last):
  File "c:\Users\claws\learn\python\async_test\awaitable_test.py", line 17, in <module>
    print(coro.send(None))
StopIteration
'''

await后面需要是定义了await函数的对象,并且会将挂起信息不断向外传播。底层实现还是generator那一套。

所以在future中,这里暂时挂起,并且将自己作为挂起信息返回出去了,这个挂起信息会不断传播到step的coro.send那里,变成step中coro.send的返回值被接收到,就可以实现在Future中添加回调了。等到下个step运行coro.send的时候,会将result真正return回到await的地方,继续执行后面的代码。

所以sleep的逻辑就是,注册一个1s后会触发设置result的Future,这个Future返回到step中被挂上执行下一个step的回调,达到代码至少1s之后才会再次继续执行的效果。

结论

事件循环是个调度函数运行的循环,驱动着Future、Task中各个函数的触发,让异步系统有效运转。而最最底层的支持还是要看yield这个关键字,这个关键字让函数可以挂起,执行原先的代码,在下次请求执行的时候恢复。

扩展思考

思考以下代码的输出:

import asyncio

async def shoot():
    print("ping")
    await asyncio.sleep(1)
    print("pong")

async def main():
    print("start shoot")
    shoot_task = asyncio.create_task(shoot())
    print("shoot over")
    # leave

asyncio.run(main())

输出是

start shoot
shoot over
ping

为什么会有这样的输出呢?来一步一步分析整个程序执行的过程:

  1. asyncio.run(main()),后面调用loop.run_until_complete(main())
  2. loop.run_until_complete(main())中将main()协程包装成Task,然后将_run_until_complete_cb加入到该Task的回调函数中。这个cb的意思是call_back,而其中所做的事情就是停止循环。task的第一个step被放入循环的ready中。此时ready:[mainTask的step函数]
  3. 循环从ready中取出mainTask的step运行,在asyncio.create_task(shoot())时创建了一个新的任务。创建任务的过程中,也将该shoot任务的step加入循环ready,此时ready:[shootTask的step]
  4. main函数直接结束,触发StopIteration异常,让该Task触发set_result然后返回,这个时候mainTask因为被循环绑定了回调_run_until_complete_cb,会将_run_until_complete_cb放入循环的ready。此时ready:[shootTask的step,_run_until_complete_cb]
  5. 执行shootTask的step之后,停止在asyncio.sleep中,这里将一个Future的set_result放入循环的later堆里面。然后马上取出**_run_until_complete_cb**执行,循环不等sleep兑现就提前结束.

循环结束后,会重置循环到新的状态,如果后来又通过asyncio.run或其他方式重启循环,不会受到之前循环中残留工作的影响。