Basic principle for closing a channel
Close a channel only from the sending side, and only when there is exactly one sender.
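The rule follows from how closed channels behave: closing a channel twice panics, sending on a closed channel panics, but receiving from a closed channel is always safe. A minimal sketch of the three cases follows; the helper name `panics` is made up for this illustration and is not part of the original text.

```go
package main

import "fmt"

// panics reports whether calling f panics. It is a hypothetical helper
// used only for this demonstration.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	// Receiving from a closed, drained channel never panics: it yields
	// the zero value and ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false

	// Closing again and sending both panic.
	fmt.Println(panics(func() { close(ch) })) // true
	fmt.Println(panics(func() { ch <- 1 }))   // true
}
```

Because the receiver cannot be hurt by a close but a sender can, the side that sends is the natural owner of the close.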
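Closing with multiple senders, or from the receiving side
A rather crude approach
If the channel has to be closed by the receiver, or by any one of several senders, you can fall back on recover to absorb the panic (which is not very recommended). For example: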
// T stands for the element type of the channel.
func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// The return result can be altered
			// in a defer function call.
			justClosed = false
		}
	}()

	// assume ch != nil here.
	close(ch)   // panic if ch is closed
	return true // <=> justClosed = true; return
}
The same technique can be used to send on a channel that may already have been closed:
func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // panic if ch is closed
	return false // <=> closed = false; return
}
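The two helpers are written against a placeholder element type T. Assuming Go 1.18 or later, they could be sketched with a type parameter so that they compile as written and work for any element type. This is only an illustration of the same recover-based technique, with the same caveat that it is not recommended.

```go
package main

import "fmt"

// SafeClose closes ch and reports whether this call closed it.
// It returns false if closing panicked (for example, ch was already closed).
func SafeClose[T any](ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()
	close(ch) // panics if ch is already closed
	return true
}

// SafeSend sends value on ch and reports whether ch was already closed.
func SafeSend[T any](ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()
	ch <- value // panics if ch is closed
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(SafeSend(ch, 42)) // false: the value was buffered
	fmt.Println(SafeClose(ch))    // true: this call closed ch
	fmt.Println(SafeClose(ch))    // false: ch was already closed
	fmt.Println(SafeSend(ch, 7))  // true: ch is closed
}
```

Note that recover only hides the panic. A send that runs concurrently with a close is still unsynchronized, so the race detector may report it.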
A more graceful approach
sync.Once
type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}
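However, sync.Once only prevents the channel from being closed twice. With multiple senders, another sender can still send on the channel after it has been closed, which panics. A mutex combined with a closed flag is a reasonable alternative, because senders can query the state through IsClosed. Keep in mind that a check followed by a separate send is still racy unless the send happens while the same lock is held.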
type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}
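One way to make sends safe with this kind of wrapper is to perform the send while holding the same mutex that guards the close. The sketch below is an assumption about how that could look, not something from the original text: the type `GuardedChannel` and the method `TrySend` are hypothetical names, and the send is deliberately non-blocking so that a goroutine never waits on the channel while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// GuardedChannel is a hypothetical wrapper in which sends and the close
// share one mutex, so a send can never hit a closed channel.
type GuardedChannel struct {
	c      chan int
	closed bool
	mu     sync.Mutex
}

func NewGuardedChannel(size int) *GuardedChannel {
	return &GuardedChannel{c: make(chan int, size)}
}

// TrySend attempts a non-blocking send. It returns false if the channel
// has been closed or if the send would block right now.
func (g *GuardedChannel) TrySend(v int) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return false
	}
	select {
	case g.c <- v:
		return true
	default:
		return false // would block; do not wait while holding the lock
	}
}

// SafeClose closes the channel at most once.
func (g *GuardedChannel) SafeClose() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.closed {
		close(g.c)
		g.closed = true
	}
}

func main() {
	g := NewGuardedChannel(2)
	fmt.Println(g.TrySend(1)) // true
	g.SafeClose()
	g.SafeClose()             // second call is a no-op
	fmt.Println(g.TrySend(2)) // false: already closed
	for v := range g.c {
		fmt.Println(v) // 1
	}
}
```

The trade-off is that every send pays for a lock, and a full buffer is reported as a failed send instead of blocking, so callers need their own retry policy.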
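To stress it once more: calling close(ch) from the receiving side is not recommended.
Graceful approaches
One sender, multiple receivers
This is the simplest model: the sender just closes the channel when it has nothing more to send.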
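A minimal sketch of this model is shown here. The function `fanOut` and its parameters are hypothetical and exist only for illustration: one sender produces the values 1..n and then closes the channel, while several receivers drain it with range.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends 1..n on a channel from a single sender, closes the channel,
// and returns the sum accumulated by numReceivers receiver goroutines.
// numReceivers must be at least 1, otherwise the sender blocks forever.
func fanOut(n, numReceivers int) int {
	dataCh := make(chan int)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for i := 0; i < numReceivers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits once dataCh is closed and drained.
			for v := range dataCh {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}

	// The only sender: it alone closes dataCh when it is done.
	for v := 1; v <= n; v++ {
		dataCh <- v
	}
	close(dataCh)

	wg.Wait()
	return total
}

func main() {
	fmt.Println(fanOut(100, 4)) // 5050
}
```

No extra signal channel is needed in this case, because the close itself tells every receiver that the stream has ended.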
One receiver, multiple senders
The receiver can tell the senders to stop by closing an additional signal channel:
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel dataCh.
	// Its receivers are the senders of channel dataCh.

	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The first select is to try to exit the goroutine
				// as early as possible. In fact, it is not essential
				// for this specific example, so it can be omitted.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first branch in the
				// second select may still not be selected for some
				// loops if the send to dataCh is also unblocked.
				// But this is acceptable for this example, so the
				// first select block above can be omitted.
				select {
				case <-stopCh:
					return
				case dataCh <- rand.Intn(MaxRandomNumber):
				}
			}
		}()
	}

	// the receiver
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == MaxRandomNumber-1 {
				// The receiver of the dataCh channel is
				// also the sender of the stopCh channel.
				// It is safe to close the stop channel here.
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}
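Note that dataCh is never closed in the example above. In Go a channel does not have to be closed: once no goroutine references it any longer, it will eventually be reclaimed by the garbage collector, whether or not it has been closed. (This is what the original author says. I am not yet sure about it, because I have not found supporting material and have not read the source code.)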
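Multiple receivers, multiple senders
Any participant (a receiver or a sender) can notify a moderator, for example a dedicated goroutine. The moderator then closes a signal channel, which tells every sender and receiver to stop.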
package main

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the moderator goroutine shown below.
	// Its receivers are all senders and receivers of dataCh.
	toStop := make(chan string, 1)
	// The channel toStop is used to notify the moderator
	// to close the additional signal channel (stopCh).
	// Its senders are any senders and receivers of dataCh.
	// Its receiver is the moderator goroutine shown below.

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				if value == 0 {
					// Here, a trick is used to notify the moderator
					// to close the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// The first select here is to try to exit the goroutine
				// as early as possible. A select block with one receive
				// case and one default branch is specially optimized as
				// a try-receive operation by the standard Go compiler.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first branch in the
				// second select may still not be selected for some
				// loops (and forever in theory) if the send to
				// dataCh is also non-blocking.
				// This is why the first select block above is needed.
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same as in the sender goroutine, the first select here
				// is to try to exit the goroutine as early as possible.
				// It is likewise a try-receive operation on stopCh.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first branch in the
				// second select may still not be selected for some
				// loops (and forever in theory) if the receive from
				// dataCh is also non-blocking.
				// This is why the first select block is needed.
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == MaxRandomNumber-1 {
						// The same trick is used to notify
						// the moderator to close the
						// additional signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
