golang之协程复用

golang是原生支持高并发的语言,它的并发是通过协程实现的,对于上千个任务,我们一般会通过将其分割成多组任务后,对于每组任务,开启对应数目的协程来运行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"

"github.com/petermattis/goid"
)

const batchSize = 10

func main() {
tasks := []int{}
for i := 0; i < 100; i++ {
tasks = append(tasks, i)
}
for j := 0; j < 100/batchSize; j++ {
RunTask1(tasks[batchSize*j : batchSize*(j+1)])
}
}

func RunTask1(tasks []int) {
wg := &sync.WaitGroup{}
wg.Add(len(tasks))
for _, task := range tasks {
go func(task int) {
fmt.Println("id: ", goid.Get(), " task: ", task)
wg.Done()
}(task)
}
wg.Wait()
}

上面就是一种实现高并发运行任务的方法,但是这种方法对协程的控制不够精细,每组扫描任务都会瞬间启动大量协程,然后逐渐关闭,并不是一个平滑的过程,通过top查看cpu占用时,会发现该程序运行的时候会周期性的大幅度变化,为了实现协程复用,我们采用如下使用channel和sync.WaitGroup相结合的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"sync"

"github.com/petermattis/goid"
)

const batchSize = 10

func main() {
tasks := []int{}
for i := 0; i < 100; i++ {
tasks = append(tasks, i)
}

RunTask2(tasks)
}

func RunTask2(tasks []int) {
wg := &sync.WaitGroup{}

//创建一个buffer为batchSize的channel
taskChan := make(chan int, batchSize)

// 创建batchSize个协程
for i := 0; i < batchSize; i++ {
go SubRunTask(taskChan, wg)
}

// 生产者,不断往taskChan Channel发送数据,直到Channel阻塞
for task := range tasks {
wg.Add(1)
taskChan <- task
}

close(taskChan)
}

func SubRunTask(taskChan chan int, wg *sync.WaitGroup) {
// 每个协程都从Channel读取数据然后开始运行任务
for task := range taskChan {
fmt.Println("id: ", goid.Get(), " task: ", task)
wg.Done()
}
}

以上方法通过生产者消费者模式实现协程的复用

可能有人会想,为什么不直接提前分成batchSize组,然后开启batchSize个协程,每个协程循环运行一组任务不就好了?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"sync"

"github.com/petermattis/goid"
)

const batchSize = 10

func main() {
tasks := []int{}
for i := 0; i < 100; i++ {
tasks = append(tasks, i)
}
RunTask3(tasks)
}

func RunTask3(tasks []int) {
wg := &sync.WaitGroup{}
groupSize := len(tasks)/ (batchSize - 1)
wg.Add(batchSize)
// 10个任务分成4四组,10/(4-1)=3,分成3、3、3、1
for i :=0; i < batchSize -1; i++{
go func(taskGroup []int){
for _,task := range taskGroup{
fmt.Println("id: ", goid.Get(), " task: ", task)
}
wg.Done()
}(tasks[GroupSize * i: GroupSize * (i+1)])
}
go func(taskGroup []int){
for _,task := range taskGroup{
fmt.Println("id: ", goid.Get(), " task: ", task)
}
wg.Done()
}(tasks[GroupSize * (batchSize -1):])
wg.Wait()
}

但是这样并不能高效应用所有协程,每个协程分配的任务可能是不均衡的,有些协程可能运行比较快,就把任务提前运行完了,提前运行完的协程并没有接到新的任务就结束了,白白浪费新开的资源。

而上面协程复用的方法可以源源不断地将信道中的任务送到每个协程中,每个协程可以不间断地进行任务的执行,从而高效利用协程的资源。