Go 语言实现安全计数的若干种方法

时间:2022-08-26 21:33:16

Go 语言实现安全计数的若干种方法

原文如下:

有一天,我正研究共享计数器的简单经典实现,实现方式使用的是 C++ 中的互斥锁,这时,我非常想知道还有哪些线程安全的实现方式。我通常使用 Go 来满足自己的好奇心,本文就是一篇如何用 goroutine-safe 的方式实现计数器的方法汇总。

不要这样做

我们先从非安全的实现方式开始:

  1. type NotSafeCounter struct { 
  2.  number uint64 
  3.  
  4. func NewNotSafeCounter() Counter { 
  5.  return &NotSafeCounter{0} 
  6.  
  7. func (c *NotSafeCounter) Add(num uint64) { 
  8.  c.number = c.number + num 
  9.  
  10. func (c *NotSafeCounter) Read() uint64 { 
  11.  return c.number 

代码上没什么特别的地方。我们来测试下结果正确与否:创建 100 个 goroutine,其中三分之二的 goroutine 对共享计数器加一。

  1. func testCorrectness(t *testing.T, counter Counter) { 
  2.  wg := &sync.WaitGroup{} 
  3.  for i := 0; i < 100; i++ { 
  4.   wg.Add(1) 
  5.   if i%3 == 0 { 
  6.    go func(counter Counter) { 
  7.     counter.Read() 
  8.     wg.Done() 
  9.    }(counter) 
  10.   } else if i%3 == 1 { 
  11.    go func(counter Counter) { 
  12.     counter.Add(1) 
  13.     counter.Read() 
  14.     wg.Done() 
  15.    }(counter) 
  16.   } else { 
  17.    go func(counter Counter) { 
  18.     counter.Add(1) 
  19.     wg.Done() 
  20.    }(counter) 
  21.   } 
  22.  } 
  23.  
  24.  wg.Wait() 
  25.  
  26.  if counter.Read() != 66 { 
  27.   t.Errorf("counter should be %d and was %d", 66, counter.Read()) 
  28.  } 

测试的结果是不确定的,有时候能正确运行,有时候会出现类似这样的错误:

  1. counter_test.go:34: counter should be 66 and was 65 

经典实现方式

实现一个正确计数器的传统方式是使用互斥锁,保证任意时间只有一个协程操作计数器。Go 语言的话,我们可以使用 sync 包。

  1. type MutexCounter struct { 
  2.  mu     *sync.RWMutex 
  3.  number uint64 
  4.  
  5. func NewMutexCounter() Counter { 
  6.  return &MutexCounter{&sync.RWMutex{}, 0} 
  7.  
  8. func (c *MutexCounter) Add(num uint64) { 
  9.  c.mu.Lock() 
  10.  defer c.mu.Unlock() 
  11.  c.number = c.number + num 
  12.  
  13. func (c *MutexCounter) Read() uint64 { 
  14.  c.mu.RLock() 
  15.  defer c.mu.RUnlock() 
  16.  return c.number 

现在测试结果每次都能通过且都是正确的。

使用 channel

锁是一种保证同步的低级原语。Go 也提供了更高级实现方式 - channel。

关于 mutexe 和 channel,现在有太多类似这样的讨论:“mutexe vs channel ”、“哪个更好”、“我应当使用哪一个”等。其中一些讨论非常有趣且有益,但这并不是本文讨论的重点。

我们使用 channel 来实现协程安全的计数器,使用 channel 充当队列,对计数器的操作(读、写)都缓存在队列中,按顺序操作。具体的操作通过传递 func() 实现。创建时,计数器会衍生出一个 goroutine 并且按顺序执行队列里的操作。

下面是计数器的定义:

  1. type ChannelCounter struct { 
  2.  ch     chan func() 
  3.  number uint64 
  4.  
  5. func NewChannelCounter() Counter { 
  6.  counter := &ChannelCounter{make(chan func(), 100), 0} 
  7.  go func(counter *ChannelCounter) { 
  8.   for f := range counter.ch { 
  9.    f() 
  10.   } 
  11.  }(counter) 
  12.  return counter 

当一个协程调用 Add(),就往队列里面添加一个写操作:

  1. func (c *ChannelCounter) Add(num uint64) { 
  2.  c.ch <- func() { 
  3.   c.number = c.number + num 
  4.  } 

当一个协程调用 Read(),就往队列里面添加一个读操作:

  1. func (c *ChannelCounter) Read() uint64 { 
  2.  ret := make(chan uint64) 
  3.  c.ch <- func() { 
  4.   ret <- c.number 
  5.   close(ret) 
  6.  } 
  7.  return <-ret 

我真正喜欢这个实现的地方在于,这种按顺序执行的方式非常的清晰。

原子方式

我们甚至可以用更低级别的原语,利用 sync/atomic 包执行原子操作。

  1. type AtomicCounter struct { 
  2.  number uint64 
  3.  
  4. func NewAtomicCounter() Counter { 
  5.  return &AtomicCounter{0} 
  6.  
  7. func (c *AtomicCounter) Add(num uint64) { 
  8.  atomic.AddUint64(&c.number, num) 
  9.  
  10. func (c *AtomicCounter) Read() uint64 { 
  11.  return atomic.LoadUint64(&c.number) 

比较和交换

或者,我们可以使用非常经典的原语:CAS,对计时器进行计数。

  1. func (c *CASCounter) Add(num uint64) { 
  2.  for { 
  3.   v := atomic.LoadUint64(&c.number) 
  4.   if atomic.CompareAndSwapUint64(&c.number, v, v+num) { 
  5.    return 
  6.   } 
  7.  } 
  8.  
  9. func (c *CASCounter) Read() uint64 { 
  10.  return atomic.LoadUint64(&c.number) 

float 类型该如何实现

在我探索学习过程中,看到一个非常棒的视频 - 《Prometheus: Designing and Implementing a Modern Monitoring Solution in Go[1]》。在视频的最后,讨论了如何实现浮点数计数器。到目前为止,所有的技术都适用于浮点数,除了 sync/atomic 包,还没提供浮点数的原子操作。

在视频里,Bj?rn Rabenstein 介绍了如何通过将浮点数存储为 uint64 并使用 math.Float64bits 和 math.Float64frombits 在 float64 和 uint64 之间进行转换来解决此问题。

  1. type CASFloatCounter struct { 
  2.  number uint64 
  3.  
  4. func NewCASFloatCounter() *CASFloatCounter { 
  5.  return &CASFloatCounter{0} 
  6.  
  7. func (c *CASFloatCounter) Add(num float64) { 
  8.  for { 
  9.   v := atomic.LoadUint64(&c.number) 
  10.   newValue := math.Float64bits(math.Float64frombits(v) + num) 
  11.   if atomic.CompareAndSwapUint64(&c.number, v, newValue) { 
  12.    return 
  13.   } 
  14.  } 
  15.  
  16. func (c *CASFloatCounter) Read() float64 { 
  17.  return math.Float64frombits(atomic.LoadUint64(&c.number)) 

最后

这篇文章是共享计数器的实现汇总。这是我好奇心驱使的结果,此外对并发也有一个基本的了解。如果你有其他实现共享计数的方式,请告诉我。

本文提到的实现方式对应的代码可以看这里[2],此外还包括运行用例和基准测试。

参考资料

[1]Prometheus: Designing and Implementing a Modern Monitoring Solution in Go: https://www.youtube.com/watch?v=1V7eJ0jN8-E

[2]看这里: https://github.com/brunocalza/sharedcounter

via:https://brunocalza.me/there-are-many-ways-to-safely-count/

作者:BRUNO CALZA

四哥水平有限,如有翻译或理解错误,烦请帮忙指出,感谢!

原文链接:https://mp.weixin.qq.com/s/9fZUEJSLLnq0sn_6Y9YZ-g