golang 分享: 60 行代码巧妙实现一个高性能无 channel 任务队列

2023-03-03 20:12:39 +08:00
 Nazz

话不多说, 先上测试数据, 在各种负载下均有良好表现:

// small task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 1000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	    3302	    357841 ns/op	   55977 B/op	    2053 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	    4426	    319383 ns/op	   20000 B/op	    1173 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	    3026	    399899 ns/op	   16047 B/op	    1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	    4314	    259668 ns/op	   48028 B/op	    3000 allocs/op
PASS
// medium task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 10000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	    1491	    808853 ns/op	   57635 B/op	    2008 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	    1377	    870051 ns/op	   17266 B/op	    1029 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     886	   1324236 ns/op	   16054 B/op	    1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	    1324	    836092 ns/op	   48000 B/op	    3000 allocs/op
PASS
// large task
const (
	PoolSize   = 16
	BenchTimes = 1000
	N          = 100000
)

goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	     193	   6026196 ns/op	   58162 B/op	    2004 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	     178	   6942255 ns/op	   17108 B/op	    1019 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     174	   6300705 ns/op	   16157 B/op	    1002 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	     176	   7084957 ns/op	   48071 B/op	    2995 allocs/op
PASS

测试代码 Benchmark

代码实现

package bench

import (
	"sync"
)

type (
	WorkerQueue struct {
		mu             *sync.Mutex // 锁
		q              []Job       // 任务队列
		maxConcurrency int32       // 最大并发
		curConcurrency int32       // 当前并发
	}

	Job func()
)

// NewWorkerQueue 创建一个任务队列
func NewWorkerQueue(maxConcurrency int32) *WorkerQueue {
	return &WorkerQueue{
		mu:             &sync.Mutex{},
		maxConcurrency: maxConcurrency,
		curConcurrency: 0,
	}
}

// 获取一个任务
func (c *WorkerQueue) getJob(delta int32) Job {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.curConcurrency += delta
	if c.curConcurrency >= c.maxConcurrency {
		return nil
	}
	if n := len(c.q); n == 0 {
		return nil
	}
	var result = c.q[0]
	c.q = c.q[1:]
	c.curConcurrency++
	return result
}

// 递归地执行任务
func (c *WorkerQueue) do(job Job) {
	job()
	if nextJob := c.getJob(-1); nextJob != nil {
		go c.do(nextJob)
	}
}

// Push 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) Push(job Job) {
	c.mu.Lock()
	c.q = append(c.q, job)
	c.mu.Unlock()
	if item := c.getJob(0); item != nil {
		go c.do(item)
	}
}

如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)

5079 次点击
所在节点    分享创造
29 条回复
MindMindMax
2023-03-05 04:16:09 +08:00
对于常规项目价值在哪? channel 的价值又在哪?
Nazz
2023-03-05 07:43:55 +08:00
@MindMindMax 尽量使用 mutex 替代 chan. 很多时候保证线程安全就行了,不需要多线程通信. channel 我用得最多的地方是线程同步和超时控制.
Nazz
2023-03-05 08:18:48 +08:00
@chuanqirenwu 确实有 EventLoop. 最开始我是模仿的 JS, 因为我认为 JS WebSocket API 比 gorilla/nhooyr 这些提供的都要清晰得多. 初版只有 Sync IO, Read=>Event Handler=>Write 循环往复. 后面在此基础上加了 Async IO, AIO 模式在每个连接上有读写两个任务队列(并发度分别是 N 和 1), 就是我分享的这个实现, 它需要足够的轻量. 两种模式压测表现都比 gorilla 好得多, 原因大概是 Parser 本身的简单高效和无额外常驻协程吧, 如果有, 协程数量会增加一倍.
chuanqirenwu
2023-03-05 12:45:03 +08:00
@Nazz 👍,虽然不怎么搞 go ,但感觉这个思路挺不错的。我看 README 的简介写的是 go websocket server ,是只支持 server 端吗? client 端没有实现?
Nazz
2023-03-05 13:16:43 +08:00
@chuanqirenwu 刚实现的 client ,还在测试
rockuw
2023-03-13 10:21:57 +08:00
mutex 是比 channel 轻量,但是每个 job 新建一个 goroutine 也是有代价的。一个简单的固定 goroutine 数量的实现,测试结果还稍微好一些,分配次数则明显更低:

```
N=10000
goos: linux
goarch: amd64
pkg: muwu.com/example/workerqueue
cpu: Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz
BenchmarkGwsWorkerQueue-8 903 1310335 ns/op 55471 B/op 2010 allocs/op
BenchmarkGopool-8 897 1394589 ns/op 17926 B/op 1059 allocs/op
BenchmarkAnts-8 1203 1020211 ns/op 16046 B/op 1001 allocs/op
BenchmarkNbio-8 956 1278696 ns/op 48017 B/op 2999 allocs/op
BenchmarkChan-8 1004 1181569 ns/op 16016 B/op 1001 allocs/op
```

```go
type workerQueueV1 struct {
maxConn int
queue chan Job
}

func newWorkerQueueV1(n int) *workerQueueV1 {
wq := &workerQueueV1{
maxConn: n,
queue: make(chan Job, 1024),
}
for i := 0; i < n; i++ {
go func() {
for job := range wq.queue {
job()
}
}()
}
return wq
}

func (wq *workerQueueV1) Push(job Job) {
wq.queue <- job
}
```
Nazz
2023-03-13 11:19:56 +08:00
@rockuw 从你的 Benchmark 结果来看, 差距不大. GwsWorkqueue 是专门为 IO 任务设计的, 每个 WebSocket 连接上有读写两个任务队列, 它们非常轻量, 而且并行读写不会新增常驻协程. 量变产生质变, 每个连接上都增加常驻协程会使 CPU 使用率提高不少. 实际业务中并发不会很高, 可以用优先队列替代普通队列减少 allocs, 收益不高我懒得去优化了, 复用 goroutine 对于 IO 任务收益也不大.
ClarkAbe
2023-03-23 14:49:55 +08:00
感觉每次都起一个协程有点浪费...
Nazz
2023-03-23 16:32:22 +08:00
@ClarkAbe 为了防止栈溢出. 如果做了容量限制, 可以不开新协程.

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/920932

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX