更复杂的场景如何做并发控制呢?比如子协程中开启了新的子协程,或者需要同时控制多个子协程。这种场景下,select+chan
的方式就显得力不从心了。Go 语言提供了 Context 标准库可以解决这类场景的问题,Context 的作用和它的名字很像,上下文,即子协程的下上文。Context 有两个主要的功能:
- 通知子协程退出(正常退出,超时退出等);
- 传递必要的参数。
1.context.WithCancel
select+chan
定时轮询
var stop chan bool
func reqTask(name string) {
for {
select {
case <-stop:
fmt.Println("stop", name)
return
default:
fmt.Println(name, "send ing")
time.Sleep(time.Second)
}
}
}
func main() {
stop = make(chan bool)
fmt.Println(stop)
go reqTask("work1")
time.Sleep(3 * time.Second)
stop <- true
fmt.Println("准备结束")
time.Sleep(3 * time.Second)
}
/* 输出:
work1 send ing
work1 send ing
work1 send ing
准备结束
stop work1
*/
在上述基础上使用context.WithCancel
context.WithCancel()
创建可取消的 Context 对象,即可以主动通知子协程退出。
控制单个协程
context.Backgroud()
创建根 Context,通常在 main 函数、初始化和测试代码中创建,作为顶层 Context。context.WithCancel(parent)
创建可取消的子 Context,同时返回函数cancel
。- 在子协程中,使用 select 调用
<-ctx.Done()
判断是否需要退出。 - 主协程中,调用
cancel()
函数通知子协程退出。
func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println("stop", name)
return
default:
fmt.Println(name, "send ing")
time.Sleep(time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go reqTask(ctx, "work2")
time.Sleep(3 * time.Second)
cancel()
fmt.Println("准备结束")
time.Sleep(3 * time.Second)
}
/*输出:
work2 send ing
work2 send ing
work2 send ing
准备结束
stop work2
*/
控制多个协程
只需要多启用协程即可
func main() {
ctx, cancel := context.WithCancel(context.Background())
//多协程示例
go reqTask(ctx, "work2")
go reqTask(ctx, "work3")
go reqTask(ctx, "work4")
time.Sleep(3 * time.Second)
cancel()
fmt.Println("准备结束")
time.Sleep(3 * time.Second)
}
/*输出:
work4 send ing
work2 send ing
work3 send ing
work2 send ing
work3 send ing
work4 send ing
work2 send ing
work3 send ing
work4 send ing
准备结束
stop work4
stop work3
stop work2
*/
2.context.WithValue
如果需要往子协程中传递参数,可以使用 context.WithValue()
。
context.WithValue()
创建了一个基于ctx
的子 Context,并携带了值options
。- 在子协程中,使用
ctx.Value("options")
获取到传递的值,读取/修改该值。
type Target struct {
name string
}
func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println("stop", name)
return
default:
//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
v := ctx.Value("name").(*Target)
fmt.Println(name, "send ing--", v.name)
time.Sleep(time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
testV1 := &Target{"小张"}
testV2 := &Target{"小李"}
vCtx1 := context.WithValue(ctx, "name", testV1)
vCtx2 := context.WithValue(ctx, "name", testV2)
go reqTask(vCtx1, "work2")
go reqTask(vCtx2, "work3")
time.Sleep(3 * time.Second)
cancel()
fmt.Println("准备结束")
time.Sleep(3 * time.Second)
}
/*
work3 send ing-- 小李
work2 send ing-- 小张
work3 send ing-- 小李
work2 send ing-- 小张
work2 send ing-- 小张
work3 send ing-- 小李
work2 send ing-- 小张
准备结束
stop work3
stop work2
*/
3.context.WithTimeout
如果需要控制子协程的执行时间,可以使用 context.WithTimeout
创建具有超时通知机制的 Context 对象
因为超时时间设置为 2s,但是 main 函数中,3s 后才会调用 cancel()
,因此,在调用 cancel()
函数前,子协程因为超时已经退出了。
type Target struct {
name string
}
func reqTask(ctx context.Context, name string) {
i := 0
for {
i++
select {
case <-ctx.Done():
fmt.Println("stop", name)
return
default:
//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
time.Sleep(time.Second)
v := ctx.Value("name").(*Target)
fmt.Println(name, "send ing--", v.name, "第:", i, "秒")
}
}
}
func main() {
//ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
testV1 := &Target{"小张"}
testV2 := &Target{"小李"}
vCtx1 := context.WithValue(ctx, "name", testV1)
vCtx2 := context.WithValue(ctx, "name", testV2)
go reqTask(vCtx1, "work2")
go reqTask(vCtx2, "work3")
time.Sleep(3 * time.Second)
fmt.Println("准备结束")
cancel()
time.Sleep(3 * time.Second)
}
/*
work3 send ing-- 小李 第: 1 秒
work2 send ing-- 小张 第: 1 秒
work2 send ing-- 小张 第: 2 秒
stop work2
work3 send ing-- 小李 第: 2 秒
stop work3
准备结束
*/
4.context.WithDeadline
超时退出可以控制子协程的最长执行时间,那 context.WithDeadline()
则可以控制子协程的最迟退出时间
WithDeadline
用于设置截止时间。在这个例子中,将截止时间设置为2s后,cancel()
函数在 3s 后调用,因此子协程将在调用cancel()
函数前结束。- 在子协程中,可以通过
ctx.Err()
获取到子协程退出的错误原因。
type Target struct {
name string
}
func reqTask(ctx context.Context, name string) {
i := 0
for {
i++
select {
case <-ctx.Done():
fmt.Println("stop", name, "退出错误显示:", ctx.Err())
return
default:
//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
time.Sleep(time.Second)
v := ctx.Value("name").(*Target)
fmt.Println(name, "send ing--", v.name, "第:", i, "秒")
}
}
}
func main() {
//ctx, cancel := context.WithCancel(context.Background())
//ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
testV1 := &Target{"小张"}
testV2 := &Target{"小李"}
vCtx1 := context.WithValue(ctx, "name", testV1)
vCtx2 := context.WithValue(ctx, "name", testV2)
go reqTask(vCtx1, "work2")
go reqTask(vCtx2, "work3")
time.Sleep(3 * time.Second)
fmt.Println("准备结束")
cancel()
time.Sleep(3 * time.Second)
}
/*
work3 send ing-- 小李 第: 1 秒
work2 send ing-- 小张 第: 1 秒
work2 send ing-- 小张 第: 2 秒
stop work2 退出错误显示: context deadline exceeded
work3 send ing-- 小李 第: 2 秒
stop work3 退出错误显示: context deadline exceeded
准备结束
*/
context deadline exceeded 因为截止时间到了而退出