python3 请问协程怎么 wait_for 一个 task list

2019-12-07 10:19:02 +08:00
 pmispig

版本 3.7.5

taskList = []
task = asyncio.create_task(nested())
task1 = asyncio.create_task(nested())
taskList.append(task)
taskList.append(task1)
await asyncio.wait_for(taskList timeout=10)

然而 wait_for 不支持 task list.
请问怎么创建一堆 task,给每个 task 设置一个超时时间,然后并发运行呢。 如果一个 async 函数里有 time.sleep() ,那么 wait_for 设置的超时时间就无效,请问这个怎么解决?当然正确的方法是用 asyncio.sleep().然而第三方的库里面,它用的是 time.sleep()呢,那么无法 wait_for()超时中断了吗,task 有没有 kill 机制呢

4653 次点击
所在节点    Python
17 条回复
wwqgtxx
2019-12-07 10:43:02 +08:00
在 async 函数中如果使用了 time.sleep(),无论如何都不可能中断的,也不可能 kill,除非你 kill 掉这个 thread 再重启一个 loop
如果一定要用该第三方库,请调用
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
把它单独丢到一个线程中(注意,python 中依然没有办法超时 Kill 一个 thread,这是非法的,当然你可以换一个 ProcessPoolExecutor 来把任务丢到一个新的进程,这样去 kill )
ClericPy
2019-12-07 10:43:38 +08:00
简单等待
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

返回两个 Task/Future 集合: (done, pending)。

用法:

done, pending = await asyncio.wait(aws)
如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。


time.sleep 本来就不是异步的, 非要用, 有三个方法吧

1. 模仿 gevent 猴子补丁把 time.sleep 给 hack 掉, 不建议
2. 动态语言可以加载以后动态修改原始函数, 把第三方的那个函数给修改掉成协程版本
3. 这种本来就不是异步函数的, 直接套个 executor 然后 await loop.run_in_executor 就可以了, 这个最保险
qcts33
2019-12-07 11:06:15 +08:00
等待一组 task 的话可以用 asyncio.gather()或 asyncio.as_completed()啊
不要用 time.sleep(),这个是不是异步的,应该用专门的 asyncio.sleep()
pmispig
2019-12-07 11:28:50 +08:00
@wwqgtxx 明白了,多谢
pmispig
2019-12-07 11:32:00 +08:00
@qcts33
@ClericPy
asyncio.wait 不能设置超时时间,我现在是
[await asyncio.wait_for(task,timeout=10) for task in taskList]
这么操作的。。有点辣眼睛
pmispig
2019-12-07 11:39:02 +08:00
@ClericPy 0.0 刚重新看了下文档,可以设置 timeout,与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。
keepeye
2019-12-07 11:41:49 +08:00
taskList = []
task = asyncio.wait_for(asyncio.create_task(nested()), 10)
task1 = asyncio.wait_for(asyncio.create_task(nested()), 10)
taskList.append(task)
taskList.append(task1)
await asyncio.gather(*taskList)
ClericPy
2019-12-07 11:45:36 +08:00
@pmispig #6 对的, asyncio 里大部分设计都比较明确, 就是 wait 和 gather 以及是否默认 cancel 这部分比较混乱. 又懒得装 trio 那些, 只能硬着头皮看源码看文档了
pmispig
2019-12-07 11:54:44 +08:00
```
import asyncio
async def nested():
await asyncio.sleep(10000)

async def main():
taskList = []
waitList = []
task = asyncio.create_task(nested())
task1 = asyncio.create_task(nested())
t1 = asyncio.create_task(asyncio.wait_for(task,timeout=10))
t2 = asyncio.create_task(asyncio.wait_for(task1,timeout=10))
waitList.append(t1)
waitList.append(t2)
await asyncio.gather(*waitList)


asyncio.run(main())
```
想了一种套娃实现...
0Y89tX3MgR4I
2019-12-07 12:44:31 +08:00
用 asyncio 不爽的一个地方就是所有的组件都要换成支持 asyncio 的
0Y89tX3MgR4I
2019-12-07 12:44:52 +08:00
@0Y89tX3MgR4I 不支持的只能放到一个单独的线程里面
jingcoco
2019-12-07 14:25:54 +08:00
小白,用 python 3.8 运行楼主的 报错 ........
pmispig
2019-12-07 14:38:02 +08:00
@jingcoco concurrent.futures._base.TimeoutError 报这个是正常的,目的就是为了引出这个异常
wwqgtxx
2019-12-07 15:22:45 +08:00
我自己写过一个给 task 加上 timeout 的辅助函数,你可以参考一下,不过这样输出的会是 asyncio.CancelledError
https://github.com/wwqgtxx/wwqLyParse/blob/master/wwqLyParse/common/asyncio.py#L182
wwqgtxx
2019-12-07 15:29:17 +08:00
不过其实 python 官方的 wait_for 的实现也是类似思路
https://github.com/python/cpython/blob/3.8/Lib/asyncio/tasks.py#L434
pmispig
2019-12-07 17:36:30 +08:00
@wwqgtxx 3Q,要改也挺麻烦得,还是直接套娃算了,task 套 task...
ClericPy
2019-12-07 18:22:29 +08:00
@pmispig #16 可以借鉴 aiohttp 依赖的 async-timeout

wait 之所以不主动 cancel 看它 api 就明白了, 区分了 pending 和 done, pending 的自己 cancel 就好了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/626729

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX