问一个关于 asyncio 的问题,当多个 task 运行,其中一个正确返回后,怎么取消其他协程?

2021-01-22 10:33:25 +08:00
 dongcheng

一个简单例子 import asyncio import time

特殊函数

async def get_request(url): print('正在下载',url) # time.sleep(2) 不支持异步的模块 会中断整个的异步效果 await asyncio.sleep(2) print('下载完成',url) return 'page_text'

def parse(task): print(task.result())

start = time.time() urls = ['www.xxx1.com','www.xxx2.com','www.xxx3.com']

tasks = [] #存放多任务 for url in urls: # 调用特殊函数 func = get_request(url)

# 创建任务对象
task = asyncio.ensure_future(func)

# 给任务对象绑定回调函数
task.add_done_callback(parse)
tasks.append(task)

创建事件循环对象

loop = asyncio.get_event_loop()

执行任务

loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:',time.time()-start) #2.0015313625335693

可以在 get_request 判断任务是否成功,但是如何判断成功之后取消其他任务呢?

谢谢

2614 次点击
所在节点    Python
23 条回复
chanchancl
2021-01-22 10:46:51 +08:00
将 func 包装成一个 function

在结尾的地方,将 tasks 中,除了自己的 task 都 cancle 掉
pursuer
2021-01-22 10:52:53 +08:00
如果想要让协程提前结束等待,可以使用 asyncio.wait(...,return_when='FIRST_COMPLETED')或者更灵活的方案 asyncio.Future,然后处理下资源释放
dongcheng
2021-01-22 11:02:28 +08:00
@chanchancl 这个 func 能获取到 task 吗?
sujin190
2021-01-22 11:08:05 +08:00
你是不是想错了,对于协程来说,如果设计到 io,那么就算能取消协程其实并不能取消 io 操作

比如 http 请求,就算你要取消协程其实并不能取消已经发送出去的 http 请求,取消协程完全没有意义,如果你能取消 io,比如关闭 http 连接来取消 http 请求,那么你应该通过关闭 http 请求的方式来触发协程返回,即先取消 io 操作再由 io 完成来触发协程返回,而不能倒过来

对于非 io 请求,再整个协程完成前会独占线程,并不会调度到其他协程,所以你自然也不能用其他协程来取消当前正在运行的协程了
keepeye
2021-01-22 11:10:10 +08:00
不是有 tasks 吗?遍历一下都 cancel 掉
fiveelementgid
2021-01-22 11:10:13 +08:00
看见 Task 和 continuation 还激动了一下以为有. NET 玩家一起研究 TAP 了,一看是 Python,告辞
sujin190
2021-01-22 11:12:28 +08:00
对于非 io 请求协程再补充一点,似乎协程再创建的时候就会进行首次运行,没 io 操作,所以首次运行必然独占线程一直到运行完成,所以这种情况下完全做不了中途取消的操作
dongcheng
2021-01-22 11:20:08 +08:00
@sujin190 应该可以取消 task 吧。只是有局限。

@keepeye asyncio.Task.all_tasks()可以获取 all task 。但是现在有个需求是嵌套的 task,可以取消所有 task,不能取消第 2 层的 task (保留第一层)
sujin190
2021-01-22 11:26:56 +08:00
@dongcheng #8 但是没意义,不符合协程原理实现,从程序的可靠性严谨性来说也不应该出现这种设计,不关闭 io 而取消上层协程是很不科学的,很容易导致 io 、连接泄露啥的,而且还不容易排查是啥问题
xiaoHuang3
2021-01-22 11:48:58 +08:00
@sujin190 附议~,我觉得楼主这种需求根本不适合用 asyncio 来做- -
crclz
2021-01-22 12:10:44 +08:00
@fiveelementgid .NET 的设计就很清晰,TAP 、cancellation token

如果是.NET 的话,应该把一个 CancellationTokenSource ( cts )的 CancellationToken 传给所有异步方法,异步方法会返回 task,再等 Task.WhenAny 返回后,再取消 cts 。

由于 CancellationToken 是层层深入的( HTTP 、TCP ),所以底层负责 TCP 的代码也会取消正在进行的操作,然后会扔出一个取消异常,最后捕获这个异常就行了(在异步方法中)。

我想 python 的问题就是,底层的 api 都没有提供 cancellation token 的参数,导致无法让取消操作层层传播。
abersheeran
2021-01-22 12:21:18 +08:00
写过相关代码,在我的项目里使用过很久,一切良好。

https://gist.github.com/abersheeran/bd2372bb35fec859d7fca453ca5f7826
abersheeran
2021-01-22 12:32:01 +08:00
刚看到楼主在 #8 说有嵌套 Task 。很遗憾,asyncio 不支持嵌套取消。可以试试 trio,宣传上说是解决了这个问题。我也没试过。
optional
2021-01-22 12:33:41 +08:00
从设计上来说,这是不可取消的,如果有 transaction 对象之类,可以标记一下
fiveelementgid
2021-01-22 13:01:58 +08:00
@crclz Sure,我昨晚刚看完印象深刻,特地还去扒了一下源码。最底下就是 new 一个新的 Token 和上层传下来的 Token 然后 Link,整层整层地控制
dongcheng
2021-01-22 13:30:11 +08:00
@abersheeran 感谢分享代码。研究了下 asyncio.as_completed(tasks)可以实现类似的功能
zhuangzhuang1988
2021-01-22 13:36:50 +08:00
@crclz 微软还是最牛的
cancal 进度报告是 api 中设计的
xuboying
2021-01-22 13:41:24 +08:00
@abersheeran #12 个人感觉这个代码片段用类来实现可读性可能会更好一点。loop = loop or asyncio.get_running_loop()学到了。之前写的时候还没这个函数。。。
imn1
2021-01-22 13:44:33 +08:00
刚好有类似需求
没找到满意的解决方案
目前是用异步队列,当满足一定条件就清空队列,但条件满足时,已经并行启动的不能取消,要继续工作,只是清空了未启动的队列
js8510
2021-01-22 14:19:27 +08:00
同 @sujin190 。 我猜测可能想的是 multiprocessing 而不是 asyncio.
如果 pre request/ post response 有 cpu heavy 的处理。并发倒是有意义的。

你可以 map_async 。如果 i/o 还没发生,或者已经发生,你可以 terminate 掉其他 process. 如果正在 i/o blocking 你还是要 wait 然后 kill 掉。

这样调度有开销,要具体分析。。简单的 post 请求就返回应该不值得。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/747295

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX