errgroup-新方法解析

可能是全网第一篇?

前言

前几天在用 errgroup 的时候发现了新的 api ,一看是 6月1号 才更新的。那这就来试试抢个首发解析新 api 吧

正文

新加了哪些方法?

  • SetLimit(n int)
  • TryGo(f func() error) bool

有什么用?

SetLimit

之前用 errgroup 的时候没有提供 SetLimit 来限制并发数,所以在写业务逻辑的时候就只能先把任务分批,然后再一批批地放进一个 errgroup 里面执行。但现在加了 SetLimit 了,就可以先调用这个来设置最大并发数,然后再无脑使用 Go() 来把所有任务放进来,在循环完成后再使用 Wait 来看这个任务是不是都成功了

TryGo

这个应该是和 SetLimit 配套的一个方法,和 Go 方法的区别是,如果 errgroup 有并发限制,并且现在已经到了并发上限的话,那么就会返回 false 不执行传入的任务。这个设计应该是和 go 1.18 带来的 Mutex.TryLock 相似,因为有需求,所以就加了。之后再看看邮件列表,看大佬们对这个 api 有啥讨论。

有哪些变动?

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

type token struct{}

type Group struct {
cancel func()

wg sync.WaitGroup

sem chan token

errOnce sync.Once
err error
}

func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}

g.wg.Add(1)
go func() {
defer g.done()

if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}

func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}

让我们从熟悉的 Go 方法看起,这里和以前的区别是先判断了当前的 errgroup 有没有并发限制的 channel sem,如果有的话那么就往这个 channel 里发送一个空的结构体用于抢占资源,相当于信号量的概念。之后就是我们熟悉的处理流程了,执行函数,如果 err != nil 那么就记录下这个 err 并且 cancel 掉这个 errgroupcontext

值得注意的是新增的这个 done 方法,之前是直接调用 wg.Done ,但是现在因为有了并发数的限制,所以在一个任务处理完之后还得判断当前的 errgroup 是不是有并发限制,如果有的话就把之前抢到的一个并发的机会给释放掉 <- g.sem,让其他的方法能够继续去抢并发的机会

看到了这里,那这个 g.sem 是怎么来的呢?答案是来自新提供的方法 SetLimit

SetLimit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}

可以看到,即使 errgroup 不是从 WithContext 创建的,也可以设置最大并发数。

  • 如果 n<0 那么就认为不设限,那么就把当前的 channel 置空。
  • 如果 len(g.sem) != 0 那么就说明当前还有任务在执行中,这个时候 errgroup 的处理方式是直接 panic ,这个在注释里面也提到了

The limit must not be modified while any goroutines in the group are active

然后值得注意的是,如果 SetLimit = 0 那么就会陷入死锁。因为 channel 的长度为 0,此时不会有方法能够抢到并发的机会。即使用这种骚操作,最开始开起来的那个协程也会永久阻塞掉,相当于发生了泄露

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 骚操作
func main() {
g := errgroup.Group{}
g.SetLimit(0)

go func() {
time.Sleep(time.Second)
g.SetLimit(1)
}()

go func() {
time.Sleep(time.Second * 4)
fmt.Println(g.Wait())
}()

go func() {
g.Go(func() error {
return errors.New("不可能的 err")
})
}()

time.Sleep(time.Second * 2)
g.Go(func() error {
return errors.New("会返回的 err")
})
time.Sleep(time.Second * 5)
}

// 最后只会输出 **会返回的 err**

TryGo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}

g.wg.Add(1)
go func() {
defer g.done()

if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
return true
}

这个就比较好懂了,先判断当前是不是限制了并发数,如果限制了那么就用

select case default 这种形式做一次尝试抢并发机会的逻辑,如果抢到了的话就会走和 Go 抢到了锁后相同的逻辑

怎么用?

这里直接看 test 文件里的测试样例就好

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func TestTryGo(t *testing.T) {
g := &errgroup.Group{}
n := 42
g.SetLimit(42)
ch := make(chan struct{})
fn := func() error {
ch <- struct{}{}
return nil
}
for i := 0; i < n; i++ {
if !g.TryGo(fn) {
t.Fatalf("TryGo should succeed but got fail at %d-th call.", i)
}
}
if g.TryGo(fn) {
t.Fatalf("TryGo is expected to fail but succeeded.")
}
go func() {
for i := 0; i < n; i++ {
<-ch
}
}()
g.Wait()

if !g.TryGo(fn) {
t.Fatalf("TryGo should success but got fail after all goroutines.")
}
go func() { <-ch }()
g.Wait()

// Switch limit.
g.SetLimit(1)
if !g.TryGo(fn) {
t.Fatalf("TryGo should success but got failed.")
}
if g.TryGo(fn) {
t.Fatalf("TryGo should fail but succeeded.")
}
go func() { <-ch }()
g.Wait()

// Block all calls.
g.SetLimit(0)
for i := 0; i < 1<<10; i++ {
if g.TryGo(fn) {
t.Fatalf("TryGo should fail but got succeded.")
}
}
g.Wait()
}

func TestGoLimit(t *testing.T) {
const limit = 10

g := &errgroup.Group{}
g.SetLimit(limit)
var active int32
for i := 0; i <= 1<<10; i++ {
g.Go(func() error {
n := atomic.AddInt32(&active, 1)
if n > limit {
return fmt.Errorf("saw %d active goroutines; want ≤ %d", n, limit)
}
time.Sleep(1 * time.Microsecond) // Give other goroutines a chance to increment active.
atomic.AddInt32(&active, -1)
return nil
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
  • TestTryGo 里面展示了各种情况下(包括调用 SetLimit 设置并发与否)应该得到的是 true or false
  • TestGoLimit 里面则简单一点,只是用于验证会不会有超过预期数量的并发

具体的示例

来点具体的示例吧, 并发处理 x 个任务,最大并发为 4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"log"

"golang.org/x/sync/errgroup"
)

func main() {
const limit = 4
tasks := make([]int, 20)
g := errgroup.Group{}

doSomeThing := func(task int) error {
if task > 5 && task%5 == 0 {
return fmt.Errorf("%d task err", task)
}
return nil
}

for i := 0; i+limit < len(tasks); i += limit {
j := i + limit
if len(tasks) < j {
j = len(tasks)
}
for task := i; task < j; task++ {
task := task
g.Go(func() error {
return doSomeThing(task)
})
}

err := g.Wait()
if err != nil {
log.Fatal(err)
}
}
}

我们之前用 errgroup 但是又想限制并发数的话,这是个比较简单的写法。但是就真的是批处理,最大并发为 4 ,这一批四个任务,全部走完了才能走下一批。如果要写能够实时填充的任务池的话,其实相对而言写起来更简单,但是对业务逻辑的侵入比较明显

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"log"

"golang.org/x/sync/errgroup"
)

func main() {
const limit = 4
tasks := make([]int, 20)
g := errgroup.Group{}

doSomeThing := func(task int) error {
if task > 5 && task%5 == 0 {
return fmt.Errorf("%d task err", task)
}
return nil
}

workChan := make(chan struct{}, limit)
for task := range tasks {
task := task
workChan <- struct{}{}
g.Go(func() error {
return doSomeThing(task)
})
<-workChan
}
err := g.Wait()
if err != nil {
log.Fatal(err)
}
}

明显要清爽一些了,但是得创建一个用于限制并发数的 channel ,对于看业务逻辑而言不太友好。毕竟主要是看你的业务逻辑是怎么样的,而不是看你怎么写并发和同步的

用新版的来改造的话就用着简单,看着也很清爽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"log"

"golang.org/x/sync/errgroup"
)

func main() {
const limit = 4
tasks := make([]int, 20)
g := errgroup.Group{}
g.SetLimit(limit)

doSomeThing := func(task int) error {
if task > 5 && task%5 == 0 {
return fmt.Errorf("%d task err", task)
}
return nil
}

for task := range tasks {
task := task
g.Go(func() error {
return doSomeThing(task)
})
}

err := g.Wait()
if err != nil {
log.Fatal(err)
}
}

这样别人看你代码的时候能够直接看到你的业务逻辑,而限制并发数只是一个调用而已,对 CR 的同学而言知道了这个用法就行,不用再 Review 你的并发限制写得有没有问题了。

参考


errgroup-新方法解析
https://www.yikakia.com/errgroup-新方法解析/
作者
Yika
发布于
2022年7月17日
许可协议