funcconsumer(id int, ch chanint, wg *sync.WaitGroup) { defer wg.Done() for { value, ok := <-ch if !ok { fmt.Printf("Consumer %d exits.\n", id) return } fmt.Printf("Consumer %d receives: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟消费过程 } }
funcmain() { ch := make(chanint, 3) var wg sync.WaitGroup wg.Add(1) go producer(100, ch, &wg) for i := 1; i < 4; i++ { wg.Add(1) go consumer(i, ch, &wg) } wg.Wait() }
type hchan struct { qcount uint// total data in the queue dataqsiz uint// size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint// send index recvx uint// receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
type waitq struct { first *sudog last *sudog } // /runtime/runtime2.go
type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops.
g *g
next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock.
acquiretime int64 releasetime int64 ticket uint32
// isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool
// success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool
parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
// compiler checks this but be safe. if elem.Size_ >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { throw("makechan: bad alignment") }
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.PtrBytes == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }