一条面试题引发的思考 Go 版本

2019-04-07 09:42:15 +08:00
 bwangel

说明

前两天看到了这个帖子,https://www.v2ex.com/t/547045#reply53 感觉其中描述的问题很有意思,我就用 Go 把它提到的解决方案都实现了一遍,文章链接:一条面试题引发的关于 Go 语言中锁的思考,还请大家多多指教!

~~~正文分割线~~~

一条面试题引发的关于 Go 语言中锁的思考

前言

前两天在 V2EX 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。

问题描述及一个错误解法

问题如下:

编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

一个错误解法如下:

     1	package main
     2	
     3	import (
     4		"fmt"
     5		"sync"
     6	)
     7	
     8	var mu sync.Mutex
     9	var index int
    10	var endSignal = make(chan struct{})
    11	
    12	func echoRoutine(routineIndex int, routineName string) {
    13		for index < 30 {
    14			mu.Lock()
    15			if index%3 == routineIndex {
    16				fmt.Println(index, routineName)
    17				index++
    18			}
    19			mu.Unlock()
    20		}
    21	
    22		endSignal <- struct{}{}
    23	}
    24	
    25	func main() {
    26		routineNames := []string{"A", "B", "C"}
    27	
    28		for idx, name := range routineNames {
    29			go echoRoutine(idx, name)
    30		}
    31	
    32		for _ = range routineNames {
    33			<-endSignal
    34		}
    35		//Output:
    36		//0 A
    37		//1 B
    38		//2 C
    39		//3 A
    40		//4 B
    41		//5 C
    42		//6 A
    43		//7 B
    44		//8 C
    45		//9 A
    46		//10 B
    47		//11 C
    48		//12 A
    49		//13 B
    50		//14 C
    51		//15 A
    52		//16 B
    53		//17 C
    54		//18 A
    55		//19 B
    56		//20 C
    57		//21 A
    58		//22 B
    59		//23 C
    60		//24 A
    61		//25 B
    62		//26 C
    63		//27 A
    64		//28 B
    65		//29 C
    66		//30 A
    67		//31 B
    68	}

从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30 没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。

Go 语言的内存模型

首先说什么是 Go 的内存模型,在官方文档中是这样定义的:

The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.

Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。

Happens Before 原则

在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2,另一个 Goroutine 可能会在a被赋值之前先观察到b被赋值了。

为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2 同时发生。

在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v的读取事件r,可以观察到变量v的写入事件w

  1. r没有在w之前发生。
  2. w之后,r之前,没有另外一个变量v的写入事件w'发生。

总的来说,在单个 Goroutine 的环境下,读事件r会观察到最近的一个写事件w

在多个 Goroutine 的运行环境下,如果需要让v的读取事件r观察到其写入事件w。需要满足更严格的条件:

  1. w发生在r之前
  2. 任何针对共享变量v的写入事件都发生在w之前或者r之后。

因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w必须在r之前发生,两者不能同时发生,且它们之间没有其他的写事件w'。 因此在多个 Goroutine 访问一个共享变量v的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。

此外,还需要注意的是:

  1. 在变量v发生写事件之前,它被初始化成对应类型的零值。
  2. 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。

正确答案 V1

了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。

理解了这个错误之后,我们可以把代码稍微改一下,将index < 30这个比较操作也置于锁的保护中,就能够得到正确的结果了。

package main

import (
	"fmt"
	"sync"
)

var i int
var mu sync.Mutex
var endSignal = make(chan struct{})

func threadPrint(threadNum int, threadName string) {
	for {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			break
		}

		if i%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			i++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name)
	}

	for _ = range names {
		<-endSignal
	}
}

正确答案 V2 -- 公平锁

上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的, 但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum)。它就会将锁释放,然后让大家重新再来抢。

这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。

为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。

package main

import (
	"fmt"
	"log"
	"sync"
)

type FailLock struct {
	mu        *sync.Mutex
	cond      *sync.Cond
	holdCount int
}

func NewFailLock() sync.Locker {
	mu := new(sync.Mutex)
	cond := sync.NewCond(mu)

	return &FailLock{
		holdCount: 0,
		mu:        mu,
		cond:      cond,
	}
}

func (fl *FailLock) Lock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	fl.holdCount++
	if fl.holdCount == 1 {
		return
	}

	fl.cond.Wait()
}

func (fl *FailLock) Unlock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if fl.holdCount == 0 {
		log.Fatal("unlock of UnLocked mutex")
	}

	fl.holdCount--
	if fl.holdCount != 0 {
		fl.cond.Signal()
	}
}

var (
	end = make(chan struct{})
	i   int
)

func threadPrint(threadNum int, threadName string, mu sync.Locker) {
	for i < 30 {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			continue
		}
		if i < 3 && i%3 != threadNum {
			mu.Unlock()
			continue
		}

		fmt.Printf("%d: %s\n", i, threadName)
		i += 1
		mu.Unlock()
	}
	end <- struct{}{}
}

func main() {
	mu := NewFailLock()
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name, mu)
	}

	for _ = range names {
		<-end
	}
}

上述代码中需要注意两点:

  1. 由于 Goroutine 无法保证启动顺序,即我们无法保证最开始上锁的顺序是A,B,C这样的顺序,所以需要在64~67行加一个判断,程序刚开始执行时如果获得的锁不对,就不执行任何操作,重新获得锁。
  2. 由于可见性的原因,需要在60~63行上锁之后加一个判断,保证i的值是最新的值。

正确答案 V3 -- AtomicInt

除了自己手动加锁外,我们也可以使用 Go 的 atomic 包中提供的原子操作来完成上述功能。 每个 Goroutine 原子性地获得i的值,如果符合i % 3 == threadNum的条件,则执行操作,否则作自旋。代码如下:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	end = make(chan struct{})
	i   int32
)

func threadPrint(threadNum int32, threadName string) {

	for {
		v := atomic.LoadInt32((*int32)(&i))
		if v >= 30 {
			break
		}

		if v%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			atomic.AddInt32((*int32)(&i), int32(1))
		} else {
			continue
		}
	}
	end <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(int32(idx), name)
	}

	for _ = range names {
		<-end
	}
}

参考链接

9120 次点击
所在节点    Go 编程语言
83 条回复
whoisghost
2019-04-07 19:03:41 +08:00
@gamexg 嗯。至少是正确的。
tairan2006
2019-04-07 19:16:44 +08:00
这个就是并行输出串行化…用锁写很浪费 cpu 时间或者写出惊群代码呀。

这个扇出的代码也有问题,题目要求是在各自线程里面输出,你要是这么写,不如直接从主协程里面按顺序从管道里面拿数据,更省事。我也手痒写了个玩。

https://gist.github.com/YiuTerran/f39064e3aa2152c64503a616725a65d9
index90
2019-04-07 19:28:57 +08:00
```
package main

import "fmt"

type Message struct {
msg string
ch chan bool
}

func T(msg string) chan Message {
w := make(chan bool)
c := make(chan Message)
go func(){
defer close(c)
for i:=0;i<10;i++ {
c <- Message{msg:msg, ch:w}
<-w
}
}()
return c
}

func main() {
A := T("A")
B := T("B")
C := T("C")
for i:=0;i<10;i++ {
a := <-A;fmt.Println(a.msg)
b := <-B;fmt.Println(b.msg)
c := <-C;fmt.Println(c.msg)
a.ch <- true
b.ch <- true
c.ch <- true
}
}
```
出自油管[19:21] : <amp-youtube data-videoid="f6kdp27TYZs" layout="responsive" width="480" height="270"></amp-youtube>
index90
2019-04-07 19:31:01 +08:00
补充一下,在主线程打印有点作弊,如果在子线程中打印,应该才是题目本意?
Lpl
2019-04-07 20:25:47 +08:00
没认真审题,把代码改成了单向循环链表。每个协程判断上一层的信号量 signal,接收到就打印当前节点。

https://gist.github.com/penglongli/7f8eaee1fdec19e55097bb1d041dcac3
bwangel
2019-04-07 20:51:28 +08:00
@whoisghost #35

老哥抱歉,不知道啥时候把你 block 了。所以一直没看到你的评论。

你指出的这个问题非常对,我把数量改成 5/50 后。程序就会阻塞,请问这是为什么啊,有些没太理解。
reus
2019-04-07 20:58:11 +08:00
@hjc4869 你可以用“信号量”来理解,但别人未必需要。原帖 ( https://www.v2ex.com/t/547045 )几个 go 的实现,都没人提到“信号量”。
chan 还可以传递额外的信息,如果线程间不仅仅是同步而是需要传递一些数据,那单靠 semaphore 就无能为力了。go 运行时实现的 semaphore,是没有暴露到标准库的,所以没有”直接用 semaphore ”这个可选项。
realityone
2019-04-07 21:41:46 +08:00
@bwangel

$ GOMAXPROCS=10 go run main.go
bwangel
2019-04-07 21:47:55 +08:00
@realityone 嗯,找到了一篇相关的文章,还在研读

http://www.sarathlakshman.com/2016/06/15/pitfall-of-golang-scheduler
tonywangcn
2019-04-07 21:58:56 +08:00
@bwangel 在 go version go1.10.3 darwin/amd64 状态下,你的代码是没有问题的。https://stackoverflow.com/questions/13107958/what-exactly-does-runtime-gosched-do
index90
2019-04-07 22:04:57 +08:00
package main

import "fmt"

func T(msg string) chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
for i:=0;i<10;i++ {
<-ch
fmt.Println(msg)
ch<- struct{}{}
}
}()
return ch
}

func main() {
A := T("A")
B := T("B")
C := T("C")
t := struct{}{}
for ; ; {
A <- t
B <- <- A
C <- <- B
t = <- C
}
}
myself659
2019-04-07 22:24:25 +08:00
https://gist.github.com/myself659/8fd9af077add584ec7ba0a0f86bce3b5

一个 channel 搞定

更多 channel 特性 参考一下这篇文章

https://zhuanlan.zhihu.com/p/26393117
tairan2006
2019-04-07 22:33:30 +08:00
@bwangel @tonywangcn V3 答案协程是个无阻塞的死循环啊,CPU 根本不会让渡出去…所以只能手动让出了,这个实现本身是错误的…能跑几个取决于你的 CPU 核数。

如果说同步原语的话,这题用条件变量或者信号量都可以。不过对于 golang 而言,channel 就是它的同步原语,很多时候并不一定非要用更底层的东西(过早优化是万恶之源 2333
index90
2019-04-07 22:34:27 +08:00
@myself659
编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

关键点:
1. 3 个线程
2. 每个线程将自己的输出打印
3. 每个线程都打印 10 遍

所以我说,在 main 中 for 10 次,或者在 main 中打印,都应该算作弊
tairan2006
2019-04-07 22:38:24 +08:00
@myself659 大哥,人家只让创建 3 个线程…你创建 30 个协程不怕被面试官打死么
ethego
2019-04-07 22:45:40 +08:00
一个读写锁竟然能说这么啰嗦。。真理往往是简洁的
```go
package main

import (
"fmt"
"sync"
)

var mu sync.RWMutex
var index int
var endSignal = make(chan struct{})

func echoRoutine(routineIndex int, routineName string) {
for {
mu.RLock()
i := index
mu.RUnlock()

if i >= 30 {
break
}

if i % 3 == routineIndex {
fmt.Println(i, routineName)
mu.Lock()
index++
mu.Unlock()
}
}

endSignal <- struct{}{}
}

func main() {
routineNames := []string{"A", "B", "C"}

for idx, name := range routineNames {
go echoRoutine(idx, name)
}

for _ = range routineNames {
<-endSignal
}
}
```
tairan2006
2019-04-07 22:53:45 +08:00
@tonywangcn 哦,我刚看了一下,go 后面改了调度方式,现在实现了非协作式抢占,所以新版本应该是饿不死了。

不过,感觉还是会被面试官毙掉(
myself659
2019-04-07 23:05:28 +08:00
georgetso
2019-04-07 23:10:35 +08:00
@ethego 太暴力了
myself659
2019-04-07 23:35:14 +08:00

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/552620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX