求 go 并发限制的最佳实现

2021-02-08 12:01:29 +08:00
 monkeyWie

情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。

我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK

不知道各位有没有什么更简单的实现。

7055 次点击
所在节点    Go 编程语言
70 条回复
crclz
2021-02-09 09:54:50 +08:00
分解问题:
1. 通过固定的 M 个携程执行。
解决方案:信号量或者条件变量或者 channel,很多种方法实现。

2. 只要出现一个任务返回 Error,就立即结束全部任务。
解决方案:C#的 TAP 的 Task.WhenAny 和 CancellationToken 模式可以很好的解决这些问题。
那么只需要用 go 来实现 Task.WhenAny 和 CancellationToken 模式即可。

CancellationToken 很好实现,用 struct {IsCancellationRequired: bool}就可以实现。
Task.WhenAny 可以用如下方式实现:有一个 chan ch,每个任务完成后,往 ch 里面写一个数字(或者写入任务信息),同时主线程阻塞读取 ch 。
monkeyWie
2021-02-09 09:57:02 +08:00
@guonaihong #40 应该行的,但是对第一点有点疑问,用 slice 怎么实现 M 个协程的限制呢
guonaihong
2021-02-09 10:01:11 +08:00
//放结果伪代码
// 每个 go 程的 id 已经固定下来,就是 for 循环启动的 index.大家操作自己的私有 index 。为啥会有竞争?
for i :=0;i < M;i++ {
i:=i
go func(){
slice[i] = result
}
}
jackeliang
2021-02-09 10:09:47 +08:00
@monkeyWie 发生错误不能立即返回的,需要等待其它协程结束才行,不然协程泄露了。
aeli
2021-02-09 10:11:36 +08:00
所有在并发里要求错误立即中断所有并发返回的,显然是脑抽抽
dawniii
2021-02-09 10:13:53 +08:00
当其中有一个协程有错误,另一个协程在密集计算,应该是没法打断直接返回的吧
SignLeung
2021-02-09 11:10:36 +08:00
楼上说的没错,协程好像只能自己结束
seth19960929
2021-02-09 11:11:06 +08:00
N 个的容量的 success channel.
再来一个 err channel

然后主线程那里 select 这两个 channel 做事情就可以啦.

----------------------
至于你纠结 err 之后协程能不能关闭, 那个不是你关心的事情了. 可以考虑传递一个 context 给 request, err 发生错误的时候进行 cancel context 即可.
liyunlong41
2021-02-09 11:24:29 +08:00
https://play.golang.org/p/YYvynelzIHj
感兴趣写了下,如果想出现错误后中断其他 goroutine 的处理,其他 goroutine 必须可以被 cancel 掉。
asAnotherJack
2021-02-09 12:47:03 +08:00
errgroup 可以实现出错时结束流程,前提是你的代码实现了 cancel 逻辑
Aoang
2021-02-09 14:47:03 +08:00
并发限制用协程池,退出机制需要自己实现。

如果懒得实现,就直接摘出来,用系统的实现。main 启动之后开一个协程监听退出信号,收到退出信号之后直接 os.Exit()

其实还是应该自己在代码中实现出来,那个操作只能玩玩。看看源码,用 runtime.Goexit() 来终止协程。

errgroup context 都是用来传递消息的,并不是来做终止的。你用这两个来做的话,只能每进行一步就检查一下是否有退出信号,不然就做不到及时退出。
teawithlife
2021-02-09 14:48:32 +08:00
@liyunlong41 #49 你好,请教一下,在 61 行的这一部分,为什么两个 channel 之间互相要写数据?
```
select {
case err := <-errCh:
handleErr(err, result[1:])
<-done
case <-done:
if len(errCh) > 0 {
err := <-errCh
handleErr(err, result[1:])
return
}
fmt.Println("success handle all task:", result[1:])
}
```

改成这样是否可以?
```
select {
case err := <-errCh:
handleErr(err, result[1:])
case <-done:
fmt.Println("success handle all task:", result[1:])
}
```
Aoang
2021-02-09 15:01:42 +08:00
@teawithlife #52

因为他代码中 errCh 和 done 作用是一样的,一个 chan 就可以了,检查 chan 中的内容是出错退出的还是最终完成退出就行了。
liyunlong41
2021-02-09 15:28:38 +08:00
@teawithlife 没有相互写数据,是为了一些健壮性考虑吧。第一个 case 如果从 errCh 中读到 err 了,<-done 表示等待其他 goroutine 也都结束,是怕有 goroutine 泄露。第二个 case 是考虑可能有极小概率 errCh 和 done 同时有数据,select 随机选择了 done channel,所以最终再判断下 errCh 是否有 err 。
monkeyWie
2021-02-09 19:50:17 +08:00
@liyunlong41 #49 目前 35L 这种应该是最优雅的实现,我们用纯标准库实现的还是太复杂了哈哈
teawithlife
2021-02-09 20:55:51 +08:00
@liyunlong41 #54 “相互写数据”这个表述不太严谨。不过你说的有道理,从健壮性来说,确实需要考虑这些极端情况。


@monkeyWie #55 个人觉得 35L 的写法确实比较优雅,但是从效率来说,还是 49L 的写法更好一些,因为 49L 的协程数量是 M 个,而 35L 的协程数量是 N 个,当 N>>M 时,虽然 golang 的协程足够轻量,但是也没必要这么浪费。
monkeyWie
2021-02-09 20:59:03 +08:00
@teawithlife #56 35L 协程数量其实是 N 个,用信号量做了控制的
monkeyWie
2021-02-09 21:00:12 +08:00
@monkeyWie #57 说出了,是 M 个
teawithlife
2021-02-10 08:29:04 +08:00
@monkeyWie #57 协程有 N 个,只不过通过信号量保证了其中只有 M 个在跑,其他的协程虽然在等待,也需要消耗少量资源
Nillouise
2021-02-10 09:52:31 +08:00
我之前研究过 go 的流控怎么写,感觉还是流控好,流控还可以控制几秒才做一个任务,单纯控制并发感觉不如流控。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/752304

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX