sync.RWMutex - 并发读写问题
当多个线程访问相同的数据时, 就会出现并发读写问题(reader-writer problems)。线程访问数据有两种访问类型:
- 读线程 reader: 只进行数据读取
- 写线程 writer: 进行数据修改
当 writer 获取到数据的访问权限后, 其他任何线程(reader 或 writer)都无权限访问此数据。比如, 当 writer 在修改数据无法保证原子性时(如数据库), 此时读取未完成的修改必须被阻塞, 以防止加载脏数据, 其他诸如:
- writer 不能无限等待
- reader 不能无限等待
- 不允许线程出现无限等待
多读/单写互斥锁(如sync.RWMutex)的具体实现解决了一种并发读写问题。Go中如何保证数据可靠性?
用法
以下程序使用sync.RWMutex
的示例, 程序读写互斥锁来保护临界区–sleep()
(源码)。
1 | package main |
每次执行完一组 goroutine(reader 和 writer)的临界区代码后, 都会打印新的一行。很显然, RWMutex 允许至少一个 reader(一个或多个 reader)存在而 writer 同时只能存在一个。
writer 调用到Lock()
时, 将会使新的 reader/writer 被阻塞。当存在 reader 加了 RLock
时, writer 会等待这一组 reader 完成正在执行的任务, 当这一组任务完成后, writer 将开始执行。从输出可以很明显的看到, 每一行的 R 都会递减一个, 直到没有 R 之后将打印一个 W。
1 | ... |
一旦 writer 结束, 之前被阻塞的 reader 将恢复执行, 然后下一个 writer 也将开始启动。值得一提的是, 如果一个 writer 完成, 并且有 reader 和 writer 都在等待, 那么首个 reader 将解除阻塞, 然后才轮到 writer。这种交替执行的方式使得 writer 需等待当前这组 reader 完成, 所以无论 reader 还是 writer 都不会有无限等待的情况。
实现
读锁 RLock
1 | func (rw *RWMutex) RLock() { |
readerCount
字段是int32
类型的值, 表示待处理的 reader 数量(正在读取数据或被 writer 阻塞)。这基本上是已调用 RLock
函数, 但尚未调用 RUnlock
函数的 reader 数量。
atomic.AddInt32等价于如下原子性表达:
1 | *addr += delta |
addr
是*int32
类型变量, delta
是int32
类型。因为此操作具有原子性, 所以累加delta
操作不会影响其他线程。
如果没有 writer, 则
readerCount
总是会大于或等于 0(因为 writer 会把 readerCount 置为负数, 通过 Lock 函数的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders), 此时 reader 是一种运行速度很快的非阻塞方式, 因为只需要调用atomic.AddInt32
。
信号量 Semaphore
信号量是 Edsger Dijkstra 发明的数据结构, 在解决多种同步问题时很有用。其本质是一个整数, 并关联两个操作:
- 申请
acquire
(也称为wait
、decrement
或P
操作) - 释放
release
(也称signal
、increment
或V
操作)
acquire
操作将信号量减 1, 如果结果值为负则线程阻塞, 且直到其他线程进行了信号量累加为正数才能恢复。如结果为正数, 线程则继续执行。
release
操作将信号量加 1, 如存在被阻塞的线程, 此时他们中的一个线程将解除阻塞。
Go 运行时提供的runtime_SemacquireMutex
和runtime_Semrelease
函数可用来实现sync.RWMutex
互斥锁。
锁 Lock
1 | func (rw *RWMutex) Lock() { |
writer 通过Lock
方法获取共享数据的独占权限。首先, 它会申请阻塞其他写操作的互斥锁(rw.w.Lock()
), 此互斥锁在Unlock
函数的最后才会进行解锁。下一步, 将readerCount
减去rwmutexMaxreader
(值为 1 左移 30 位, 1<<30
)使其为负数。当readerCount
变为负数时, Rlock
将阻塞接下来的所有读请求。
再分析Rlock()
函数中逻辑:
1 | if atomic.AddInt32(&rw.readerCount, 1) < 0 { |
后续的 reader 将会被阻塞, 那么已运行的 reader 会怎样呢? readerWait
字段用来记录当前 reader 执行的数量。writer 被信号量writerSem
阻塞, 直到最后一个 reader 在使用后面讨论的RUnlock
方法解锁后会把writerSem
加 1, 此时信号量将变成 0, writer被解除阻塞(Tips: RUnlock 函数中的runtime_Semrelease(&rw.writerSem, false)
)
如果没有有效的 reader, 那么 writer 将继续其执行。
最大 reader 数 rwmutexMaxreader
在rwmutex.go中定义的常量:
1 | const rwmutexMaxreader = 1 << 30 |
readerCount
字段是int32类型, 其范围为:
[-1 << 31, (1 << 31) — 1] or [-2147483648, 2147483647]
RWMutext
使用此字段来计算挂起的 reader 和 writer 的标记(置为负数)。在Lock
方法中:
1 | r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader |
Lock
会将readerCount
字段减去1<<30
, 当readerCount
负值时表示 writer 调用了Lock
正等待被处理, atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders) + rwmutexMaxreaders
这个操作既让readerCount
变为负数又使r
存储回了 readerCount
。rwmutexMaxreaders
也可以限制被挂起 reader 的数量。如果有rwmutexMaxreader
个或更多挂起的 reader, 那么readerCount
将是非负值, 此时将导致整个机制的崩溃。所以, reader 实际的限制数是: rwmutexMaxreader - 1
, 此值1073741823
超过了10亿
。
解读锁 RUnlock
1 | func (rw *RWMutex) RUnlock() { |
每次调用此方法将使 readerCount
减 1(RLock 方法中增加 1)。如果减完后 readerCount
值为负, 则表示当前存在 writer 正在等待或运行。这是因为在 Lock()
方法中 readerCount
减去了 rwmutexMaxreader
。然后, 当检查到将完成的 reader 数量(readerWait 数值)最终为 0 时, 则表示 writer 可以最终申请信号量。(r < 0时, 存在两个分支, 当走 r+1 == 0 的分支时, 表示 readerCount 此时为 0 即没有 RLock, 所以 throw 了。当走下面那个分支时, r < 0
则是因为存在 writer 把 readerCount 置为了负数在等待 reader 结束, 那么当最后一个 reader 解锁时需要将 WriteSem 信号量加 1, 唤醒 writer)
解锁 Unlock
1 | func (rw *RWMutex) Unlock() { |
解锁被 writer 持有的互斥锁时, 首先通过atomic.AddInt32
将readerCount
加上rwmutexMaxreader
, 这时readerCount
将变成非负值。如readerCount
比 0 大, 则表示存在 reader 正在等待 writer 执行完成, 此时应唤醒这些等待的 reader。之后写锁将被释放, 从而允许其他 writer 为了写入而锁定互斥锁。(如果还存在挂起的 reader, 则在 writer 解锁之前需要通过信号量 readerSem 唤醒这些 reader 执行)
如果 reader 或 writer 尝试解锁未锁定的互斥锁时, 调用Unlock或Runlock方法将抛出错误(示例源码)。
1 | m := sync.RWMutex{} |
Response:
1 | fatal error: sync: Unlock of unlocked RWMutex |
递归读锁定 Recursive read locking
如果一个 reader goroutine 持有了读锁, 而此时另一个 writer goroutine 调用Lock申请加写锁, 此后在最初的读锁被释放前其他 goroutine 不能获取到读锁。特定情况下, 这能防止递归读锁, 这种策略保证了锁的可用性, Lock的调用会阻止其他新的 reader 来获得锁。
RWMutex 的工作方式是, 如果有一个 writer 调用了 Lock, 则所有调用 RLock 都将被锁定, 无论是否已经获得了读锁定(示例源码):
1 | package main |
Response:
1 | RLock |
Tips: 为什么会发送死锁呢?原作者用递归函数在 defer 里面解锁, 那么在加第三层读锁的时候, 还没有读锁解锁。这时, readCount 是 3, 此时正好加了一个 Lock 写锁, 由于 readCount 是 3
1 | if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { |
由上可知, 此时 writer 需要等待所有进行中的 reader 完成, 此时又调用了 RLock,
1 | if atomic.AddInt32(&rw.readerCount, 1) < 0 { |
由于在第四个 RLock 前, 加了 Lock 操作, 使得 readerCount 为负数。所以就造成了死锁, 即 reader 在等待 readerSem, writer 在等待 writerSem*
复制锁 Copying locks
go tool vet
可以检测锁是否被复制了, 因为复制锁会导致死锁。
性能 Performance
在 CPU 核数增多时 RWMutex 的性能会有下降
争用 Contention
1 | package main |
当 mutexcontention
程序运行时, 执行 pprof:
1 | > go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1 |
Tips 耗时 57.28s
, 且指向了 mu.Un-lock
?
当 goroutine 调用 Lock
而阻塞时, 会记录当前发生的准确时间–叫做acquiretime
。当另一个 groutine 解锁, 至少存在一个 goroutine 在等待获得锁, 则其中一个解除阻塞并调用其 mutexevent
函数。该 mutexevent
函数通过检查 SetMutexProfileFraction
设置的速率来决定此事件应被保留还是丢弃。此事件包含整个等待的时间(当前时间 - 获得时间)。从上面的例子可以看出, 所有阻塞在特定互斥锁的 goroutines 的总等待时间会被收集和展示。
在 Go 1.11(sync: enable profiling of RWMutex)中增加读锁(Rlock 和 RUnlock)的争用。
备注
- 原文地址: https://medium.com/golangspec/sync-rwmutex-ca6c6c3208a0
- 原文作者: Michał Łowicki