求 go 并发限制的最佳实现

2021-02-08 12:01:29 +08:00
 monkeyWie

情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。

我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK

不知道各位有没有什么更简单的实现。

7040 次点击
所在节点    Go 编程语言
70 条回复
KaynW
2021-02-08 15:53:11 +08:00
caiych
2021-02-08 16:05:58 +08:00
一个比较简单的实现
https://play.golang.org/p/MDU_x7C2npI

如果有一个 routine 有 error,ctx 会 Done
如果 ctx 已经 Done 了,semaphore 会 error
wpf375516041
2021-02-08 17:03:40 +08:00
package main

import (
"fmt"
"net/http"
)

/*有 N 个任务,每个任务都会返回结果或者 error,通过固定的并发数(M)去执行。
如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表*/
func main() {

n := 10
m := 5
result := make([]string, n)
limitCh := make(chan interface{}, m)
errCh := make(chan error)
doneCh := make(chan interface{},1)

defer func() {
close(limitCh)
close(errCh)
}()

for state, i := true, 0; i < n; i++ {
state = true
for state {
select {
case limitCh <- nil:
fmt.Printf("开始第%d 个任务\n", i)
go func(i int) {
var err error
defer func() {
if i == n-1 {
close(doneCh)
}
if err != nil {
errCh <- err
}
<-limitCh
}()
ret, err := doTask()
if err != nil {
return
}
result[i] = ret
}(i)
state = false
case <-errCh:
return
default:
}
}
}
<- doneCh
fmt.Println(result)

}

func doTask() (string, error) {
// 模拟执行任务
resp, err := http.Get("https://www.baidu.com")
if err != nil {
return "", err
}
defer resp.Body.Close()
return resp.Status, nil
}
monkeyWie
2021-02-08 18:17:31 +08:00
@caiych #22 这个做不到有一个 task 发生 error 立即结束,例如: https://play.golang.org/p/sqlMbgW7z9Z
monkeyWie
2021-02-08 18:18:55 +08:00
@caiych #22 比如第一个任务执行已经失败了,需要立即返回,而不是等到所有任务执行完
monkeyWie
2021-02-08 18:28:10 +08:00
@wpf375516041 #23 这个好像也有点问题哦,就是判断任务全部执行完成的地方

```
if i == n-1 {
close(doneCh)
}
```
这里判断最后一个任务执行完成就结束,但是可能会存在还有正在执行的任务并且比最后一个任务执行还慢,就不对了。
caiych
2021-02-08 18:54:42 +08:00
@monkeyWie 你仔细看一下打出来的内容,第一批开始执行的任务里没有 1 ( 1 没有抢到信号量)。当 1 抢到信号量开始执行后整个程序都结束了(其他的任务都取消了,不然如果 i 那个循环大的话整个程序会运行很久)
rrfeng
2021-02-08 19:36:10 +08:00
1. 声明一个 buffered chan 当做 goroutine 池子 。
2. 启动 goroutine 的时候传入 context 用做取消(前提是你的 goroutine 任务可以取消)。
ginjedoad
2021-02-08 20:29:53 +08:00
兄弟,errgroup 就是为了你这个场景设计的。不要重复造轮子了
ginjedoad
2021-02-08 20:30:42 +08:00
说只要一个错误不能马上中断返回的,自己好好审计一下 errgroup
monkeyWie
2021-02-08 22:29:21 +08:00
@caiych #27 错误发生时不会立刻结束,而是会等正在执行的任务全部完成才返回,你可以跑这个试试: https://play.golang.org/p/66Me2TYbVoK

错误发生了也要等 5 秒才结束。
monkeyWie
2021-02-08 22:32:04 +08:00
@ginjedoad #30 老哥贴个代码我跑一下看看,我自己测的 errgroup 是不能发生错误立即中断的
wpf375516041
2021-02-08 22:55:03 +08:00
@monkeyWie 是滴 用 waitgroup 大家说的都对 其实你把原来的封装下 搞个协程池 代码就清晰了
wpf375516041
2021-02-08 23:16:58 +08:00
我觉得这是个很好的面试题,既有实际意义也考验基本功,大家可以试试不用三方库实现一下~
talk is cheap, show me the code
一起娱乐娱乐,新年快乐~!
1. 控制并发
2. 等待所有任务返回
3. 一个任务错误,立刻结束

如果不解耦,并发控制和结果处理的逻辑混杂确实屎

go 实现的时候想当然了,只以最后提交的任务判断是否结束

kotlin 协程实现的时候发现 io,计算任务无法退出,必须要手动捕捉中止信号

java 须要手动捕捉中止信号 但是可以通过 thread.stop()强制停止,另外判断线程是否异常退出较难
caiych
2021-02-09 00:06:49 +08:00
@monkeyWie

最开始的版本里注释有写 DoWork 里的操作需要支持 context cancellation (比如如果操作是 http.Get 的话,可以使用 http.NewRequestWithContext)

这个里面实现了一个如果调用的操作不支持 context cancellation 的情况
https://play.golang.org/p/Cot1FYgIKLd
ooh
2021-02-09 00:11:53 +08:00
monkeyWie
2021-02-09 08:45:30 +08:00
@wpf375516041 #34 哈哈,搞不好会加入大厂面试题库
monkeyWie
2021-02-09 08:48:38 +08:00
@caiych #35 实际上很多 work 是不支持 cancel 的,而且也不一定要 cancel 掉,只要不阻塞主协程就行了,发送错误的时候主协程继续执行,其它正在执行的任务让它继续跑。
monkeyWie
2021-02-09 09:47:54 +08:00
@caiych #35 不好意思前面没看仔细,这个确实可以,赞一个!
guonaihong
2021-02-09 09:51:10 +08:00
难道 slice+context.Context+errgroup 的组合不行?

1.分配 M 容量的 slice 。放 M 个 go 程正常运行的结果。每个 go 程都有自己的 index,所以存结果这块都不要加锁,相当舒服的操作。

2.context 当作异常终端点。每个 go 程都持有这个 context 变量。任意一个 go 程错误,cancel 。任意 go 程检查 ctx.Done()。所谓 “如果其中有一个任务返回 error 时立即结束” 完美实现。

3.检查 errgroup 的返回,err != nil,就返回错误,else 部分没有错误,返回第 1 步声明的 slice

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/752304

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX