Go Concurrent Part.2 Pipeline

Go Concurrency Patterns: Pipelines and cancellation

首先,這篇主要以 pipeline 方式,將工作拆分成 N 個階段。

在第K個階段中使用 fan out pattern 產生多個 worker 運作

並且在接下來的階段中使用 fan in pattern 將資訊收集起來處理


但是在實際狀況,stage 不會每次都把所有資料接收完。

可能上個階段發生 error 提早結束,又或者下個階段不需要那麼多資料

問題1:如果 下游 stage 沒有成功消費完所有資料的話,會導致 上游 goroutine block, Resource Leak

解法:上游 outbound channel 使用 channel buffer。


問題2:上游 channel buffer 數量取決於下游狀況,所以當上個階段多送,或下個階段少接,還是會造成 block

解法:使用 Explicit cancellation(Done channel)


於是使用 Done Channel 告知個階段拋棄資料。(發送一次 Done 訊號 處理一個 潛在 Block)

問題3:下游接收者得要知道上游的潛在 Block 數量

解法:將 Done Channel defer close,用 closed channel 拿出來的 zero value 處理 N 個 潛在 Block

pipeline

1
2
3
4
5
func pipeline(inQueue <-chan int,out chan<- int) {
for in := range inQueue {
out <- in + 1
}
}
平方 Pipelines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)

// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9

for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}

Fan-out, fan-in

Fan-out

多個 func 可以從同個 channel 讀取資料,直到 channel 被關閉

是一種分派工作的方式

Fan-in

一個 func 可以讀取多個 input 並且處理,直到 多工 channel 被關閉

多工 channel : N to 1 channel,指所有 channel 被關閉才會關閉 多工 channel

Stopping short

問題: blocked goroutines
解法: outbound channel buffer

我們的 pipeline 有下面這些模式:

  • 當 stage 所有的傳送活動結束時,會將 輸出 channel 關閉
  • stage 會不停地接收 channel 的資料,直到 channel 被關閉

這個模式允許每個接收階段能夠被寫成 range loop 並且確保所有的 goroutines 在結束前足夠把所有東西送給下個階段。


但是實際狀況,stage 不會每次都把所有資料接收完。

Sometimes this is by design: the receiver may only need a subset of values to make progress.

更多的可能是,stage 提早結束,因為在前個 stage 發出了 error

另一種可能是 接收方不應該等待剩餘的資料到來,且我們希望前個 stage 不要丟不必要的資料給下個階段。


在我們範例中,如 stage 沒有成功消費完所有資料的話,會導致 goroutine block,Resource Leak

1
2
in := gen(2, 3)
<-merge(sq(in), sq(in))

我們需要整理 upstream stages,讓 pipeline 結束,即使 downstream stages 沒有接收完所有資料

一種方式就是讓 outbound channels 有 buffer。

這個修正了 blocked goroutine,但這是 bad code。

buffer size 取決於我們知道下個階段的狀況。

這是脆弱的設計。如果我們多丟幾資料,或者 downstream stage 讀少一點資料,我們會再度 block again。

Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.

Explicit cancellation

當主程式決定離開並且不接收所有資料,他必須告訴 upstream stages 的 goroutine 拋棄它們要送的資料。

他丟了兩次訊號,因為那邊有兩次潛在 block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
in := gen(2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}

The sending goroutines replace their send operation with a select statement that proceeds either when the send on out happens or when they receive a value from done.

The value type of done is the empty struct because the value doesn’t matter: it is the receive event that indicates the send on out should be abandoned.

The output goroutines continue looping on their inbound channel, c, so the upstream stages are not blocked.

(We’ll discuss in a moment how to allow this loop to return early.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...

這個改進還是有些問題:每一個下游接收者需要知道上游的潛在 block 量,且要安排提醒上游要 early return。

持續追蹤這個值是乏味且容易出錯


我們需要一個方式告訴 goroutines 停止送資料給他們的下游。

在 Go,我們可以把 channel 關閉,因為從關閉的 channel 讀資料不會 block 且會 yielding zero value。


這代表主程式可以 unblock 所有 senders,藉由簡單低關閉 done channel

這個關閉是很有效率低廣播訊號給 senders。

我們把所有的 pipeline func 套用上這個模式,接收 done 參數 且透過 defer 安排一個 close channel,因此,所有從主程式的 return paths 將會發結束訊息給 pipeline。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{}) // Marked
defer close(done) // Marked

in := gen(done, 2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)

// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// done will be closed by the deferred call.
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done() // Marked
for n := range c {
select {
case out <- n:
case <-done:
return // Marked
}
}
}
// ... the rest is unchanged ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out) // Marked
for n := range in {
select {
case out <- n * n:
case <-done:
return // Marked
}
}
}()
return out
}

Here are the guidelines for pipeline construction:

  • stages 關閉他們的輸出,當所有的傳送活動結束。
  • stages 持續接收資料,直到 channel 被關閉 或者 the senders are unblocked.