Reactive
January 26, 2022現今對許多開發者而言,尤其是對前端開發者而言,聽到 Reactive Programming 應該不會覺得陌生,不少技術生態圈中都有著支援 Reactive 概念的框架,就 ReactiveX 而言,就有著 JavaScript 的 RxJS,Java 的 RxJava、Go 的 RxGo 等,清單可參考〈ReactiveX〉。
Reactive Programming
對於 Reactive Programming 一般常看到的定義是,可以自動傳播資料流變化的程式設計典範。
在一般設計典範中,如果寫下了 c = b + 5,在該程式碼運算過後,變數 c 的值就固定了,若有其他流程導致變數 b 改變,c 的值並不會自動變化,然而在 Reactive Programming 的概念中,c 的值必須對 b 的值作出反應,這對多數開發者而言,似乎是有點陌生的功能概念。
實際上若用過試算表軟體,應該知道這類軟體的功能之一:可以在欄位 C1 輸入 =B1+5,如此就會將欄位 B1 的值加 5 後作為 C1 的值,如果使用者變動了 B1 的值,那麼變化會傳播,C1 的值也會自動反應變化;另外像是聯級(Cascade)表單、搜尋框自動提示等功能,某些程度都可以算是 Reactive 概念的實現。
想要實作出試算表欄位、聯級表單、搜尋框自動提示之類的功能,方式之一基於發佈訂閱模式(Publish-subscribe),可以訂閱某事件,並在事件發生時獲得通知以執行對應的變化操作,當任務不複雜時,這個方式也沒什麼問題。
然而,如果事情變得複雜的話,例如,本來實作的流程是「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」,現在若想「欄位失焦時發出請求,而請求完成後」另外發出一個請求呢?你得將新請求按插在相對的處理器之中,若另外想在「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」後又做其他的任務呢?你得又找出對應的處理器,將原始碼安插進去。
也就是說你想要的是,能夠基於某個「事件流程」來做些組合與銜接,可想而知的,在事件間關係複雜之時,這種做法就會令程式碼流程變得錯綜複雜。
Functional Reactive Programming
既然談到了「事件流程」,來想想,事件發生時不是會有事件相關的資料嗎?像是鍵盤事件會有按上哪個鍵的訊息,這些資料是在「事件流程」中流動,談到資料流動,就會讓人想到 Java 的 Stream
API:
source.filter(n -> n > 10)
.map(n -> n * 5)
.forEach(out::println);
如果在「事件流程」中流動的事件資料,也能像上頭這樣處理就好了,例如底下的概念程式碼:
fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)))
.flatMap(evt -> fromEvent('change', changeName(evt.responseText)))
.subscribe(message::pop);
與 Stream
API 不同的是,Stream
的資料來源是被動地提取,而事件這類資料是主動地發送,這樣就仍然可以處理先前談到的事件流程,若也能進一步銜接與組合:
var keywordLoaded = fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)));
var fieldChanged = keywordLoaded.flatMap(evt -> fromEvent('change', changeName(evt.responseText)));
那麼對 keywordLoaded
這個事件流有興趣的開發者,可以直接再銜接想進行的流程,對 fieldChanged
有興趣的開發者,也可以組合出自己的事件流,這些事件流也可以進一步再被組合,那就可以避免程式碼流程變得錯綜複雜了!
Reactive 在於辨識出可銜接與組合的資料流。
實際上要實作出程式庫來支援資料流的銜接、組合,必須得付出一番功夫,而且還得保持關切點清楚明白,也就是資料流清晰,銜接與組合也容易,這就是為什麼採用 Functional Programming 風格的原因,因為資料流的處理細節,被隱藏在各個高階操作之下,採用 Functional Programming 的 Reactive 設計,就被稱為 Reactive Functional Programming 了。
Reactive Streams
在 Reactive Functional Programming 的概念逐漸成形之際,由於各技術生態圈都有這方面的需求與實現,為了避免各搞各的,造成日後相容性的問題,因而催生了 Reactive Streams 規範,在 ReactiveX 中有著各個技術生態圈的實現。
就 Java 這部份具體來說,規格在 org.reactivestreams
套件下定義了 Publisher
、Subscriber
、 Subscription
與 Processor
四個介面(詳細方法簽署等可參考 JavaDoc)。
Publisher
實例會發佈資料串流,接受 Subscriber
的訂閱,並建立一個 Subscription
實例代表該次訂閱,在訂閱成功事件發生時,會呼叫 Subscriber
的 onSubscribe
並傳入 Subscription
實例。
Subscription
是 Publisher
、Subscriber
之間溝通的橋樑,可以進行流量控制,這是為了避免訂閱者來不及消化資料流來源產 生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber
可以透過傳入的 Subscription
,使用 request(n)
向 Publisher
請求 n
筆資料,或者是透過 cancel
要求 Publisher
停止傳送資料並清除資源。
資料流可能被轉換,Processor
同時扮演著 Publisher
與 Subscriber
(Processor
繼承了這兩個介面),在最前端的 Subscriber
與最末端的 Subscriber
之間,可以串接多個 Processor
,每個 Processor
代表著整個資料流串的一個階段。
在 Java 的領域,早期就有 RxJava 可以用來支援 Reactive Functional Programming,後來為了符合 Reactive Streams 規範,而重寫為 RxJava 2,目前的 RxJava 庫,就是 RxJava 2 的實現。
Spring 的 Reactor,實現了 Reactive Streams 規範,主要是用來支援 Reactive 堆疊,像是 Web Flux,有興趣可以參考〈Spring〉中的 Spring Reactive。
在 Java 標準平台這塊是在 Java 9 引入,具體來說,是在 java.util.concurrent.Flow
類別中定義了四個介面,它們遵守 Reactive Streams 的規範,因此各介面下實際的方法簽署與 org.reactivestreams
套件下的定義是一樣的。
HttpClient API
Java 11 正式推出的 HttpClient API,是 Flow API 的實作品之一,例如,一個簡單的 POST 請求回應:
var request = HttpRequest.newBuilder(uri).POST(ofString("k=v")).build();
var response = client.send(request, ofString());
var body = response.body();
就算沒正式看過文件,應該也不難理解這個程式片段的作用,其中 client
是 HttpClient.newHttpClient
方法建立的 HttpClient
實例,而 send
是同步方法,傳回 HttpResponse
實例,透過 body
方法可以取得字串回應,對於非同步操作,HttpClient
可以透過 sendAsync
方法,取得 CompletableFuture
實例,後續透過 thenXXX
等方法,可以進一步處理本體。
HttpClient
實例,實質上就等效於請求本體的訂閱者、回應本體的發布者。
至於請求本體真正的發布者,是 HttpRequest.BodyPublishers
的 ofXXX
方法傳回的 BodyPublisher
實例,而 BodyPublisher
是 Flow.Publisher
的子介面,例如上面程式片段 POST 方法中看到的 ofString
,指定了字串作為本體來源,將資料發布給 HttpClient
,BodyPublishers
也有 ofFile
、ofInputStream
等方法,可在檔案、串流作為本體來源時使用。
上例中 BodyHandlers
的 ofXXX
會傳回 BodyHandler
實例,BodyHandler
是個函式介面(functional interface),其 apply
方法會傳回 BodySubscriber
實例,而 BodySubscriber
是 Flow.Subscriber
的子介面,BodySubscriber
顧名思義是作為回應本體的訂閱者,上例中send方法中的 ofString
,最終是向 HttpClient
訂閱回應,然後轉換為字串,這也正是 BodyHandler
會在呼叫 send
(sendAsync
)時指定的原因。
也就是說,從請求到回應的過程,BodyPublisher
會作為請求本體的資料來源,它會發布資料,而 HttpClient
實質上作為資料的訂閱者,轉換為 HTTP 協定內容後進行請求;當 HttpClient
收到回應,會作為回應本體的資料來源,它會發布資料,而你指定的 BodyHandlers.ofXXX
方法,最後會取得 BodySubscriber
實例作為訂閱者,將收到的回應資料轉為想要的格式,然後,透過 HttpResponse
的 body
方法取得(同步)或用 thenXXX
處理(非同步)。
在 CompletableFuture
不敷使用時,若能瞭解這個 API 架構,要在各種請求、回應之間進行流式銜接的方式,就清晰了。例如,想將某個來源作為請求本體發送,就是實作 BodyPublisher
後、指定實例給 HttpRequest
,BodyPublishers
本身也有 fromPublisher
,可便於銜接 Flow.Publisher
的實作。
若是想將網站的回應做進一步處理,那麼,就實作 BodyHandler
。其中的 BodySubscriber
實作會訂閱回應,至於該怎麼轉換回應?就是在 BodySubscriber
中實作,BodyPublishers
本身也有 fromSubscriber
,可以便於銜接 Flow.Subscriber
的實作。
就現今而言,Reactive 已是開發選項之一,與其說這是種模式,其實更結合了 Functional programming 的風格或典範,這也傳達了一件事,現今在某些場合,需要比現今開發者熟悉的命令式、物件導向等典範更高階的抽象,而這會是開發者必修的課題之一。