Channel使用与实现

背景

channel之前使用的不多,虽然也看过源码和一些使用案例,但是在使用中还是会犯错。这里简单记录下channel使用和一些源码实现。

使用

Go中经常提到的一句话就是不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。也就是使用CSP并发模型。Goroutine之间使用channel传递。

例子1 通过channel传递信息

先来理解一下CSP模型,这里设置两个Goroutine,分别为A和B,A通过Goroutine向B传递信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func A(context int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- context
}
func B(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("consumer:", <-ch)
}

func main() {
ch := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
go A(10, ch, &wg)
go B(ch, &wg)
wg.Wait()
}

例子2 channel先进先出原则

channel遵守先进先出原则,在使用上可以看作是一个队列,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

通过消费者和生产者来了解这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
"fmt"
"sync"
"time"
)

func producer(n int, ch chan int, wg *sync.WaitGroup) {
for i := 1; i < n; i++ {

fmt.Printf("Producer sends: %d\n", i)
ch <- i
time.Sleep(100 * time.Millisecond) // 模拟生产过程
}
wg.Done()
close(ch)

}

func consumer(id int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
value, ok := <-ch
if !ok {
fmt.Printf("Consumer %d exits.\n", id)
return
}
fmt.Printf("Consumer %d receives: %d\n", id, value)
time.Sleep(200 * time.Millisecond) // 模拟消费过程
}
}

func main() {
ch := make(chan int, 3)
var wg sync.WaitGroup
wg.Add(1)
go producer(100, ch, &wg)
for i := 1; i < 4; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
wg.Wait()
}

/**
Producer sends: 1
Consumer 3 receives: 1
Producer sends: 2
Consumer 1 receives: 2
Producer sends: 3
Consumer 2 receives: 3
Producer sends: 4
Consumer 3 receives: 4
Producer sends: 5
Consumer 1 receives: 5
Producer sends: 6
Consumer 2 receives: 6
Producer sends: 7
Consumer 3 receives: 7
Producer sends: 8
Consumer 1 receives: 8
Producer sends: 9
Consumer 2 receives: 9
Producer sends: 10
Consumer 3 receives: 10
........

Consumer 3 exits.
Consumer 1 exits.
Consumer 2 exits.
*/

这里看三个Consumer,可以看到第一个消费是Consumer3,所以按照先进先出原则第一个退出的也是Consumer3。

例子3 保证Goroutine顺序性

这里启动三个Goroutine,按照数组中顺序输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import (
"fmt"
"log"
"sync"
)

func First(ch chan bool, wg *sync.WaitGroup) {
defer wg.Done()
log.Println("First")
<-ch //阻塞当前的 goroutine
}

func Second(ch chan bool, wg *sync.WaitGroup) {
defer wg.Done()
log.Println("Second")
<-ch //阻塞当前的 goroutine
}

func Third(ch chan bool, wg *sync.WaitGroup) {
defer wg.Done()
log.Println("Third")
<-ch //阻塞当前的 goroutine

}

func main() {

var wg sync.WaitGroup
arr := []int{1, 2, 3}
ch := make(chan bool)
for j := 0; j < 100; j++ {
fmt.Println()
for i := 0; i < len(arr); i++ {
switch arr[i] {
case 1:
wg.Add(1)
go First(ch, &wg)
case 2:
wg.Add(1)
go Second(ch, &wg)
case 3:
wg.Add(1)
go Third(ch, &wg)
default:
log.Println("Error")
}
ch <- true
}
}

wg.Wait()
}

这里使用了一个同步channel,保证顺序性。

例子4 利用channel实现wait/notify:

chan类型有一个特点,就是如果chan为空,那receiver接受数据的时候就会阻塞当前协程,直到channel有了数据或者有新数据到来。通过这个机制可以实现wait/notify 的设计模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func MySQL(ch chan struct{}) {
log.Println("closing MySQL")
<-ch
log.Println("MySQL closed")
}

func main() {
signCh := make(chan struct{})
go MySQL(signCh)
time.Sleep(3 * time.Second)
log.Println("Sending notification to MySQL")
close(signCh)
time.Sleep(3 * time.Second)
}

channel应用场景

通过之前的例子,这里可以总结下,channel常用场景:

  • 消息传递
  • 数据传递
  • 信号通知

channel类型

channel算是一个用于同步和通信的有锁队列,但是锁导致的休眠和唤醒带来的上下文切换成本太高, 如果临界区过大,加锁解锁操作就会造成性能瓶颈。Go社区在2014年提出了无锁Channel实现,该方案将channel分为三种:

  • 同步channel: 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
  • 异步channel: 基于环形缓存的传统生产者消费者模型;
  • chan struct{}: 也是一种异步channel,struct{}类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;

channel实现

channel 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// /runtime/chan.go
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
debugChan = false
)

type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

type waitq struct {
first *sudog
last *sudog
}
// /runtime/runtime2.go

type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g

next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool

// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

channel底层是一个循环队列,结合之前的例子,提到过channel是一个有锁循环队列。这里简单说下循环队列(Circular Queue),循环队列基于数组实现,具有固定大小缓冲区。

Circular Queue

channel创建

创建channel一共有三个函数:

  • reflect_makechan
  • makechan64
  • makechan

reflect_makechan是提供给relect库使用的,而makechan64则是int64类型,这两个不需要关心。主要关心makechan:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func makechan(t *chantype, size int) *hchan {
elem := t.Elem

// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}

mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}

参考


Channel使用与实现
http://example.com/2024/03/13/Channel使用与实现/
Author
John Doe
Posted on
March 13, 2024
Licensed under