奧推網

選單
財經

聊聊Flink非同步IO機制的原理

非同步I/O專門用來解決Flink計算過程中與外部系統的互動問題。在預設情況下,運算元向外部系統發出請求後即阻塞,等待結果返回才能傳送下一個請求,可能會造成較大的延遲,吞吐量下降。有了非同步I/O之後,就可以併發地發出請求和接收響應,延遲大大降低。下圖來自官方文件,一看便知。

image

要享受非同步I/O帶來的便利,前提就是我們有一個能非同步請求外部系統的客戶端。如果原生沒有提供,就得自己建立有限大小的執行緒池,將客戶端放到執行緒池裡呼叫。

非同步I/O的原始設計文件見FLIP-12。但是隨著時間的推移,它裡面的內容與目前最新的Flink 1。9版本的實現有了一定出入,所以就不參考它了,直接講講筆者讀過1。9版本的相關原始碼之後總結出的東西吧。

在呼叫AsyncDataStream。orderedWait()/unorderedWait()方法時,本質上是產生了一個AsyncWaitOperator運算元,它是非同步I/O的核心。每個AsyncWaitOperator都由三個主要的部分組成。

AsyncFunction:執行非同步操作的函式,使用者需要覆寫其asyncInvoke()方法並傳入。

StreamElementQueue:包含StreamElementQueueEntry的佇列,底層由ArrayDeque實現。

Emitter:單獨的守護執行緒,將非同步呼叫完成後的結果傳送給下游運算元。

所謂StreamElementQueueEntry就是StreamElement(Flink基礎概念,可以是流中的一條資料,或是一個水印等)的簡單封裝,透過j。u。c。CompletableFuture實現非同步返回。CompletableFuture是JDK 8提供的新特性,可以認為是非常好用的Future改進版,這裡就不再展開講了。

以下是以StreamElementQueueEntry為中心展開的類圖。看官會注意到它有兩種實現:代表資料的StreamRecordQueueEntry,和代表水印的WatermarkQueueEntry。它們都持有CompletableFuture。

image

AsyncWaitOperator的機制可以用下面的簡圖來表示。

image

來自上游的StreamElement進入AsyncWaitOperator的StreamElementQueue,並被封裝成StreamElementQueueEntry。

AsyncWaitOperator呼叫傳入的AsyncFunction的asyncInvoke()方法,該方法

非同步地

與外部系統互動。

非同步操作完成後,由asyncInvoke()方法顯式地呼叫ResultFuture。complete()方法,將結果返回;或者呼叫completeExceptionally()方法表示出現了異常。ResultFuture就是CompletableFuture的代理介面。

Emitter執行緒從StreamElementQueue中拉取那些已經完成了的StreamElementQueueEntry,並輸出到下游運算元。

以上的分析說明了AsyncWaitOperator的工作流程,但是沒有考慮輸出流的順序性。實際上會有以下兩種情況:

呼叫AsyncDataStream。orderedWait():建立OrderedStreamElementQueue佇列,保持請求的順序與輸出結果的順序相同,亦即先進先出。

呼叫AsyncDataStream。unorderedWait():建立UnorderedStreamElementQueue佇列,不保持順序。在採用處理時間時,先返回的結果先輸出。而採用事件時間時,需要額外保證水印的邊界不錯亂。

image

簡單討論一下。

有序

有序是最簡單的情況,只需要將元素按照到來的順序放入OrderedStreamElementQueue。只有當佇列中的隊頭請求非同步操作返回了結果,才會觸發Emitter輸出,後面的請求先返回也只能等待。

無序(處理時間)

這種情況也不難辦。在UnorderedStreamElementQueue中維護兩個子佇列,一個是未完成請求的佇列(uncompletedQueue),一個是已完成請求的佇列(completedQueue)。所有請求都先進入uncompletedQueue並執行非同步操作,並按照操作完成的順序進到completedQueue中。Emitter從completedQueue拉取並輸出結果即可。如下圖所示。

image

無序(事件時間)

這是比較複雜的情況:我們允許兩個水印之間的元素亂序,但是水印不能亂。所以在使用兩個佇列的同時,uncompletedQueue中還必須儲存水印,這就是上面的WatermarkQueueEntry的由來。在水印之間儲存的也不再是單個StreamElementQueueEntry,而是它們的集合。只有當uncompletedQueue中的隊頭集合有元素的非同步操作返回了,才能將其移動到completedQueue裡面。這樣就可以保證在透過某個水印之前,它前面的所有非同步請求都完成。如下圖所示。

非同步I/O的檢查點做起來很容易。由上面的分析可以知道,StreamElementQueue儲存的就是尚未完成非同步請求的元素,以及已完成非同步請求但還沒有送到Emitter傳送的元素,只要遍歷該佇列,並將它們都放入狀態後端就OK。