发布于 

sync.RWMutex - 并发读写问题

当多个线程访问相同的数据时, 就会出现并发读写问题(reader-writer problems)。线程访问数据有两种访问类型:

  • 读线程 reader: 只进行数据读取
  • 写线程 writer: 进行数据修改

当 writer 获取到数据的访问权限后, 其他任何线程(reader 或 writer)都无权限访问此数据。比如, 当 writer 在修改数据无法保证原子性时(如数据库), 此时读取未完成的修改必须被阻塞, 以防止加载脏数据, 其他诸如:

  • writer 不能无限等待
  • reader 不能无限等待
  • 不允许线程出现无限等待
    多读/单写互斥锁(如sync.RWMutex)的具体实现解决了一种并发读写问题。Go中如何保证数据可靠性?

用法

以下程序使用sync.RWMutex的示例, 程序读写互斥锁来保护临界区–sleep()(源码)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)

func init() {
rand.Seed(time.Now().Unix())
}

func sleep() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.RLock()
c <- 1
sleep()
c <- -1
m.RUnlock()
wg.Done()
}

func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.Lock()
c <- 1
sleep()
c <- -1
m.Unlock()
wg.Done()
}

func main() {
var m sync.RWMutex
var rs, ws int
rsCh := make(chan int)
wsCh := make(chan int)
go func() {
for {
select {
case n := <-rsCh:
rs += n
case n := <-wsCh:
ws += n
}
fmt.Printf("%s%s\n", strings.Repeat("R", rs),
strings.Repeat("W", ws))
}
}()
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go reader(rsCh, &m, &wg)
}
for i := 0; i < 3; i++ {
wg.Add(1)
go writer(wsCh, &m, &wg)
}
wg.Wait()
}

每次执行完一组 goroutine(reader 和 writer)的临界区代码后, 都会打印新的一行。很显然, RWMutex 允许至少一个 reader(一个或多个 reader)存在而 writer 同时只能存在一个。

writer 调用到Lock()时, 将会使新的 reader/writer 被阻塞。当存在 reader 加了 RLock 时, writer 会等待这一组 reader 完成正在执行的任务, 当这一组任务完成后, writer 将开始执行。从输出可以很明显的看到, 每一行的 R 都会递减一个, 直到没有 R 之后将打印一个 W。

1
2
3
4
5
6
7
8
9
...
RRRRR
RRRR
RRR
RR
R

W
...

一旦 writer 结束, 之前被阻塞的 reader 将恢复执行, 然后下一个 writer 也将开始启动。值得一提的是, 如果一个 writer 完成, 并且有 reader 和 writer 都在等待, 那么首个 reader 将解除阻塞, 然后才轮到 writer。这种交替执行的方式使得 writer 需等待当前这组 reader 完成, 所以无论 reader 还是 writer 都不会有无限等待的情况。

实现

读锁 RLock

1
2
3
4
5
6
7
func (rw *RWMutex) RLock() {
//... 竞态检测
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.ReadeSem, false)
}
//... 竞态检测
}

readerCount字段是int32类型的值, 表示待处理的 reader 数量(正在读取数据或被 writer 阻塞)。这基本上是已调用 RLock 函数, 但尚未调用 RUnlock 函数的 reader 数量。

atomic.AddInt32等价于如下原子性表达:

1
2
*addr += delta
return *addr

addr*int32类型变量, deltaint32类型。因为此操作具有原子性, 所以累加delta操作不会影响其他线程。

如果没有 writer, 则readerCount总是会大于或等于 0(因为 writer 会把 readerCount 置为负数, 通过 Lock 函数的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders), 此时 reader 是一种运行速度很快的非阻塞方式, 因为只需要调用atomic.AddInt32

信号量 Semaphore

信号量是 Edsger Dijkstra 发明的数据结构, 在解决多种同步问题时很有用。其本质是一个整数, 并关联两个操作:

  • 申请acquire(也称为 waitdecrementP 操作)
  • 释放release(也称 signalincrementV 操作)

acquire操作将信号量减 1, 如果结果值为负则线程阻塞, 且直到其他线程进行了信号量累加为正数才能恢复。如结果为正数, 线程则继续执行。

release操作将信号量加 1, 如存在被阻塞的线程, 此时他们中的一个线程将解除阻塞。

Go 运行时提供的runtime_SemacquireMutexruntime_Semrelease函数可用来实现sync.RWMutex互斥锁。

锁 Lock

1
2
3
4
5
6
7
8
9
func (rw *RWMutex) Lock() {
//... 竞态检测
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
//... 竞态检测
}

writer 通过Lock方法获取共享数据的独占权限。首先, 它会申请阻塞其他写操作的互斥锁(rw.w.Lock()), 此互斥锁在Unlock函数的最后才会进行解锁。下一步, 将readerCount减去rwmutexMaxreader(值为 1 左移 30 位, 1<<30)使其为负数。当readerCount变为负数时, Rlock 将阻塞接下来的所有读请求。

再分析Rlock()函数中逻辑:

1
2
3
4
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.SeadeSem, false)
}

后续的 reader 将会被阻塞, 那么已运行的 reader 会怎样呢? readerWait字段用来记录当前 reader 执行的数量。writer 被信号量writerSem阻塞, 直到最后一个 reader 在使用后面讨论的RUnlock方法解锁后会把writerSem加 1, 此时信号量将变成 0, writer被解除阻塞(Tips: RUnlock 函数中的runtime_Semrelease(&rw.writerSem, false)

如果没有有效的 reader, 那么 writer 将继续其执行。

最大 reader 数 rwmutexMaxreader

在rwmutex.go中定义的常量:

1
const rwmutexMaxreader = 1 << 30

readerCount 字段是int32类型, 其范围为:

[-1 << 31, (1 << 31) — 1] or [-2147483648, 2147483647]

RWMutext使用此字段来计算挂起的 reader 和 writer 的标记(置为负数)。在Lock方法中:

1
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxreader) + rwmutexMaxreader

Lock会将readerCount字段减去1<<30, 当readerCount负值时表示 writer 调用了Lock正等待被处理, atomic.AddInt32(&rw.readerCount, -rwmutexMaxreaders) + rwmutexMaxreaders这个操作既让readerCount变为负数又使r存储回了 readerCountrwmutexMaxreaders也可以限制被挂起 reader 的数量。如果有rwmutexMaxreader个或更多挂起的 reader, 那么readerCount将是非负值, 此时将导致整个机制的崩溃。所以, reader 实际的限制数是: rwmutexMaxreader - 1, 此值1073741823超过了10亿

解读锁 RUnlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rw *RWMutex) RUnlock() {
//... 竞态检测
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxreader {
race.Enable()
thrSw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.WriteSem, false)
}
}
//... 竞态检测
}

每次调用此方法将使 readerCount 减 1(RLock 方法中增加 1)。如果减完后 readerCount 值为负, 则表示当前存在 writer 正在等待或运行。这是因为在 Lock() 方法中 readerCount 减去了 rwmutexMaxreader。然后, 当检查到将完成的 reader 数量(readerWait 数值)最终为 0 时, 则表示 writer 可以最终申请信号量。(r < 0时, 存在两个分支, 当走 r+1 == 0 的分支时, 表示 readerCount 此时为 0 即没有 RLock, 所以 throw 了。当走下面那个分支时, r < 0则是因为存在 writer 把 readerCount 置为了负数在等待 reader 结束, 那么当最后一个 reader 解锁时需要将 WriteSem 信号量加 1, 唤醒 writer)

解锁 Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rw *RWMutex) Unlock() {
//... 竞态检测
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxreader)
if r >= rwmutexMaxreader {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
//... 竞态检测
}

解锁被 writer 持有的互斥锁时, 首先通过atomic.AddInt32readerCount加上rwmutexMaxreader, 这时readerCount将变成非负值。如readerCount比 0 大, 则表示存在 reader 正在等待 writer 执行完成, 此时应唤醒这些等待的 reader。之后写锁将被释放, 从而允许其他 writer 为了写入而锁定互斥锁。(如果还存在挂起的 reader, 则在 writer 解锁之前需要通过信号量 readerSem 唤醒这些 reader 执行)

如果 reader 或 writer 尝试解锁未锁定的互斥锁时, 调用Unlock或Runlock方法将抛出错误(示例源码)。

1
2
m := sync.RWMutex{}
m.Unlock()

Response:

1
2
fatal error: sync: Unlock of unlocked RWMutex
...

递归读锁定 Recursive read locking

如果一个 reader goroutine 持有了读锁, 而此时另一个 writer goroutine 调用Lock申请加写锁, 此后在最初的读锁被释放前其他 goroutine 不能获取到读锁。特定情况下, 这能防止递归读锁, 这种策略保证了锁的可用性, Lock的调用会阻止其他新的 reader 来获得锁。

RWMutex 的工作方式是, 如果有一个 writer 调用了 Lock, 则所有调用 RLock 都将被锁定, 无论是否已经获得了读锁定(示例源码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"sync"
"time"
)

var m sync.RWMutex

func f(n int) int {
if n < 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}

func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Lock")
m.Lock()
fmt.Println("Unlock")
m.Unlock()
done <- 1
}()
f(4)
<-done
}

Response:

1
2
3
4
5
6
RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!

Tips: 为什么会发送死锁呢?原作者用递归函数在 defer 里面解锁, 那么在加第三层读锁的时候, 还没有读锁解锁。这时, readCount 是 3, 此时正好加了一个 Lock 写锁, 由于 readCount 是 3

1
2
3
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}

由上可知, 此时 writer 需要等待所有进行中的 reader 完成, 此时又调用了 RLock,

1
2
3
4
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}

由于在第四个 RLock 前, 加了 Lock 操作, 使得 readerCount 为负数。所以就造成了死锁, 即 reader 在等待 readerSem, writer 在等待 writerSem*

复制锁 Copying locks

go tool vet可以检测锁是否被复制了, 因为复制锁会导致死锁。

性能 Performance

在 CPU 核数增多时 RWMutex 的性能会有下降

争用 Contention

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"net/http"
_ "net/http/pprof"
"runtime"
"sync"
"time"
)

func main() {
var mu sync.Mutex
runtime.SetMutexProfileFraction(5)
for i := 0; i < 10; i++ {
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}()
}
http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention

mutexcontention 程序运行时, 执行 pprof:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE main.main.func1 in .../src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i < 10; i++ {
. . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(":8888", nil)

Tips 耗时 57.28s, 且指向了 mu.Un-lock?

当 goroutine 调用 Lock 而阻塞时, 会记录当前发生的准确时间–叫做acquiretime。当另一个 groutine 解锁, 至少存在一个 goroutine 在等待获得锁, 则其中一个解除阻塞并调用其 mutexevent 函数。该 mutexevent 函数通过检查 SetMutexProfileFraction 设置的速率来决定此事件应被保留还是丢弃。此事件包含整个等待的时间(当前时间 - 获得时间)。从上面的例子可以看出, 所有阻塞在特定互斥锁的 goroutines 的总等待时间会被收集和展示。

在 Go 1.11(sync: enable profiling of RWMutex)中增加读锁(Rlock 和 RUnlock)的争用。

备注