同时启很多个goroutine来完成一个任务,在一些必要的情况下如何跟踪或取消这些goroutine?常见的有下面几种方式:

  • WaitGroup,goroutine之间的同步
  • for select 加上 stop channel来监听消息管理协程
  • context包

context包就是用来在goroutine之间传递上下文信息的,它提供了超时timeout和取消cancel机制,利用了channel进行信息传递,方便的管理具有复杂层级关系的多个goroutine,这些复杂的层级关系类似于一棵树,可以有多个分叉。Go标准库中的net/http,database/sql等都用到了context包。

接口

context包定义了两个接口

  1. Context

    1
    2
    3
    4
    5
    6
    
    type Context interface {
    	Deadline() (deadline time.Time, ok bool)
    	Done() <-chan struct{}
    	Err() error
    	Value(key interface{}) interface{}
    }
    
  • Deadline 方法,第一个返回值是该上下文执行完的截止时间,第二个返回值是布尔值,表示是否设置了截止日期,没设置返回ok==false,否则为true。该方法幂等
  • Done,返回一个只读通道,在context被取消或者context到了deadline时,此通道会被关闭
    • WithCancel 上下文,当调用cancel后此通道关闭关闭
    • WithDeadline,到达deadline时间点后自动关闭此通道
    • WithTimeout,指定的时间超时后自动关闭此通道
  • Err,Done返回的通道未被关闭时,Err的返回值是nil,关闭后,返回 context 取消的原因,被取消时返回Canceled,到了截止时间返回 DeadlineExceeded
  • Value,获取该Context上绑定的值,是一个键值对。以上四个方法都是幂等的
  1. canceler ,私有接口,表示一个可以被取消cancel的context对象,包内的*cancelCtx*timerCtx结构体实现了此接口

    1
    2
    3
    4
    
    type canceler interface {
    	cancel(removeFromParent bool, err, cause error)
    	Done() <-chan struct{}
    }
    

创建

几个实现此接口的结构体关系:

1
2
3
4
graph TB
emptyCtx --"context.Withcancel()"--> cancelCtx
emptyCtx --"context.WithValue()"--> valueCtx
cancelCtx --"context.WithDeadline()\ncontext.WithTimeout()"--> timerCtx
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
	cause    error                 // set to non-nil by the first cancel call
}

type timerCtx struct {
	*cancelCtx
	timer *time.Timer // 定时器事件
	deadline time.Time // 时间点
}

type valueCtx struct {
	Context
	key, val any
}

goroutine之间的层级关系保存在cancelCtx.children这个map中。树状的goroutine层级关系有根节点,即context.Background()context.TODO(),这两者都返回一个空上下文,作为根上下文,且不可被取消。

新建上下文常用的四个方法:

  • func *WithCancel*(parent Context) (ctx Context, cancel CancelFunc) 取消goroutine
  • func *WithTimeout*(parent Context, timeout time.Duration) (Context, CancelFunc) 超时取消
  • func *WithDeadline*(parent Context, d time.Time) (Context, CancelFunc) 截止时间取消
  • func *WithValue*(parent Context, key, val any) Context 携带键值对

WithCancel在其父context的Done通道被关闭或者其返回的cancel方法被调用时,它的Done通道会被关闭,此时所有的goroutine会从Done()返回的通道中读到值而执行其他动作。

context.WithTimeout()和context.WithDeadline()基本一样,只是传递的参数形式不一样,一个是时间段,一个是时间点。在超时或者到达deadline时间后Done()返回的通道关闭,也可以提前调用cancel()方法提前关闭Done()通道

示例

示例1

① WithCancel 在3秒后主动调用cancel()方法取消子协程

② WithTimeout 3秒后到达超时时间,自动通知子协程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

func main() {
	log.Println("start main")
	// ctx, cancel := context.WithCancel(context.Background())  // ①
	ctx, _ := context.WithTimeout(context.Background(), time.Second*3)  // ②
	go task(ctx, "task1")
	go task(ctx, "task2")
	time.Sleep(time.Second * 3)
	// cancel()  // ①
	time.Sleep(time.Second)
	log.Println("main finished.")
}

func task(ctx context.Context, name string) {
	for true {
		select {
		case <-ctx.Done():
			log.Println(name, " 被取消")
			return
		default:
			fmt.Println(name, " 执行default任务中,没有可执行的任务则阻塞")
			time.Sleep(time.Second)
		}
	}
}

示例2

改变timeout的时间长短,比如从5s改为1s,context的上下文超时取消时间是会发生改变的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
	"context"
	"log"
	"time"
)

var timeout int64 = 5

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	go task1(ctx)
	time.Sleep(time.Second * 13)
	log.Println("main goroutine finished")
}

func task1(ctx context.Context) {
	log.Println("start task")
	ctx1, _ := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
	go subTask(ctx1, "task1")
	go subTask(ctx1, "task2")
}

func subTask(ctx context.Context, name string) {
	for {
		select {
		case <-time.After(time.Second * 10):
			log.Println(name, " finished")
			return
		case <-ctx.Done():
			log.Println(name, " canceled!", ctx.Err().Error())
			return
		}
	}
}

因为在新建context的时候,判断父deadline是否比新的deadline早,如果父deadline早则新传入的超时时间没用,这个超时取消时间还是父deadline指定的,否则就是新传入的的超时时间了。

1
2
3
4
5
6
7
8
9
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
  ......