关于 Python 协程的 event loop 与 future

157 天前
 sgld

最近在看这个视频

Python AsyncIO 从入门到放弃

前面 生成器、yield from 、native coroutine 还理解的上,到了后面两节,event loop 与 future 的时候就有点懵。 event loop 还好,轮循检查有没有待执行任务,然后执行任务

但是结合了 future 以后,感觉脑子就转不过来了,啥时候让出权限,啥时候任务加进时间循环,啥时候恢复执行。

单独看片段还理解,连一起有点晕了。

想问问大佬们有没有纯文字 + 代码版的相关解释或者博文、教程之类的。

2713 次点击
所在节点    Python
21 条回复
cyaki
157 天前
写 curio 的作者油管上有不少关于这块的
lxy42
157 天前
500 Lines or Less
A Web Crawler With asyncio Coroutines

A. Jesse Jiryu Davis and Guido van Rossum

https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
thevita
157 天前
分享一下我的看法,我理解这些概念不太喜欢先深入细节,有全局视角再看细节

无栈协程的核心就是 把顺序代码变成一种状态机,不同语言的实现差异很大,但逻辑差不多

(其实我们如果不用 coroutine, 写事件驱动应用 就是手写这个状态机)

await 就是状态转移点,从一个 await 完成 到下一个代码路径上的 await 就是一次状态转移

将这一小段代码封装起来 就叫 task, 这就是 事件循环执行的基本单元(不同语言实现也不一样,python 应该是依靠 生成器状态机来实现,rust ,c++ 则靠编译器)

future/awaitable 作用是管理 task 之间的依赖关系,在某个 task 的 future done 的时候,将依赖它的 task 放进就绪队列等待执行(不同实现也不一样,比如 直接通过 callback )

所以:


- 啥时候让出权限: 一个 task 完成的时候
- 啥时候任务加进事件循环: 这个任务的依赖 future done 的时候 (实现可以都不一样,单实践效果一定是这样的)
- 啥时候恢复执行: 进如 ready 队列了,就等待执行了,自于啥时候执行,就是 队列和 调度器的实现了,也都不一样

----


正好前段时间看了 foundationdb ,他们自己实现了一个 叫 flow 的语言,在 < c++20 上实现了无栈协程,它的编译器会把 flow 的代码编译成 C++ 的状态机,可以清晰的看到怎么把代码转成状态机
009694
157 天前
你执行 await 语句的时候,就会出让执行权。也就是说,如果你在一组代码里面没有任何 await 语句的话,这段代码是完全同步的。 当你 await 的 io 事件发生之后,执行权就回到你的下一行代码了。 future 对象就是一个预留的桩子 告诉你“你委托我执行的异步代码,等下就用这个 future 对象获取结果”。
Trim21
157 天前
曾经尝试过搓一个 eventloop ,然后因为懒得像 uvloop 一样搓到跟 asyncio 100%兼容所以就放弃了。

如果你不考虑网络 IO 的话,事件循环本身是非常简单的,实际上就是 event loop 上面的 call_soon 、call_at 以及 call_later 三个方法... 你可以继承一下现有的事件循环,然后在这三个方法上打 log ,然后写一段简单的 async/await 程序,就能看到你生成 future 之类的对象到底干了什么。

python 的 Future 对象有一个 c 版本,也有一个 python 版本的,你可以直接去看源码。看看他什么时候调用我前面说的几个方法。
coldear
157 天前
songray
156 天前
没那么复杂,比如我们有一个 async 函数 foo ,代码执行到 await 的时候,控制权就从 foo 函数让出到别的代码块了,同时向待完成列表里插入 foo 。
等到 foo 的 await 任务完成后,就会向 eventloop 中插入类似于 “foo 已经完成啦,你应该继续 foo 的后续操作”的 task 。
等到 eventloop 循环到这个 task ,就会恢复上下文(也可以说是状态)到 foo ,这也就是为啥无栈协程也可以看做是一种状态机。
希望我的解释比较明朗。
sgld
156 天前
@thevita 感谢大佬,我理解一下
sgld
156 天前
@Trim21 源码太复杂了,有涉及到很多情况,现在在看上面视频里面 up 写的简易版本的,
sgld
156 天前
用 ai 辅助修改了下代码,然后加了一些 print ,直观的了解了下执行流程

import time
import random
from collections import deque
from itertools import count
import heapq

count_id = count(1)

class Future:
def __init__(self):
self._done = False
self._result = None
self._callbacks = []
self._cancelled = False
self._id = f'Future-{next(count_id)}'

def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)

def set_result(self, result):
self._result = result
self._done = True
for cb in self._callbacks:
cb(self)

def __await__(self):
if not self._done:
print(f"Future {self._id} is not done, waiting...")
# 这里的 self 是一个 Future 对象, 需要在调用时使用 await 关键字
yield self
return self._result

class Task(Future):
def __init__(self, coro):
super().__init__()
self.coro = coro
print(f"任务初始化, 任务 ID: {self._id}")
loop.call_soon(self.run)

def run(self):
try:
result = self.coro.send(None)
except StopIteration as e:
"""执行"""
print(f"任务 {self._id} 执行完毕, 结果: {e.value}")
self.set_result(e.value)
else:
if isinstance(result, Future):
result.add_done_callback(self._wakeup)
print(f"Task {self._id} is waiting for Future {result._id}")
def _wakeup(self, future: Future):
"""
This method is called when the future is done.
It schedules the task to run again.
"""
print(f"等待完成, 唤醒任务 {self._id}, 结果: {future._result}")
loop.call_soon(self.run)

class EventLoop:
def __init__(self):
self._ready = deque()
self._scheduled = []
self._stopped = False

def call_soon(self, callback, *args):
self._ready.append((callback, args))

def call_later(self, delay, callback, *args):
heapq.heappush(self._scheduled,
(time.time() + delay, callback, args))

def stop(self):
self._stopped = True

def create_task(self, coro):
return Task(coro)

def run_forever(self):
while not self._stopped:
self.run_once()

def run_once(self):
now = time.time()
while self._scheduled and self._scheduled[0][0] <= now:
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))

num = len(self._ready)
for _ in range(num):

# 取出一个任务, 执行它
cb, args = self._ready.popleft()
print(f"----> 执行 {cb}({args}) ---->")
cb(*args)

async def smallrun():
print("Start smallrun")

# 创建一个 Future 对象
# 代表一个将来的结果, 但是现在还不知道结果是什么
fut = Future()
print(f"Future {fut._id} created")
# 功能模拟 --- 随机延迟, 模拟 IO 操作
# IO 结束以后, 调用 fut.set_result(None)
delay = random.random()
loop.call_later(delay, fut.set_result, None)

await fut
print("End smallrun after", delay)
return delay

async def bigrun():
print("Start bigrun")
delay = await smallrun()
print("End bigrun with", delay*10)
return delay * 10

async def main_task():
print("Main task start")
result = await bigrun()
print("Final result:", result)

if __name__ == "__main__":
loop = EventLoop()
loop.create_task(main_task())

# 2.1 秒后停止事件循环
loop.call_later(2.2, loop.stop)
loop.run_forever()
sgld
156 天前
@lxy42 好的好的,我看看这个,🙏感谢资源
sgld
156 天前
@cyaki 是哪位呀
cyaki
156 天前
@sgld David Beazley, 在 youtube 上搜就行
iorilu
155 天前
有本书很好 Python Concurrency with asyncio

当然这书内容太多, 不需要都看, 我也只看了一小部分足够了

除非是想独自开发 asyncio 相关的框架, 一般人了解下就行了
kivmi
153 天前
@thevita CV 大神
kivmi
153 天前
@009694 future + callback 更好理解吧
kivmi
153 天前
@songray 正解
UN2758
109 天前
@sgld #10 你这不加个代码缩进,直接看头痛
pyKane
99 天前
那习惯了异步,再也回不去了。
以前 PY 做高并发想都不敢想,
现在轻轻松松。很多 SDK 没有异步,就自已去写。
Neonyuyang
94 天前
@coldear 打不开,老师

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1127399

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX