golang master-worker 模式,有不懂的地方,求解答

2019-08-09 14:13:10 +08:00
 Bruin
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

type Payload struct {
	name string
}

//任务
type Job struct {
	Payload Payload
}

//任务队列
var JobQueue chan Job

//  执行者 消费者 工人
type Worker struct {
	WorkerPool chan chan Job //对象池
	JobChannel chan Job      //通道里面拿
	quit       chan bool     //
	name       string        //工人的名字
}

// 调度器
type Dispatcher struct {
	name       string        //调度的名字
	maxWorkers int           //获取 调试的大小
	WorkerPool chan chan Job //注册和工人一样的通道
}

//打游戏
func (p *Payload) Play() {
	fmt.Printf("%s 打 LOL 游戏...当前任务完成\n", p.name)
}

// 新建一个工人
func NewWorker(workerPool chan chan Job, name string) Worker {
	fmt.Printf("创建了一个工人,它的名字是:%s \n", name)
	//workerPool 确定 woker 的容量
	return Worker{
		name:       name,          
		WorkerPool: workerPool,     
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// 工人开始工作

func (w *Worker) Start() {
	//开一个新的协程
	go func() {
		for {
			//注册到对象池中,
			w.WorkerPool <- w.JobChannel
			fmt.Printf("[%s]把自己注册到 对象池中 \n", w.name)
			select {
			//接收到了新的任务
			case job := <-w.JobChannel:
				fmt.Printf("[%s] 工人接收到了任务 当前任务的长度是[%d]\n", w.name, len(w.WorkerPool))
				job.Payload.Play()
				time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
			//接收到了任务
			case <-w.quit:
				fmt.Println("结束任务", w.name)
				return
			}
		}
	}()
}

func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	//容量为{maxWorkers}的 channel
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{
		WorkerPool: pool,       // 将工人放到一个池中,可以理解成一个部门中
		name:       "调度者",      //调度者的名字
		maxWorkers: maxWorkers, //这个调度者有好多个工人
	}
}

func (d *Dispatcher) Run() {
	// 开始运行
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool, fmt.Sprintf("work-%s", strconv.Itoa(i)))
		//开始工作
		worker.Start()
	}
	//监控
	go d.dispatch()

}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("调度者,接收到一个工作任务")
			time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
			// 调度者接收到一个工作任务
			go func(job Job) {
				//从现有的对象池中拿出一个
				jobChannel := <-d.WorkerPool
				fmt.Println(jobChannel)
				fmt.Println(job)
				jobChannel <- job
			}(job)
		default:

			//fmt.Println("ok!!")
		}

	}
}

func initialize() {
	maxWorkers := 1
	maxQueue := 20
	//初始化一个调试者,并指定它可以操作的 工人个数
	dispatch := NewDispatcher(maxWorkers)
	JobQueue = make(chan Job, maxQueue) //指定任务的队列长度
	//并让它一直接运行
	dispatch.Run()
}

func main() {
	//初始化对象池
	initialize()
	for i := 0; i < 1; i++ {
		p := Payload{
			fmt.Sprintf("玩家-[%s]", strconv.Itoa(i)),
		}
		JobQueue <- Job{
			Payload: p,
		}
		time.Sleep(time.Second)
	}
	close(JobQueue)
}

不懂的地方如下:

// w.WorkerPool <- w.JobChannel 这个流向不了解, 为什么下面的 select job := <-w.JobChannel: 还可以获得 channel 数据

func (w *Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel
			fmt.Printf("[%s]把自己注册到 对象池中 \n", w.name)
			select {
			//接收到了新的任务
			case job := <-w.JobChannel:
				fmt.Printf("[%s] 工人接收到了任务 当前任务的长度是[%d]\n", w.name, len(w.WorkerPool))
				job.Payload.Play()
				time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
			//接收到了任务
			case <-w.quit:
				fmt.Println("结束任务", w.name)
				return
			}
		}
	}()
}

// dispatch 
go func(job Job) {
    //从现有的对象池中拿出一个
    jobChannel := <-d.WorkerPool
    fmt.Println(jobChannel)
    fmt.Println(job)
    jobChannel <- job    //job 留向 jobChannel ??? 不懂
  }(job)

大佬们帮忙帮忙!

2598 次点击
所在节点    Go 编程语言
4 条回复
Tomotoes
2019-08-09 14:23:12 +08:00
你仔细看下 WorkerPool,JobChannel 的 类型 , 仔细看。
Tomotoes
2019-08-09 14:23:33 +08:00
Bruin
2019-08-09 14:27:53 +08:00
@Tomotoes WorkerPool 通道的通道,JobChannel 通道,
```
w.WorkerPool <- w.JobChannel 从 JobChannel 流向了 WorkerPool,

初始化的时候 JobChannel 是 nil 怎么流向 WorkerPool, 注册到对象池的呢?

```
Bruin
2019-08-09 14:55:09 +08:00
@Tomotoes 我好像明白了
jobChannel := <-d.WorkerPool
jobChannel <- job //其实是 w.JobChannel <- job

所以
select {
//会接受到新流入的 channel
case job := <-w.JobChannel:

感谢感谢!

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/590463

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX