Go 协程池解析~通俗易懂

2020-11-19 19:16:13 +08:00
 sunshinev

协程池主要是为了减少 go 协程频繁创建、销毁带来的性能损耗,虽然可以忽略不计,但是网上说特殊情况还是有用的。

那这个协程池通俗易懂来讲,比如老板给员工分配任务:

老板领了一堆任务,得找工人干活呀, 那领导就拿出一个任务,给一个空闲的员工 A,再把下一个任务,给另外一个空闲的员工 B 。

这时候 A 或者 B,指不定谁先忙完了

如果有人忙完了,领导就把下一个任务,给先忙完的人。A/B 就是协程池里面的两个协程

下面这段代码,完成了如下功能

  1. 协程池数量上限控制
  2. 协程空闲清理,释放内存
  3. 协程复用
package gopool

import (
	"context"
	"log"
	"sync"
	"time"
)

type Task func()

// boss 老板
type GoPool struct {
	MaxWorkerIdleTime time.Duration // worker 最大空闲时间
	MaxWorkerNum      int32         // 协程最大数量
	TaskEntryChan     chan *Task    // 任务入列
	Workers           []*worker     // 已创建 worker
	FreeWorkerChan    chan *worker  // 空闲 worker
	Lock              sync.Mutex
}

const (
	WorkerStatusStop = 1
	WorkerStatusLive = 0
)

// 干活的人
type worker struct {
	Pool         *GoPool
	StartTime    time.Time  // 开始时间
	TaskChan     chan *Task // 执行队列
	LastWorkTime time.Time  // 最后执行时间
	Ctx          context.Context
	Cancel       context.CancelFunc
	Status       int32 // 被过期删掉的标记
}

var defaultPool = func() *GoPool {
	return NewPool()
}()

// 初始化
func NewPool() *GoPool {
	g := &GoPool{
		MaxWorkerIdleTime: 10 * time.Second,
		MaxWorkerNum:      20,
		TaskEntryChan:     make(chan *Task, 2000),
		FreeWorkerChan:    make(chan *worker, 2000),
	}

	// 分发任务
	go g.dispatchTask()

	//清理空闲 worker
	go g.fireWorker()

	return g
}

// 定期清理空闲 worker
func (g *GoPool) fireWorker() {
	for {
		select {
		// 10 秒执行一次
		case <-time.After(10 * time.Second):
			for k, w := range g.Workers {
				if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
					log.Printf("overtime %v %p", k, w)
					// 终止协程
					w.Cancel()
					// 清理 Free
					w.Status = WorkerStatusStop
				}
			}

			g.Lock.Lock()
			g.Workers = g.cleanWorker(g.Workers)
			g.Lock.Unlock()
		}
	}
}

// 递归清理无用 worker
func (g *GoPool) cleanWorker(workers []*worker) []*worker {
	for k, w := range workers {
		if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
			workers = append(workers[:k], workers[k+1:]...) // 删除中间 1 个元素
			return g.cleanWorker(workers)
		}
	}

	return workers
}

// 分发任务
func (g *GoPool) dispatchTask() {

	for {
		select {
		case t := <-g.TaskEntryChan:
			log.Printf("dispatch task %p", t)
			// 获取 worker
			w := g.fetchWorker()
			// 将任务扔给 worker
			w.accept(t)
		}
	}
}

// 获取可用 worker
func (g *GoPool) fetchWorker() *worker {
	for {
		select {
		// 获取空闲 worker
		case w := <-g.FreeWorkerChan:
			if w.Status == WorkerStatusLive {
				return w
			}
		default:
			// 创建新的 worker
			if int32(len(g.Workers)) < g.MaxWorkerNum {
				w := &worker{
					Pool:         g,
					StartTime:    time.Now(),
					LastWorkTime: time.Now(),
					TaskChan:     make(chan *Task, 1),
					Ctx:          context.Background(),
					Status:       WorkerStatusLive,
				}
				ctx, cancel := context.WithCancel(w.Ctx)

				w.Cancel = cancel
				// 接到任务自己去执行吧
				go w.execute(ctx)

				g.Lock.Lock()
				g.Workers = append(g.Workers, w)
				g.Lock.Unlock()

				g.FreeWorkerChan <- w

				log.Printf("worker create %p", w)
			}
		}
	}
}

// 添加任务
func (g *GoPool) addTask(t Task) {
	// 将任务放到入口任务队列
	g.TaskEntryChan <- &t
}

// 接受任务
func (w *worker) accept(t *Task) {
	// 每个 worker 自己的工作队列
	w.TaskChan <- t
}

// 执行任务
func (w *worker) execute(ctx context.Context) {
	for {
		select {
		case t := <-w.TaskChan:
			// 执行
			(*t)()
			// 记录工作状态
			w.LastWorkTime = time.Now()
			w.Pool.FreeWorkerChan <- w
		case <-ctx.Done():
			log.Printf("worker done %p", w)
			return
		}
	}
}

// 执行
func SafeGo(t Task) {
	defaultPool.addTask(t)
}

1687 次点击
所在节点    程序员
4 条回复
pabupa
2020-11-19 19:53:01 +08:00
越俎代庖
fuis
2020-11-19 21:33:32 +08:00
脱裤子放屁
Weixiao0725
2020-11-19 21:39:17 +08:00
可能协程比线程轻量,不需要池子缓存?
koujyungenn
2020-11-20 10:01:57 +08:00
不懂就问,协程池有什么应用场景?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/727257

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX