0%

使用Context控制gorutine并发

更复杂的场景如何做并发控制呢?比如子协程中开启了新的子协程,或者需要同时控制多个子协程。这种场景下,select+chan的方式就显得力不从心了。Go 语言提供了 Context 标准库可以解决这类场景的问题,Context 的作用和它的名字很像,上下文,即子协程的下上文。Context 有两个主要的功能:

  • 通知子协程退出(正常退出,超时退出等);
  • 传递必要的参数。

1.context.WithCancel

select+chan定时轮询

var stop chan bool

func reqTask(name string) {
   for {
      select {
      case <-stop:
         fmt.Println("stop", name)
         return
      default:
         fmt.Println(name, "send ing")
         time.Sleep(time.Second)
      }
   }
}

func main() {
   stop = make(chan bool)
   fmt.Println(stop)
   go reqTask("work1")
   time.Sleep(3 * time.Second)
   stop <- true
   fmt.Println("准备结束")
   time.Sleep(3 * time.Second)
}
/* 输出:
work1 send ing
work1 send ing
work1 send ing
准备结束
stop work1
*/

在上述基础上使用context.WithCancel

context.WithCancel() 创建可取消的 Context 对象,即可以主动通知子协程退出。

控制单个协程

  • context.Backgroud() 创建根 Context,通常在 main 函数、初始化和测试代码中创建,作为顶层 Context。
  • context.WithCancel(parent) 创建可取消的子 Context,同时返回函数 cancel
  • 在子协程中,使用 select 调用 <-ctx.Done() 判断是否需要退出。
  • 主协程中,调用 cancel() 函数通知子协程退出。
func reqTask(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop", name)
			return
		default:
			fmt.Println(name, "send ing")
			time.Sleep(time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go reqTask(ctx, "work2")
	time.Sleep(3 * time.Second)
	cancel()
	fmt.Println("准备结束")
	time.Sleep(3 * time.Second)
}
/*输出:
work2 send ing
work2 send ing
work2 send ing
准备结束
stop work2
*/

控制多个协程

只需要多启用协程即可

func main() {
	ctx, cancel := context.WithCancel(context.Background())
    //多协程示例
	go reqTask(ctx, "work2")
	go reqTask(ctx, "work3")
	go reqTask(ctx, "work4")
	time.Sleep(3 * time.Second)
	cancel()
	fmt.Println("准备结束")
	time.Sleep(3 * time.Second)
}
/*输出:
work4 send ing
work2 send ing
work3 send ing
work2 send ing
work3 send ing
work4 send ing
work2 send ing
work3 send ing
work4 send ing
准备结束
stop work4
stop work3
stop work2
*/

2.context.WithValue

如果需要往子协程中传递参数,可以使用 context.WithValue()

  • context.WithValue() 创建了一个基于 ctx 的子 Context,并携带了值 options
  • 在子协程中,使用 ctx.Value("options") 获取到传递的值,读取/修改该值。
type Target struct {
	name string
}

func reqTask(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop", name)
			return
		default:
			//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
			v := ctx.Value("name").(*Target)
			fmt.Println(name, "send ing--", v.name)
			time.Sleep(time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	testV1 := &Target{"小张"}
	testV2 := &Target{"小李"}
	vCtx1 := context.WithValue(ctx, "name", testV1)
	vCtx2 := context.WithValue(ctx, "name", testV2)
	go reqTask(vCtx1, "work2")
	go reqTask(vCtx2, "work3")
	time.Sleep(3 * time.Second)
	cancel()
	fmt.Println("准备结束")
	time.Sleep(3 * time.Second)
}

/*
work3 send ing-- 小李
work2 send ing-- 小张
work3 send ing-- 小李
work2 send ing-- 小张
work2 send ing-- 小张
work3 send ing-- 小李
work2 send ing-- 小张
准备结束  
stop work3
stop work2
*/

3.context.WithTimeout

如果需要控制子协程的执行时间,可以使用 context.WithTimeout 创建具有超时通知机制的 Context 对象

因为超时时间设置为 2s,但是 main 函数中,3s 后才会调用 cancel(),因此,在调用 cancel() 函数前,子协程因为超时已经退出了。

type Target struct {
	name string
}

func reqTask(ctx context.Context, name string) {
	i := 0
	for {
		i++
		select {
		case <-ctx.Done():
			fmt.Println("stop", name)
			return
		default:

			//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
			time.Sleep(time.Second)
			v := ctx.Value("name").(*Target)
			fmt.Println(name, "send ing--", v.name, "第:", i, "秒")
		}
	}
}

func main() {
	//ctx, cancel := context.WithCancel(context.Background())
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	testV1 := &Target{"小张"}
	testV2 := &Target{"小李"}
	vCtx1 := context.WithValue(ctx, "name", testV1)
	vCtx2 := context.WithValue(ctx, "name", testV2)
	go reqTask(vCtx1, "work2")
	go reqTask(vCtx2, "work3")
	time.Sleep(3 * time.Second)
	fmt.Println("准备结束")
	cancel()
	time.Sleep(3 * time.Second)
}
/*
work3 send ing-- 小李 第: 1 秒
work2 send ing-- 小张 第: 1 秒
work2 send ing-- 小张 第: 2 秒
stop work2                     
work3 send ing-- 小李 第: 2 秒
stop work3                     
准备结束
*/

4.context.WithDeadline

超时退出可以控制子协程的最长执行时间,那 context.WithDeadline() 则可以控制子协程的最迟退出时间

  • WithDeadline 用于设置截止时间。在这个例子中,将截止时间设置为2s后,cancel() 函数在 3s 后调用,因此子协程将在调用 cancel() 函数前结束。
  • 在子协程中,可以通过 ctx.Err() 获取到子协程退出的错误原因。
type Target struct {
	name string
}

func reqTask(ctx context.Context, name string) {
	i := 0
	for {
		i++
		select {
		case <-ctx.Done():
			fmt.Println("stop", name, "退出错误显示:", ctx.Err())
			return
		default:

			//Value获取的是any 接口型值 需要.(*Target)类型断言 将v获取为*Target类型
			time.Sleep(time.Second)
			v := ctx.Value("name").(*Target)
			fmt.Println(name, "send ing--", v.name, "第:", i, "秒")
		}
	}
}

func main() {
	//ctx, cancel := context.WithCancel(context.Background())
	//ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
	testV1 := &Target{"小张"}
	testV2 := &Target{"小李"}
	vCtx1 := context.WithValue(ctx, "name", testV1)
	vCtx2 := context.WithValue(ctx, "name", testV2)
	go reqTask(vCtx1, "work2")
	go reqTask(vCtx2, "work3")
	time.Sleep(3 * time.Second)
	fmt.Println("准备结束")
	cancel()
	time.Sleep(3 * time.Second)
}

/*
work3 send ing-- 小李 第: 1 秒
work2 send ing-- 小张 第: 1 秒
work2 send ing-- 小张 第: 2 秒
stop work2 退出错误显示: context deadline exceeded
work3 send ing-- 小李 第: 2 秒                    
stop work3 退出错误显示: context deadline exceeded
准备结束
*/

context deadline exceeded 因为截止时间到了而退出

-------------本文结束感谢您的阅读-------------
打赏一瓶矿泉水