WaitGroup使用与实现

背景

WaitGroup主要应用场景是等待一组goroutine完成,主goroutine使用Add来设置等待goroutine的数量,之后每个goroutine使用Done来表示它们已经运行完成,同时Wait可以用于阻塞,直到所有goroutine都已经使用了Done。简单来说WaitGroup就是用来解决并发-等待问题。

WaitGroup在被创建之后,不能被复制,这里有点绕,用代码来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func main() {
var wg sync.WaitGroup
wg.Add(1)
wg2 := wg
wg2.Add(2)
go func() {
defer wg.Done()
log.Println("G1")
}()
go func() {
defer wg2.Done()
log.Println("G2")
}()
wg.Wait()
wg2.Wait()
}

/**
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000116040?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0xc000114058?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
main.main()
/home/cola/code/goExample/concurrency/main.go:22 +0xef
//

使用

实现

WaitGroup结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type WaitGroup struct {
noCopy noCopy

state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}


//相关接口与结构体

// A Uint64 is an atomic uint64. The zero value is zero.
// atomic/type.go
type Uint64 struct {
_ noCopy
_ align64
v uint64
}

// Note that it must not be embedded, due to the Lock and Unlock methods.
///sync/cond.go
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
  • noCopy: 确保waitGroup唯一性, 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
  • state: 高32位用来存储计数器的值,低 32 位用于存储等待计数器的值。这个状态字段是原子操作的,可以确保在并发环境下对其进行安全的读写操作。这里的值是指Add方法中goroutine的数量。
  • sena: 用作信号量,用来实现Wait方法的阻塞功能,来等待所有goroutine完成。

Add

delta表示WaitGroup计数器,也就是goroutine数量;虽然它可以位负数,但是如果为负数就会引起panic。如果为零,则表示所有goroutine都已经Done(),那被wait的所有协程就会释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (wg *WaitGroup) Add(delta int) {
//这里检查是否开启数据竞争检测,
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
/**

*/
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}

WaitGroup使用与实现
http://example.com/2024/03/13/WaitGroup使用与实现/
Author
John Doe
Posted on
March 13, 2024
Licensed under