有状态的Goroutines
在前面的示例中,我们使用显式锁定[mutexes](互斥锁)来同步对多个goroutine的共享状态的访问。另一个选择是使用goroutines和channel的内置同步功能来实现相同的结果。这种基于通道的方法与Go的共享内存的想法保持一致,通过通信和每个数据拥有正好1 goroutine。
package mainimport ("fmt""math/rand""sync/atomic""time")
在这个例子中,我们的州将由一个单一的路由所有。这将保证数据永远不会因并发访问而受损。为了读取或写入该状态,其他goroutine将向拥有的goroutine发送消息并接收相应的提示。这些readOp和writeOp``structsencapsulate这些请求和一个拥有goutoutine响应的方式。
type readOp struct {key intresp chan int}type writeOp struct {key intval intresp chan bool}func main() {
和以前一样,我们将计算我们执行的操作数量。
var readOps uint64var writeOps uint64
“读取”和“写入”通道将被其他goroutines用于分别发出读取和写入请求。
reads := make(chan readOp)writes := make(chan writeOp)
这是拥有state的goroutine,这是一个如前例所示的地图,但现在私有的有状态goroutine。这个goroutine在“读取”和“写入”通道上反复选择,在它们到达时响应请求。响应是通过首先执行所请求的操作然后在responsechannelresp上发送一个值来表示成功(以及“读取”的情况下的期望值)来执行的。
go func() {var state = make(map[int]int)for {select {case read := <-reads:read.resp <- state[read.key]case write := <-writes:state[write.key] = write.valwrite.resp <- true}}}()
这将启动100个goroutines,通过reads通道向状态拥有的goroutine发出读取。每次读取都需要构建一个readOp,通过reads通道发送,并通过提供的resp通道接收结果。
for r := 0; r < 100; r++ {go func() {for {read := readOp{key: rand.Intn(5),resp: make(chan int)}reads <- read<-read.respatomic.AddUint64(&readOps, 1)time.Sleep(time.Millisecond)}}()}
我们也使用similarapproach开始10次写入。
for w := 0; w < 10; w++ {go func() {for {write := writeOp{key: rand.Intn(5),val: rand.Intn(100),resp: make(chan bool)}writes <- write<-write.respatomic.AddUint64(&writeOps, 1)time.Sleep(time.Millisecond)}}()}
让goroutines工作一秒钟。
time.Sleep(time.Second)
最后,捕获并报告操作计数。
readOpsFinal := atomic.LoadUint64(&readOps)fmt.Println("readOps:", readOpsFinal)writeOpsFinal := atomic.LoadUint64(&writeOps)fmt.Println("writeOps:", writeOpsFinal)}
运行我们的程序表明,基于goroutine的状态管理示例完成了大约80,000个总操作。
$ go run stateful-goroutines.goreadOps: 71708writeOps: 7177
对于这种特殊情况,基于goroutine的方法比基于互斥锁的方法更复杂。在某些情况下,它可能很有用,例如,当您涉及其他通道或管理多个此类互斥锁时容易出错。你应该使用任何最自然的方法,特别是在理解你的程序的正确性方面。
