发布于 

Go并发下的Context工作流程

什么是Context?

读到很多关于Context(上下文)的术语,如应用上下文,请求上下文等,查阅资料但没有得到理解?有没有比较好的解释?

Context 指做一件事情的背景/环境/上下文/所需要的必要的数据。

本文就Golang的Context标准库介绍context的工作机制;通过外部API创建并使用context标准库;从源码角度分析介绍context工作流程。

context标准库很好的解决了多goroutine下通知传递和元数据处理。由于Golang中的goroutine之间没有父子关系,所以也不存在子进程退出后的通知机制。多个goroutine之间协同工作设计了以下四个方面:通信、同步、通知、退出四个方面:

  • 通信:chan通道是各goroutine之间通信的基础。(程序的数据通道)
  • 同步:使用不带缓冲的chan;sync.WaitGroup为多个goroutine提供同步等待机制;mutex锁与读写锁机制
  • 通知:通知与上文通信的区别是,通知的作用为管理,控制流数据。一般的解决方法是在输入端绑定两个chan,通过select收敛处理。
  • 退出:与通知类似,即增加一个单独的通道,借助chan和select的广播机制(close chan to broadcast)实现退出

从Go1.7版本开始使用context标准库来处理退出机制。主要提供了两个功能:退出通知元数据传递。可以传递给整个goroutine调用树的每个goroutine。

工作机制

每一个创建context的goroutine定义为root节点:root节点负责创建一个实现context接口的具体对象,并将该对象作为参数传递至新拉起的goroutine作为其上下文。下游goroutine继续封装该对象并以此类推向下传递。

interface

context接口:作为一个基本接口,所有的context对象都要实现该接口,并将其作为使用者调度时的参数类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 上下文接口包含截止时间、退出信号和其他value值
//
// Context方法可以同是被多个goroutine调用.
type Context interface {
// Deadline 返回的time为上下文工作完成时间
// 如果Context实现了超时机制,该方法返回超时时间,true。否则ok==false
Deadline() (deadline time.Time, ok bool)
// Done 当上下文工作完成时返回一个退出信号
// Done 返回nil时表示当前上下文可以退出
// 当连续调用Done返回相同的值时,当前Context将不会退出.
//
// 当触发退出信号时,退出操作在 WithCancel 中执行;
// WithDeadline 将在Deadline 时间之间关闭
// WithTimeout 将在超时时关闭Done
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
// 使用<-cahn struct{}来通知退出,供被调用的goroutine监听.
Done() <-chan struct{}
// 当Done未被关闭时,Err返回nil.
// 当Done被关闭时,返回一个非nil的错误信息
Err() error
// package user
//
// import "context"
//
// type User struct {...}
//
// type key int
//
// var userKey key
//
// //NewContext 返回了一个新的context传递元数据.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// //FromContext 返回了存储在上下文关系中的user信息.
//
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}

canceler接口:拓展接口,规定了取消通知的Context具体类型需要实现的接口:

1
2
3
4
5
type canceler interface {
// 通知后续创建的goroutine退出
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

struct

emptyCtx:实现了一个不具备任何功能的Context接口,其存在的目的就是作为Context对象树的root节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
//......
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

func Background() Context {
return background
}

func TODO() Context {
return todo
}
//这两者返回值是一样的,文档上建议main函数可以使用Background()创建root context

**cancelCtx:**可以认为它与emptyCtx最大的区别在于,具体实现了cancel函数。即它可以向子goroutine传递cancel消息。

**timerCtx:**另一个实现Context接口的具体类型,内部封装了cancelCtx类型实例,同时拥有deadline变量,用于实现实时退出通知。

**valueCtx:**实现了Context接口的具体类型,内部封装cancelCtx类型实例,同时封装了一个kv存储变量,用于传递通知消息。

API

除了root context可以使用Background()创建以外,其余的context都应该从cancelCtxtimerCtxvalueCtx中选取一个来构建具体对象:

  • func WithCancel(parent Context) (Context, CancelFunc):创建cancelCtx实例。
  • func WithDeadline(parent Context, deadline time.Time)(Context, CancelFunc)与func WithTimeout(parent Context, timeout time.Duration)(Context, CancelFunc):两种方法都可以创建一个带有超时通知的Context具体对象timerCtx,具体差别在于传递绝对或相对时间。
  • func WithValue(parent Context, key, val interface{}) Context:创建valueCtx实例。

1、创建root context并构建一个WithCancel类型的上下文,使用该上下文注册一个goroutine模拟运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main(){
ctxa, cancel := context.WithCancel(context.Background())
go work(ctxa, "work1")
}

func work(ctx context.Context, name string){
for{
select{
case <-ctx.Done():
println(name," get message to quit")
return
default:
println(name," is running")
time.Sleep(time.Second)
}
}
}

2、使用WithDeadline包装ctxa,并使用新的上下文注册另一个goroutine:

1
2
3
4
func main(){
ctxb, _ := context.WithTimeout(ctxa, time.Second * 3)
go work(ctxb, "work2")
}

3、使用WithValue包装ctxb,并注册新的goroutine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main(){
ctxc := context.WithValue(ctxb, "key", "custom value")
go workWithValue(ctxc, "work3")
}

func workWithValue(ctx context.Context, name string){
for{
select {
case <-ctx.Done():
println(name," get message to quit")
return
default:
value:=ctx.Value("key").(string)
println(name, " is running with value", value)
time.Sleep(time.Second)
}
}
}

4、最后在main函数中手动关闭ctxa,并等待输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main(){
time.Sleep(5*time.Second)
cancel()
time.Sleep(time.Second)
}

//运行程序并查看输出结果:
//work1 is running
//work3 is running with value custom value
//work2 is running
//work1 is running
//work2 is running
//work3 is running with value custom value
//work2 is running
//work3 is running with value custom value
//work1 is running
// //work2超时并通知work3退出
//work2 get message to quit
//work3 get message to quit
//work1 is running
//work1 is running
//work1 get message to quit

可以看到,当ctxb因超时而退出之后,会通知由他包装的所有子goroutine(ctxc),并通知退出。各context的关系结构如下:

Background() -> ctxa -> ctxd -> ctxc

源码分析

主要研究两个问题,即各Context如何保存父类和子类上下文;以及cancel方法如何实现通知子类context实现退出功能。

context的数据结构

1、emptyCtx只是一个uint类型的变量,其目的只是为了作为第一个goroutine ctx的parent,因此他不需要,也没法保存子类上下文结构。

2、cancelCtx的数据结构:

1
2
3
4
5
6
7
8
type cancelCtx struct {
Context

mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

Context接口保存的就是父类的context。children map[canceler]struct{}保存的是所有直属与这个context的子类context。done chan struct{}用于发送退出信号。
我们查看创建cancelCtx的APIfunc WithCancel(…)…

1
2
3
4
5
6
7
8
9
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

propagateCancel函数的作用是将自己注册至parent context。我们稍后会讲解这个函数。

3、timerCtx的数据结构:

1
2
3
4
5
6
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.

deadline time.Time
}

timerCtx继承于cancelCtx,并为定时退出功能新增自己的数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
// 定时退出机制
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

timerCtx查看parent context的方法是timerCtx.cancelCtx.Context

4、valueCtx的数据结构:

1
2
3
4
type valueCtx struct {
Context
key, val interface{}
}

相较于timerCtx而言非常简单,没有继承于cancelCtx struct,而是直接继承于Context接口。

1
2
3
4
5
6
7
8
9
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

辅助函数

两个疑问:

  • 第一,valueCtx为什么没有propagateCancel函数向parent context注册自己。既然没有注册,为何ctxb超时后能通知ctxc一起退出。

  • 第二,valueCtx是如何存储children和parent context结构的。相较于同样绑定Context接口的cancelCtx,valueCtx并没有children数据。

    第二个问题能解决一半第一个问题,即为何不向parent context注册。先说结论:valueCtx的children context注册在valueCtx的parent context上。函数func propagateCancel(…)负责注册信息,看一下构造:

func propagateCancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}

if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

这个函数的主要逻辑如下:接收parent context 和 child canceler方法,若parent为emptyCtx,则不注册;否则通过funcparentCancelCtx寻找最近的一个*cancelCtx;若该cancelCtx已经结束,则调用child的cancel方法,否则向该cancelCtx注册child。

func parentCancelCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}

func parentCancelCtx从parentCtx中向上迭代寻找第一个cancelCtx并返回。从函数逻辑中可以看到,只有当parent.(type)为*valueCtx的时候,parent才会向上迭代而不是立即返回。否则该函数都是直接返回或返回经过包装的*cancelCtx。因此我们可以发现,valueCtx是依赖于parentCtx的*cancelCtx结构的。

第二个问题,事实上,parentCtx根本无需,也没有办法通过Done()方法通知valueCtx,valueCtx也没有额外实现Done()方法。可以理解为:valueCtx与parentCtx公用一个done channel,当parentCtx调用了cancel方法并关闭了done channel时,监听valueCtx的done channel的goroutine同样会收到退出信号。另外,当parentCtx没有实现cancel方法(如emptyCtx)时,可以认为valueCtx也是无法cancel的。

Ctx

func (c *cancelCtx) cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}

c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}

c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}

for child := range c.children {
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()

if removeFromParent {
removeChild(c.Context, c)
}
}

该方法的主要逻辑如下:

  • 若外部err为空,则代表这是一个非法的cancel操作,抛出panic;
  • 若cancelCtx内部err不为空,说明该Ctx已经执行过cancel操作,直接返回;
  • 关闭done channel,关联该Ctx的goroutine收到退出通知;
  • 遍历children,若有的话,执行child.cancel操作;
  • 调用removeChild将自己从parent context中移除。

func (c *timerCtx) cancel

与cancelCtx十分相似。