一条面试题引发的思考 Go 版本

2019-04-07 09:42:15 +08:00
 bwangel

说明

前两天看到了这个帖子,https://www.v2ex.com/t/547045#reply53 感觉其中描述的问题很有意思,我就用 Go 把它提到的解决方案都实现了一遍,文章链接:一条面试题引发的关于 Go 语言中锁的思考,还请大家多多指教!

~~~正文分割线~~~

一条面试题引发的关于 Go 语言中锁的思考

前言

前两天在 V2EX 上看到了一篇博文,《一条经典面试题引发的思考》,感觉很有意思,我就用 Go 把它的答案实现了一遍。

问题描述及一个错误解法

问题如下:

编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

一个错误解法如下:

     1	package main
     2	
     3	import (
     4		"fmt"
     5		"sync"
     6	)
     7	
     8	var mu sync.Mutex
     9	var index int
    10	var endSignal = make(chan struct{})
    11	
    12	func echoRoutine(routineIndex int, routineName string) {
    13		for index < 30 {
    14			mu.Lock()
    15			if index%3 == routineIndex {
    16				fmt.Println(index, routineName)
    17				index++
    18			}
    19			mu.Unlock()
    20		}
    21	
    22		endSignal <- struct{}{}
    23	}
    24	
    25	func main() {
    26		routineNames := []string{"A", "B", "C"}
    27	
    28		for idx, name := range routineNames {
    29			go echoRoutine(idx, name)
    30		}
    31	
    32		for _ = range routineNames {
    33			<-endSignal
    34		}
    35		//Output:
    36		//0 A
    37		//1 B
    38		//2 C
    39		//3 A
    40		//4 B
    41		//5 C
    42		//6 A
    43		//7 B
    44		//8 C
    45		//9 A
    46		//10 B
    47		//11 C
    48		//12 A
    49		//13 B
    50		//14 C
    51		//15 A
    52		//16 B
    53		//17 C
    54		//18 A
    55		//19 B
    56		//20 C
    57		//21 A
    58		//22 B
    59		//23 C
    60		//24 A
    61		//25 B
    62		//26 C
    63		//27 A
    64		//28 B
    65		//29 C
    66		//30 A
    67		//31 B
    68	}

从上面的输出可以看到,程序还额外输出了两次 A 和 B。首先说结论,这是因为检查条件index < 30 没有加锁。为了理解这个错误,首先我们需要了解一下 Go 的内存模型。

Go 语言的内存模型

首先说什么是 Go 的内存模型,在官方文档中是这样定义的:

The Go memory model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine.

Go 语言的内存模型规定了一个 Goroutine 可以看见另外一个 Goroutine 修改同一个变量的值的条件,这类似于 Java 内存模型中 内存可见性 问题。

Happens Before 原则

在 Go 语言中,编译器和处理器会对执行的指令进行重排序。Go 语言会保证的是,在单个 Goroutine 内,重排序后的执行效果和未进行重排序(即在代码中定义的执行顺序)的执行效果是一样。但是在多个 Goroutine 的运行环境下,由于重排序的原因,一个 Goroutine 观察到的执行顺序可能和另外一个 Goroutine 观察到的不一样。例如一个 Goroutine 执行a = 1; b = 2,另一个 Goroutine 可能会在a被赋值之前先观察到b被赋值了。

为了规范读和写的操作,Go 定义了 happens before 原则。对于变量读写操作,我们可以将其称作是事件。事件之间的顺序可以定义为三种,E1 发生在 E2 之前,E1 发生在 E2 之后,E1 和 E2 同时发生。

在单个 Goroutine 内,happens before 顺序就是程序执行的顺序,如果下面两个条件都被满足的话,变量v的读取事件r,可以观察到变量v的写入事件w

  1. r没有在w之前发生。
  2. w之后,r之前,没有另外一个变量v的写入事件w'发生。

总的来说,在单个 Goroutine 的环境下,读事件r会观察到最近的一个写事件w

在多个 Goroutine 的运行环境下,如果需要让v的读取事件r观察到其写入事件w。需要满足更严格的条件:

  1. w发生在r之前
  2. 任何针对共享变量v的写入事件都发生在w之前或者r之后。

因为在多个 Goroutine 的运行环境下,读写事件可能并行地发生,所以第一点更严格了一些,要求w必须在r之前发生,两者不能同时发生,且它们之间没有其他的写事件w'。 因此在多个 Goroutine 访问一个共享变量v的时候,我们必须使用同步事件去建立一个 happens before 条件来让读事件观察到目标写事件。

此外,还需要注意的是:

  1. 在变量v发生写事件之前,它被初始化成对应类型的零值。
  2. 大于一个机器字的变量的读写事件,会表现成 未指定顺序 的多次机器字大小的读写操作。

正确答案 V1

了解了 Go 内存模型中的 Happens Before 原则之后,我们接着再来分析上述的错误代码。

理解了这个错误之后,我们可以把代码稍微改一下,将index < 30这个比较操作也置于锁的保护中,就能够得到正确的结果了。

package main

import (
	"fmt"
	"sync"
)

var i int
var mu sync.Mutex
var endSignal = make(chan struct{})

func threadPrint(threadNum int, threadName string) {
	for {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			break
		}

		if i%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			i++
		}
		mu.Unlock()
	}

	endSignal <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name)
	}

	for _ = range names {
		<-endSignal
	}
}

正确答案 V2 -- 公平锁

上述代码是得到的结果是正确的,但是还存在一些问题。我们想要的执行顺讯是有序的, 但每次解锁的时候,都是 A, B, C 三个 Goroutine 上去抢锁资源。有时候某个 Goroutine 抢到了锁资源,但是其并不符合执行要求(i%3 != threadNum)。它就会将锁释放,然后让大家重新再来抢。

这样的争抢索锁资源造成了时间上的浪费,执行了一些不必要的操作。

为了避免这些浪费,我们可以参考 Java 中的公平锁,让解锁的顺序和上锁的顺序匹配,每次只有一个 Goroutine 获得锁资源,大家不必每次都去争抢锁资源。

package main

import (
	"fmt"
	"log"
	"sync"
)

type FailLock struct {
	mu        *sync.Mutex
	cond      *sync.Cond
	holdCount int
}

func NewFailLock() sync.Locker {
	mu := new(sync.Mutex)
	cond := sync.NewCond(mu)

	return &FailLock{
		holdCount: 0,
		mu:        mu,
		cond:      cond,
	}
}

func (fl *FailLock) Lock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	fl.holdCount++
	if fl.holdCount == 1 {
		return
	}

	fl.cond.Wait()
}

func (fl *FailLock) Unlock() {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if fl.holdCount == 0 {
		log.Fatal("unlock of UnLocked mutex")
	}

	fl.holdCount--
	if fl.holdCount != 0 {
		fl.cond.Signal()
	}
}

var (
	end = make(chan struct{})
	i   int
)

func threadPrint(threadNum int, threadName string, mu sync.Locker) {
	for i < 30 {
		mu.Lock()
		if i >= 30 {
			mu.Unlock()
			continue
		}
		if i < 3 && i%3 != threadNum {
			mu.Unlock()
			continue
		}

		fmt.Printf("%d: %s\n", i, threadName)
		i += 1
		mu.Unlock()
	}
	end <- struct{}{}
}

func main() {
	mu := NewFailLock()
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(idx, name, mu)
	}

	for _ = range names {
		<-end
	}
}

上述代码中需要注意两点:

  1. 由于 Goroutine 无法保证启动顺序,即我们无法保证最开始上锁的顺序是A,B,C这样的顺序,所以需要在64~67行加一个判断,程序刚开始执行时如果获得的锁不对,就不执行任何操作,重新获得锁。
  2. 由于可见性的原因,需要在60~63行上锁之后加一个判断,保证i的值是最新的值。

正确答案 V3 -- AtomicInt

除了自己手动加锁外,我们也可以使用 Go 的 atomic 包中提供的原子操作来完成上述功能。 每个 Goroutine 原子性地获得i的值,如果符合i % 3 == threadNum的条件,则执行操作,否则作自旋。代码如下:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	end = make(chan struct{})
	i   int32
)

func threadPrint(threadNum int32, threadName string) {

	for {
		v := atomic.LoadInt32((*int32)(&i))
		if v >= 30 {
			break
		}

		if v%3 == threadNum {
			fmt.Printf("%d: %s\n", i, threadName)
			atomic.AddInt32((*int32)(&i), int32(1))
		} else {
			continue
		}
	}
	end <- struct{}{}
}

func main() {
	names := []string{"A", "B", "C"}

	for idx, name := range names {
		go threadPrint(int32(idx), name)
	}

	for _ = range names {
		<-end
	}
}

参考链接

9080 次点击
所在节点    Go 编程语言
83 条回复
richzhu
2019-04-07 10:15:21 +08:00
请教一下,为什么通道要用 struct{} 类型
bwangel
2019-04-07 10:27:55 +08:00
@richzhu

字面量 struct{}代表了空的结构体类型。这样的类型既不包含任何字段也没有任何方法。该类型的值所需的存储空间几乎可以忽略不计。

因此,我们可以把这样的值作为占位值来使用。比如:在同一个应用场景下,map[int] [int]bool 类型的值占用更少的存储空间。
JaguarJack
2019-04-07 10:39:09 +08:00
学习了 正好看到锁这里
exiaoxing
2019-04-07 10:44:39 +08:00
好文,赞
xylophone21
2019-04-07 10:51:02 +08:00
这道题,感觉怪怪的。开 3 个线程,然后居然没有一点并行的需求
Mark3K
2019-04-07 10:54:50 +08:00
可以多开个 channel,避免加锁
func fanIn() <-chan string{
inCh := make(chan string)
go func() {
defer close(inCh)
for i := 0; i < 10; i++{
inCh <- aCh
inCh <- bCh
inCh <- cCh
}
}()
return inCh
}
deepdark
2019-04-07 10:55:46 +08:00
学习了,赞一个
mengzhuo
2019-04-07 11:02:59 +08:00
还自旋锁哈哈哈哈
是个正常的 Go 程序员都会选择用 fan in channel 模式的
bwangel
2019-04-07 11:11:55 +08:00
@Mark3K 可以贴一下完整代码吗?有些没太理解?

@mengzhuo 你这样说话的态度让人很不爽,我决定把你拉黑了。
Mark3K
2019-04-07 11:13:11 +08:00
@bwangel
```go
package main

import "fmt"

func gen(v string, times int) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < times; i++ {
ch <- v
}
}()
return ch
}

func fanIn(times int, inputs []<-chan string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < times; i++ {
for _, input := range inputs {
v := <-input
ch <- v
}
}
}()
return ch
}

func main() {
times := 10
inputs := make([]<-chan string, 0, 3)
for _, K := range []string{"A", "B", "C"} {
inputs = append(inputs, gen(K, times))
}
for i := range fanIn(times, inputs) {
fmt.Println(i)
}
}

```
whoisghost
2019-04-07 11:20:14 +08:00
V3 -- AtomicInt 版本,把 names 再追加 “ D ”, "E", 把 3 => 5, 30 => 50, 还能正常运行吗?
bwangel
2019-04-07 11:24:10 +08:00
@Mark3K 理解了,谢谢。我补充一下。
hjc4869
2019-04-07 11:46:51 +08:00
简单问题复杂化。整层楼没人提到 Semaphore 也是惊了。
jadeity
2019-04-07 12:06:28 +08:00
并行无序,阻塞有序。
需要逻辑顺序的阻塞放在主线程保证顺序,其他不需要的阻塞放在其他线程提高性能。
不管是 channel,锁,信号还是其他什么名字,在合适的地方阻塞,把不合适的阻塞并行。
yianing
2019-04-07 12:10:19 +08:00
@hjc4869 哈哈,我看到这个问题的第一想法就是操作系统中学的信号量,read copy write
bwangel
2019-04-07 12:35:40 +08:00
@hjc4869 我刚刚上维基百科简单看了一下 Semaphore 的定义: https://zh.wikipedia.org/wiki/%E4%BF%A1%E5%8F%B7%E9%87%8F

感觉和我在文中写的 [正确答案 V2 – 公平锁] 的实现方式很像,可以详述一下 Semaphore 的解决方案吗?最好可以贴一些代码。
bwangel
2019-04-07 12:37:35 +08:00
@jadeity @jadeity 嗯,感觉这才是正确使用线程 /Goroutine 的姿势。这个题现在看来感觉有些奇怪,使用线程这种并行处理方式来做一些同步的事情。可能是面试者想要考察线程同步的方式吧。
reus
2019-04-07 12:39:53 +08:00
@Mark3K chan 读写一样有锁,而且开销更大……
hjc4869
2019-04-07 12:58:39 +08:00
@bwangel

static void Worker(char c, int o, SemaphoreSlim wait, SemaphoreSlim signal, SemaphoreSlim finish)
{
for (int i = 0; i < 10; i++)
{
wait.Wait();
Console.WriteLine($"{i*3+o}: {c}");
signal.Release();
}

if (finish != null) finish.Release();
}

static void Main(string[] args)
{
SemaphoreSlim s1 = new SemaphoreSlim(0), s2 = new SemaphoreSlim(0), s3 = new SemaphoreSlim(0);
SemaphoreSlim sTerm = new SemaphoreSlim(0);
Task.Run(() => Worker('A', 0, s1, s2, null));
Task.Run(() => Worker('B', 1, s2, s3, null));
Task.Run(() => Worker('C', 2, s3, s1, sTerm));
s1.Release();
sTerm.Wait();
}
ngn999
2019-04-07 13:02:05 +08:00
我们部门面试也问这个问题,非 go, 是考信号量

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/552620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX