Go Concurrent Part.2 Pipeline
Go Concurrency Patterns: Pipelines and cancellation
首先,這篇主要以 pipeline 方式,將工作拆分成 N 個階段。
在第K個階段中使用 fan out pattern
產生多個 worker 運作
並且在接下來的階段中使用 fan in pattern
將資訊收集起來處理
但是在實際狀況,stage 不會每次都把所有資料接收完。
可能上個階段發生 error 提早結束,又或者下個階段不需要那麼多資料
問題1:如果 下游 stage
沒有成功消費完所有資料的話,會導致 上游 goroutine block, Resource Leak
解法:上游 outbound channel 使用 channel buffer。
問題2:上游 channel buffer 數量
取決於下游狀況,所以當上個階段多送,或下個階段少接,還是會造成 block
解法:使用 Explicit cancellation
(Done channel)
於是使用 Done Channel 告知個階段拋棄資料。(發送一次 Done 訊號
處理一個 潛在 Block
)
問題3:下游接收者得要知道上游的潛在 Block 數量
解法:將 Done Channel defer close,用 closed channel 拿出來的 zero value 處理 N 個 潛在 Block
pipeline
1 | func pipeline(inQueue <-chan int,out chan<- int) { |
1 | func gen(nums ...int) <-chan int { |
Fan-out, fan-in
Fan-out
多個 func 可以從同個 channel 讀取資料,直到 channel 被關閉
是一種分派工作的方式
Fan-in
一個 func 可以讀取多個 input 並且處理,直到 多工 channel
被關閉
多工 channel
: N to 1 channel,指所有 channel 被關閉才會關閉 多工 channel
Stopping short
問題: blocked goroutines
解法: outbound channel buffer
我們的 pipeline 有下面這些模式:
- 當 stage 所有的傳送活動結束時,會將 輸出 channel 關閉
- stage 會不停地接收 channel 的資料,直到 channel 被關閉
這個模式允許每個接收階段能夠被寫成 range loop
並且確保所有的 goroutines
在結束前足夠把所有東西送給下個階段。
但是實際狀況,stage 不會每次都把所有資料接收完。
Sometimes this is by design: the receiver may only need a subset of values to make progress.
更多的可能是,stage 提早結束,因為在前個 stage 發出了 error
另一種可能是 接收方不應該等待剩餘的資料到來,且我們希望前個 stage 不要丟不必要的資料給下個階段。
在我們範例中,如 stage 沒有成功消費完所有資料的話,會導致 goroutine block,Resource Leak
1 | in := gen(2, 3) |
我們需要整理 upstream stages
,讓 pipeline 結束,即使 downstream stages 沒有接收完所有資料
一種方式就是讓 outbound channels 有 buffer。
這個修正了 blocked goroutine,但這是 bad code。
buffer size 取決於我們知道下個階段的狀況。
這是脆弱的設計。如果我們多丟幾資料,或者 downstream stage 讀少一點資料,我們會再度 block again。
Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.
Explicit cancellation
當主程式決定離開並且不接收所有資料,他必須告訴 upstream stages 的 goroutine 拋棄它們要送的資料。
他丟了兩次訊號,因為那邊有兩次潛在 block。
1 | func main() { |
The sending goroutines replace their send operation with a select statement that proceeds either when the send on out happens or when they receive a value from done.
The value type of done is the empty struct because the value doesn’t matter: it is the receive event that indicates the send on out should be abandoned.
The output goroutines continue looping on their inbound channel, c, so the upstream stages are not blocked.
(We’ll discuss in a moment how to allow this loop to return early.)
1 | func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { |
這個改進還是有些問題:每一個下游接收者需要知道上游的潛在 block 量,且要安排提醒上游要 early return。
持續追蹤這個值是乏味且容易出錯
我們需要一個方式告訴 goroutines 停止送資料給他們的下游。
在 Go,我們可以把 channel 關閉,因為從關閉的 channel 讀資料不會 block 且會 yielding zero value。
這代表主程式可以 unblock 所有 senders,藉由簡單低關閉 done channel
。
這個關閉是很有效率低廣播訊號給 senders。
我們把所有的 pipeline func 套用上這個模式,接收 done 參數
且透過 defer 安排一個 close channel
,因此,所有從主程式的 return paths 將會發結束訊息給 pipeline。
1 | func main() { |
1 | func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { |
1 | func sq(done <-chan struct{}, in <-chan int) <-chan int { |
Here are the guidelines for pipeline construction:
- stages 關閉他們的輸出,當所有的傳送活動結束。
- stages 持續接收資料,直到 channel 被關閉 或者 the senders are unblocked.