不死心,再来问一遍关于 Python 的 asyncio 问题

2019-07-22 17:42:42 +08:00
 waibunleung

有两段代码,是关于 asyncio 的。
代码段一:

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')

%time asyncio.run(main())

########## 输出 ##########

before await
worker_1 start
worker_1 done
awaited worker_1
worker_2 start
worker_2 done
awaited worker_2
Wall time: 3 s

代码段二:

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')

%time asyncio.run(main())

########## 输出 ##########

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
Wall time: 2.01 s

问题:代码段一里面的协程(coroutine)换成代码段二的任务(task)后,为什么执行顺序就变了?这个过程中发生了什么事情?

说说我的猜想:
发现调用 asyncio.run(main()) 或者 [asyncio.gather()->asyncio.get_event_loop()->loop.run_until_complete()]都会将一个 coroutine 转化成 task/future 再放 event loop 里面去, 交由 event loop 去管理这些 task/future。代码段一只将 main()这个 coroutine 封装成了 task 加入到 event loop 中,所以整个 event loop 中只有一个 task 在走,在这个 task 中代码是顺序执行的,所以最后呈现出同步执行的结果;
但是代码段二调用了两次 asyncio.create_task(),这个方法会将一个 coroutine 转换成一个 task 并且放到 event loop 中,所以整个 event loop 其实有三个 task ( main,task1,task2 ),之后程序就交给 event loop 来调度,执行顺序就变不同了。 这个假设目前来看好像能解释得通

最后,希望各位能指点一下~

6278 次点击
所在节点    Python
90 条回复
guokeke
2019-07-22 18:49:22 +08:00
我发不了外链了。。。
waibunleung
2019-07-22 18:49:23 +08:00
@dbow 还是看不到的
dbow
2019-07-22 18:49:29 +08:00
看上图,call_soon 把 task 加入 loop._ready, loop.run_once 把_ready 的东西 popleft 出来, 开始执行。
waibunleung
2019-07-22 18:50:09 +08:00
@guokeke 直接贴应该可以吧? github 的都可以发?
guokeke
2019-07-22 18:50:20 +08:00
base_events create_task
tasks __init__
base_events call_soon
base_events _call_soon
_call_soon 的时候创建好 handle events
dbow
2019-07-22 18:50:32 +08:00
图床是 imgur 可能被墙掉了
waibunleung
2019-07-22 18:50:36 +08:00
@dbow 老哥我真的看不到图...
guokeke
2019-07-22 18:51:17 +08:00
v2 这个判断逻辑有问题,只是文件名字,都没有 http 就说看起来像垃圾链接。。。
dbow
2019-07-22 18:52:39 +08:00
def call_soon(self, callback, *args, context=None):
"""Arrange for a callback to be called as soon as possible.

This operates as a FIFO queue: callbacks are called in the
order in which they are registered. Each callback will be
called exactly once.

Any positional arguments after the callback will be passed to
the callback when it is called.
"""
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
在事件循环里
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)

# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
waibunleung
2019-07-22 18:52:45 +08:00
@guokeke 创建好 handle events 之后是要等到下一次循环的时候才开始执行吧?什么时候会进入到事件循环的下一轮?
meik2333
2019-07-22 18:53:39 +08:00
fzzff
2019-07-22 18:54:17 +08:00
好好理解理解 create_task...看的我有点尴尬
waibunleung
2019-07-22 18:55:59 +08:00
@dbow 按照你的意思,结合代码二,什么时候会进入下一批调度?
waibunleung
2019-07-22 18:58:40 +08:00
@fzzff 嗯嗯,我会努力的。同时也祝愿您下次评论的时候除了这种没有营养的话也多能说说有建设性的东西呢~
可以接受批评,但拒绝无实际帮助的评论呢,亲
waibunleung
2019-07-22 18:59:00 +08:00
@meik2333 谢谢你啦~
cxyfreedom
2019-07-22 19:14:35 +08:00
task 部分上面说了,如果你还要再深入的话,事件循环部分细节需要去看 _asynciomodule.c 里面的内容了
fzzff
2019-07-22 19:34:45 +08:00
首先对刚刚评论的不礼貌向你道歉,这个 creat_task 跟 tornado 中的 add_callback 有些相似,这个 creat_task 再被调用时会直接执行传入的协程对象,并且默认不等待协程结果的执行完毕,你在调用 await worker 的时候在会开始等待 Task 对象执行完成,下面这段代码应该会对理解有些帮助,可以单步走下:
import asyncio


async def worker_1():
print('worker_1 start')
print('worker_1 done')
return '123'


async def worker_2():
print('worker_2 start')
# await asyncio.sleep(0)
print('worker_2 done')
return '234'


async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print('before await')
print('awaited worker_1')
print('awaited worker_2')
print(task1)
print(task2)

asyncio.run(main())
ilucio
2019-07-22 19:44:38 +08:00
说说我的理解,这两段代码都创建了 3 个 task,但是对于代码 1,这三个 task 的执行顺序是 main-> work1->work2,对于代码 2,这三个 task 的执行顺序是 main-> work1,main->work2,并且可以认为 work1 和 work2 是同时执行的。
waibunleung
2019-07-22 19:45:41 +08:00
@cxyfreedom 相关注释我也看到了,但是由于没有 C 基础,就先不勉强自己,谢谢你啦
waibunleung
2019-07-22 19:48:05 +08:00
@fzzff 好的谢谢你,我试一下

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/585164

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX