Reactive

January 26, 2022

現今對許多開發者而言,尤其是對前端開發者而言,聽到 Reactive Programming 應該不會覺得陌生,不少技術生態圈中都有著支援 Reactive 概念的框架,就 ReactiveX 而言,就有著 JavaScript 的 RxJS,Java 的 RxJava、Go 的 RxGo 等,清單可參考〈ReactiveX〉。

Reactive Programming

對於 Reactive Programming 一般常看到的定義是,可以自動傳播資料流變化的程式設計典範。

在一般設計典範中,如果寫下了 c = b + 5,在該程式碼運算過後,變數 c 的值就固定了,若有其他流程導致變數 b 改變,c 的值並不會自動變化,然而在 Reactive Programming 的概念中,c 的值必須對 b 的值作出反應,這對多數開發者而言,似乎是有點陌生的功能概念。

實際上若用過試算表軟體,應該知道這類軟體的功能之一:可以在欄位 C1 輸入 =B1+5,如此就會將欄位 B1 的值加 5 後作為 C1 的值,如果使用者變動了 B1 的值,那麼變化會傳播,C1 的值也會自動反應變化;另外像是聯級(Cascade)表單、搜尋框自動提示等功能,某些程度都可以算是 Reactive 概念的實現。

想要實作出試算表欄位、聯級表單、搜尋框自動提示之類的功能,方式之一基於發佈訂閱模式(Publish-subscribe),可以訂閱某事件,並在事件發生時獲得通知以執行對應的變化操作,當任務不複雜時,這個方式也沒什麼問題。

然而,如果事情變得複雜的話,例如,本來實作的流程是「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」,現在若想「欄位失焦時發出請求,而請求完成後」另外發出一個請求呢?你得將新請求按插在相對的處理器之中,若另外想在「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」後又做其他的任務呢?你得又找出對應的處理器,將原始碼安插進去。

也就是說你想要的是,能夠基於某個「事件流程」來做些組合與銜接,可想而知的,在事件間關係複雜之時,這種做法就會令程式碼流程變得錯綜複雜。

Functional Reactive Programming

既然談到了「事件流程」,來想想,事件發生時不是會有事件相關的資料嗎?像是鍵盤事件會有按上哪個鍵的訊息,這些資料是在「事件流程」中流動,談到資料流動,就會讓人想到 Java 的 Stream API:

source.filter(n -> n > 10)
      .map(n -> n * 5)
      .forEach(out::println);

如果在「事件流程」中流動的事件資料,也能像上頭這樣處理就好了,例如底下的概念程式碼:

fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)))
                        .flatMap(evt -> fromEvent('change', changeName(evt.responseText)))
                        .subscribe(message::pop);

Stream API 不同的是,Stream 的資料來源是被動地提取,而事件這類資料是主動地發送,這樣就仍然可以處理先前談到的事件流程,若也能進一步銜接與組合:

var keywordLoaded = fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)));
var fieldChanged = keywordLoaded.flatMap(evt -> fromEvent('change', changeName(evt.responseText)));

那麼對 keywordLoaded 這個事件流有興趣的開發者,可以直接再銜接想進行的流程,對 fieldChanged 有興趣的開發者,也可以組合出自己的事件流,這些事件流也可以進一步再被組合,那就可以避免程式碼流程變得錯綜複雜了!

Reactive 在於辨識出可銜接與組合的資料流。

實際上要實作出程式庫來支援資料流的銜接、組合,必須得付出一番功夫,而且還得保持關切點清楚明白,也就是資料流清晰,銜接與組合也容易,這就是為什麼採用 Functional Programming 風格的原因,因為資料流的處理細節,被隱藏在各個高階操作之下,採用 Functional Programming 的 Reactive 設計,就被稱為 Reactive Functional Programming 了。

Reactive Streams

在 Reactive Functional Programming 的概念逐漸成形之際,由於各技術生態圈都有這方面的需求與實現,為了避免各搞各的,造成日後相容性的問題,因而催生了 Reactive Streams 規範,在 ReactiveX 中有著各個技術生態圈的實現。

就 Java 這部份具體來說,規格在 org.reactivestreams 套件下定義了 PublisherSubscriberSubscriptionProcessor 四個介面(詳細方法簽署等可參考 JavaDoc)。

Publisher 實例會發佈資料串流,接受 Subscriber 的訂閱,並建立一個 Subscription 實例代表該次訂閱,在訂閱成功事件發生時,會呼叫 SubscriberonSubscribe 並傳入 Subscription 實例。

SubscriptionPublisherSubscriber 之間溝通的橋樑,可以進行流量控制,這是為了避免訂閱者來不及消化資料流來源產 生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber 可以透過傳入的 Subscription,使用 request(n)Publisher 請求 n 筆資料,或者是透過 cancel 要求 Publisher 停止傳送資料並清除資源。

資料流可能被轉換,Processor 同時扮演著 PublisherSubscriberProcessor 繼承了這兩個介面),在最前端的 Subscriber 與最末端的 Subscriber 之間,可以串接多個 Processor,每個 Processor 代表著整個資料流串的一個階段。

在 Java 的領域,早期就有 RxJava 可以用來支援 Reactive Functional Programming,後來為了符合 Reactive Streams 規範,而重寫為 RxJava 2,目前的 RxJava 庫,就是 RxJava 2 的實現。

Spring 的 Reactor,實現了 Reactive Streams 規範,主要是用來支援 Reactive 堆疊,像是 Web Flux,有興趣可以參考〈Spring〉中的 Spring Reactive。

在 Java 標準平台這塊是在 Java 9 引入,具體來說,是在 java.util.concurrent.Flow 類別中定義了四個介面,它們遵守 Reactive Streams 的規範,因此各介面下實際的方法簽署與 org.reactivestreams 套件下的定義是一樣的。

HttpClient API

Java 11 正式推出的 HttpClient API,是 Flow API 的實作品之一,例如,一個簡單的 POST 請求回應:

var request = HttpRequest.newBuilder(uri).POST(ofString("k=v")).build();
var response = client.send(request, ofString());
var body = response.body();

就算沒正式看過文件,應該也不難理解這個程式片段的作用,其中 clientHttpClient.newHttpClient 方法建立的 HttpClient 實例,而 send 是同步方法,傳回 HttpResponse實例,透過 body 方法可以取得字串回應,對於非同步操作,HttpClient 可以透過 sendAsync 方法,取得 CompletableFuture 實例,後續透過 thenXXX 等方法,可以進一步處理本體。

HttpClient 實例,實質上就等效於請求本體的訂閱者、回應本體的發布者。

至於請求本體真正的發布者,是 HttpRequest.BodyPublishersofXXX 方法傳回的 BodyPublisher 實例,而 BodyPublisherFlow.Publisher 的子介面,例如上面程式片段 POST 方法中看到的 ofString,指定了字串作為本體來源,將資料發布給 HttpClientBodyPublishers 也有 ofFileofInputStream 等方法,可在檔案、串流作為本體來源時使用。

上例中 BodyHandlersofXXX 會傳回 BodyHandler 實例,BodyHandler 是個函式介面(functional interface),其 apply 方法會傳回 BodySubscriber 實例,而 BodySubscriberFlow.Subscriber 的子介面,BodySubscriber 顧名思義是作為回應本體的訂閱者,上例中send方法中的 ofString,最終是向 HttpClient 訂閱回應,然後轉換為字串,這也正是 BodyHandler會在呼叫 sendsendAsync)時指定的原因。

也就是說,從請求到回應的過程,BodyPublisher 會作為請求本體的資料來源,它會發布資料,而 HttpClient 實質上作為資料的訂閱者,轉換為 HTTP 協定內容後進行請求;當 HttpClient 收到回應,會作為回應本體的資料來源,它會發布資料,而你指定的 BodyHandlers.ofXXX 方法,最後會取得 BodySubscriber 實例作為訂閱者,將收到的回應資料轉為想要的格式,然後,透過 HttpResponsebody 方法取得(同步)或用 thenXXX 處理(非同步)。

CompletableFuture 不敷使用時,若能瞭解這個 API 架構,要在各種請求、回應之間進行流式銜接的方式,就清晰了。例如,想將某個來源作為請求本體發送,就是實作 BodyPublisher 後、指定實例給 HttpRequestBodyPublishers 本身也有 fromPublisher,可便於銜接 Flow.Publisher 的實作。

若是想將網站的回應做進一步處理,那麼,就實作 BodyHandler。其中的 BodySubscriber 實作會訂閱回應,至於該怎麼轉換回應?就是在 BodySubscriber 中實作,BodyPublishers 本身也有 fromSubscriber,可以便於銜接 Flow.Subscriber 的實作。

就現今而言,Reactive 已是開發選項之一,與其說這是種模式,其實更結合了 Functional programming 的風格或典範,這也傳達了一件事,現今在某些場合,需要比現今開發者熟悉的命令式、物件導向等典範更高階的抽象,而這會是開發者必修的課題之一。

分享到 LinkedIn 分享到 Facebook 分享到 Twitter