首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
宝塔
V2EX  ›  程序员

如何优雅的关闭 Go Channel「译」

  •  
  •   codehole · 2018-04-08 17:57:55 +08:00 · 1699 次点击
    这是一个创建于 591 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Channel 关闭原则

    不要在消费端关闭 channel,不要在有多个并行的生产者时对 channel 执行关闭操作。

    也就是说应该只在[唯一的或者最后唯一剩下]的生产者协程中关闭 channel,来通知消费者已经没有值可以继续读了。只要坚持这个原则,就可以确保向一个已经关闭的 channel 发送数据的情况不可能发生。

    暴力关闭 channel 的正确方法

    如果想要在消费端关闭 channel,或者在多个生产者端关闭 channel,可以使用 recover 机制来上个保险,避免程序因为 panic 而崩溃。

    func SafeClose(ch chan T) (justClosed bool) {
    	defer func() {
    		if recover() != nil {
    			justClosed = false
    		}
    	}()
    	
    	// assume ch != nil here.
    	close(ch) // panic if ch is closed
    	return true // <=> justClosed = true; return
    }
    

    使用这种方法明显违背了上面的 channel 关闭原则,然后性能还可以,毕竟在每个协程只会调用一次 SafeClose,性能损失很小。

    同样也可以在生产消息的时候使用 recover 方法。

    func SafeSend(ch chan T, value T) (closed bool) {
    	defer func() {
    		if recover() != nil {
    			// The return result can be altered 
    			// in a defer function call.
    			closed = true
    		}
    	}()
    	
    	ch <- value // panic if ch is closed
    	return false // <=> closed = false; return
    }
    

    礼貌地关闭 channel 方法

    还有不少人经常使用用 sync.Once 来关闭 channel,这样可以确保只会关闭一次

    type MyChannel struct {
    	C    chan T
    	once sync.Once
    }
    
    func NewMyChannel() *MyChannel {
    	return &MyChannel{C: make(chan T)}
    }
    
    func (mc *MyChannel) SafeClose() {
    	mc.once.Do(func() {
    		close(mc.C)
    	})
    }
    

    同样我们也可以使用 sync.Mutex 达到同样的目的。

    type MyChannel struct {
    	C      chan T
    	closed bool
    	mutex  sync.Mutex
    }
    
    func NewMyChannel() *MyChannel {
    	return &MyChannel{C: make(chan T)}
    }
    
    func (mc *MyChannel) SafeClose() {
    	mc.mutex.Lock()
    	if !mc.closed {
    		close(mc.C)
    		mc.closed = true
    	}
    	mc.mutex.Unlock()
    }
    
    func (mc *MyChannel) IsClosed() bool {
    	mc.mutex.Lock()
    	defer mc.mutex.Unlock()
    	return mc.closed
    }
    

    要知道 golang 的设计者不提供 SafeClose 或者 SafeSend 方法是有原因的,他们本来就不推荐在消费端或者在并发的多个生产端关闭 channel,比如关闭只读 channel 在语法上就彻底被禁止使用了。

    优雅地关闭 channel 的方法

    上文的 SafeSend 方法一个很大的劣势在于它不能用在 select 块的 case 语句中。而另一个很重要的劣势在于像我这样对代码有洁癖的人来说,使用 panic/recover 和 sync/mutex 来搞定不是那么的优雅。下面我们引入在不同的场景下可以使用的纯粹的优雅的解决方法。

    多个消费者,单个生产者。这种情况最简单,直接让生产者关闭 channel 好了。

    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    )
    
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)
    	
    	// ...
    	const MaxRandomNumber = 100000
    	const NumReceivers = 100
    	
    	wgReceivers := sync.WaitGroup{}
    	wgReceivers.Add(NumReceivers)
    	
    	// ...
    	dataCh := make(chan int, 100)
    	
    	// the sender
    	go func() {
    		for {
    			if value := rand.Intn(MaxRandomNumber); value == 0 {
    				// The only sender can close the channel safely.
    				close(dataCh)
    				return
    			} else {			
    				dataCh <- value
    			}
    		}
    	}()
    	
    	// receivers
    	for i := 0; i < NumReceivers; i++ {
    		go func() {
    			defer wgReceivers.Done()
    			
    			// Receive values until dataCh is closed and
    			// the value buffer queue of dataCh is empty.
    			for value := range dataCh {
    				log.Println(value)
    			}
    		}()
    	}
    	
    	wgReceivers.Wait()
    }
    

    多个生产者,单个消费者。这种情况要比上面的复杂一点。我们不能在消费端关闭 channel,因为这违背了 channel 关闭原则。但是我们可以让消费端关闭一个附加的信号来通知发送端停止生产数据。

    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    )
    
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)
    	
    	// ...
    	const MaxRandomNumber = 100000
    	const NumSenders = 1000
    	
    	wgReceivers := sync.WaitGroup{}
    	wgReceivers.Add(1)
    	
    	// ...
    	dataCh := make(chan int, 100)
    	stopCh := make(chan struct{})
    	// stopCh is an additional signal channel.
    	// Its sender is the receiver of channel dataCh.
    	// Its reveivers are the senders of channel dataCh.
    	
    	// senders
    	for i := 0; i < NumSenders; i++ {
    		go func() {
    			for {
    				// The first select here is to try to exit the goroutine
    				// as early as possible. In fact, it is not essential
    				// for this example, so it can be omitted.
    				select {
    				case <- stopCh:
    					return
    				default:
    				}
    				
    				// Even if stopCh is closed, the first branch in the
    				// second select may be still not selected for some
    				// loops if the send to dataCh is also unblocked.
    				// But this is acceptable, so the first select
    				// can be omitted.
    				select {
    				case <- stopCh:
    					return
    				case dataCh <- rand.Intn(MaxRandomNumber):
    				}
    			}
    		}()
    	}
    	
    	// the receiver
    	go func() {
    		defer wgReceivers.Done()
    		
    		for value := range dataCh {
    			if value == MaxRandomNumber-1 {
    				// The receiver of the dataCh channel is
    				// also the sender of the stopCh cahnnel.
    				// It is safe to close the stop channel here.
    				close(stopCh)
    				return
    			}
    			
    			log.Println(value)
    		}
    	}()
    	
    	// ...
    	wgReceivers.Wait()
    }
    

    就上面这个例子,生产者同时也是退出信号 channel 的接受者,退出信号 channel 仍然是由它的生产端关闭的,所以这仍然没有违背 channel 关闭原则。值得注意的是,这个例子中生产端和接受端都没有关闭消息数据的 channel,channel 在没有任何 goroutine 引用的时候会自行关闭,而不需要显示进行关闭。

    多个生产者,多个消费者

    这是最复杂的一种情况,我们既不能让接受端也不能让发送端关闭 channel。我们甚至都不能让接受者关闭一个退出信号来通知生产者停止生产。因为我们不能违反 channel 关闭原则。但是我们可以引入一个额外的协调者来关闭附加的退出信号 channel。

    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    	"strconv"
    )
    
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)
    	
    	// ...
    	const MaxRandomNumber = 100000
    	const NumReceivers = 10
    	const NumSenders = 1000
    	
    	wgReceivers := sync.WaitGroup{}
    	wgReceivers.Add(NumReceivers)
    	
    	// ...
    	dataCh := make(chan int, 100)
    	stopCh := make(chan struct{})
    		// stopCh is an additional signal channel.
    		// Its sender is the moderator goroutine shown below.
    		// Its reveivers are all senders and receivers of dataCh.
    	toStop := make(chan string, 1)
    		// The channel toStop is used to notify the moderator
    		// to close the additional signal channel (stopCh).
    		// Its senders are any senders and receivers of dataCh.
    		// Its reveiver is the moderator goroutine shown below.
    	
    	var stoppedBy string
    	
    	// moderator
    	go func() {
    		stoppedBy = <- toStop
    		close(stopCh)
    	}()
    	
    	// senders
    	for i := 0; i < NumSenders; i++ {
    		go func(id string) {
    			for {
    				value := rand.Intn(MaxRandomNumber)
    				if value == 0 {
    					// Here, a trick is used to notify the moderator
    					// to close the additional signal channel.
    					select {
    					case toStop <- "sender#" + id:
    					default:
    					}
    					return
    				}
    				
    				// The first select here is to try to exit the goroutine
    				// as early as possible. This select blocks with one
    				// receive operation case and one default branches will
    				// be optimized as a try-receive operation by the
    				// official Go compiler.
    				select {
    				case <- stopCh:
    					return
    				default:
    				}
    				
    				// Even if stopCh is closed, the first branch in the
    				// second select may be still not selected for some
    				// loops (and for ever in theory) if the send to
    				// dataCh is also unblocked.
    				// This is why the first select block is needed.
    				select {
    				case <- stopCh:
    					return
    				case dataCh <- value:
    				}
    			}
    		}(strconv.Itoa(i))
    	}
    	
    	// receivers
    	for i := 0; i < NumReceivers; i++ {
    		go func(id string) {
    			defer wgReceivers.Done()
    			
    			for {
    				// Same as the sender goroutine, the first select here
    				// is to try to exit the goroutine as early as possible.
    				select {
    				case <- stopCh:
    					return
    				default:
    				}
    				
    				// Even if stopCh is closed, the first branch in the
    				// second select may be still not selected for some
    				// loops (and for ever in theory) if the receive from
    				// dataCh is also unblocked.
    				// This is why the first select block is needed.
    				select {
    				case <- stopCh:
    					return
    				case value := <-dataCh:
    					if value == MaxRandomNumber-1 {
    						// The same trick is used to notify
    						// the moderator to close the
    						// additional signal channel.
    						select {
    						case toStop <- "receiver#" + id:
    						default:
    						}
    						return
    					}
    					
    					log.Println(value)
    				}
    			}
    		}(strconv.Itoa(i))
    	}
    	
    	// ...
    	wgReceivers.Wait()
    	log.Println("stopped by", stoppedBy)
    }
    

    以上三种场景不能涵盖全部,但是它们是最常见最通用的三种场景,基本上所有的场景都可以划分为以上三类。

    结论

    没有任何场景值得你去打破 channel 关闭原则,如果你遇到这样的一种特殊场景,还是建议你好好思考一下自己设计,是不是该重构一下了。

    阅读相关文章,关注公众号「码洞」

        1
    pagxir   2018-04-08 18:42:26 +08:00 via Android
        2
    tempdban   2018-04-08 19:19:23 +08:00 via Android
    我一直有个疑问,为啥都要关闭 channel
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2514 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 24ms · UTC 14:16 · PVG 22:16 · LAX 06:16 · JFK 09:16
    ♥ Do have faith in what you're doing.