# 协程池

在 ekit 里面我们支持了类似于线程池的协程池,也称为任务池。

在使用之前你需要引入:

import (
    "github.com/ecodeclub/ekit/pool"
)
1
2
3

使用例子:

func ExampleNewOnDemandBlockTaskPool() {
	p, _ := pool.NewOnDemandBlockTaskPool(10, 100)
	_ = p.Start()
	// wg 只是用来确保任务执行的,你在实际使用过程中是不需要的
	var wg sync.WaitGroup
	wg.Add(1)
	_ = p.Submit(context.Background(), TaskFunc(func(ctx context.Context) error {
		fmt.Println("hello, world")
		wg.Done()
		return nil
	}))
	wg.Wait()
	// Output:
	// hello, world
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

再次强调,在上面我们的例子里面,变量 wg 只是我们用来协调例子的,你使用的时候是不需要的。

你可以在调用pool.NewOnDemandBlockTaskPool的时候传入一些选项:

- WithMaxGo: 最大协程数量
- WithCoreGo: 核心协程数量
- WithMaxIdleTime: 协程的最大空闲时间
- WithQueueBacklogRate: 队列积压率
1
2
3
4

这是因为我们使用了以下参数来控制协程池的行为:

  • initGo:初始协程数
  • coreGo:核心协程数
  • maxGo:最大协程数
  • queueBacklogRate: 队列积压率,取值在 [0, 1] 之间
  • maxIdleTime:最大空闲时间

协程池在不同的情况下,会决定是否启用一个新的协程:

  • 在协程池调用 Start() 的时候就会创建出来 initGo 个协程
  • 随着任务不断提交,当协程数量达到 coreGo 之前,如果此时没有空闲协程,那么协程池就会创建一个新的协程
  • 当协程数量到达了 coreGo 之后,任务会先放在队列之中
  • 如果队列中的任务堆积太多,达到了 queueBacklogRate 设定的阈值,那么协程池会创建一个新的协程

关闭协程的过程是一个相反过程:

  • 如果此时协程数量超过 coreGo,而且队列中没有任务,那么协程会直接退出
  • 如果此时协程数量在 (initGo, coreGo] 之间,那么当协程空闲时间超过 maxIdleTime 的时候,协程会退出
  • 当协程数量在 (0, initGo] 之间,这些协程永远不会退出

# 可观测性

TaskPool 提供了可观测性的接口,暴露了协程池本身的一些状态。使用起来也非常简单:

    // p 是你创建的一个协程池
	ch, err := p.States(ctx, time.Second*10)
	if err == nil {
		fmt.Println("get ch")
	}
	for state := range ch {
	    fmt.Println(state)	
    }
1
2
3
4
5
6
7
8

注意,当你调用 States 准备观测协程池的状态之时,你需要传入两个参数:

  • context.Context:你可以通过该参数来控制观测。例如在前面的例子里面,ctx 如果被取消了,或者超时了,那么 ch 就会被关闭。这意味着你后面再也拿不到状态了。
  • time.Duration:采集间隔。例如在前面的例子中,我们会每隔十秒采集一次协程池的状态,并且将状态发送到 ch 上。

在采集状态的时候,如果我们发送状态到 ch 失败,那么就会丢弃本次采集的状态。这往往意味着你并没有创建消费者消费 ch 中的数据,或者你消费得太慢。

此外,当协程池彻底被关闭之后,我们也不会再采集协程池的状态。不过不管是因为 ctx,还是因为协程池被关闭,在退出采集状态循环之前,我们都会立刻上报一次最后的状态。

但是,针对任务自身的观测,你可以考虑使用装饰器模式来实现 Task 接口,而后在装饰器里面自己进行观测。我们认为针对具体任务的观测是一个比较多样化的场景,而且从职责划分的角度来说,也应该是 Task 的具体实现去负责,而不是我们来观测。

# 实践建议

任何时候都要记住,你的任务调度最好是不依赖于协程池管理协程的任何细节。

# initGo, coreGo 和 maxGo 的设置

一般情况下,你可以直接使用默认参数。当你希望修改这些参数的时候,你要考虑以下问题:

  • 如果三个参数的值都相等,那么意味着你希望使用固定个协程来处理你的任务。这些协程将会在 Start 方法调用的时候创建好,并且不会再退出;
  • maxGo 意味着你所能容忍的这个协程池所能占据的最多的协程数量

# 协程池隔离

一般的做法是:

  • 你会有一个全局的协程池。不重要的业务产生的任务都提交到这个协程池
  • 重要业务都是独享一个协程池,以避免相互之间影响
  • 要注意所有的协程池的 maxGo 的设计,并且考虑系统是否有足够的资源支持所有的协程池都满负荷运行

# 队列大小

如果你的任务数量波动比较大,那么应该把队列设置得比较大一些。

例如,在业务高峰期可能会短时间提交很多任务,但是你能容忍这些任务在后面慢慢运行完成,那么你就需要把队列设置得大一些。

而如果你的系统需要很多内存,那么你应该将队列设置小一些,以节省内存。

# queueBacklogRate

这个参数我们不建议你修改,大多数时候我们希望协程池中的协程数量保持稳定,不会出现突然创建协程,然后又退出的情况。

你只有在有确凿把握的时候再去调整它。并且,将来我们会考虑优化这个参数。