在〈初試 Reactor〉中的範例,有訂閱者進行了 subscribe 的話,訂閱處理會是在同一個執行緒進行,這意謂著,若獲取資料來源是個阻斷操作的話,訂閱處理也會被阻斷,這也就是為何在〈初試 Reactor〉中首個範例會是這樣的結果:
JAVAPYTHONJAVASCRIPT
JAVA
PYTHON
JAVASCRIPT
javapythonjavascript
也就是按照各個 subscribe 的順序執行了各個訂閱者的處理,在前一個訂閱者處理完資料序列之前,下一個 subscribe 就不會執行。
如果希望訂閱處理可以在不同的執行緒進行,可以使用 subscribeOn,並指定 Scheduler 實例。例如:
package cc.openhome;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import static java.lang.System.out;
public class FsttReactor {
public static void main(String[] args) throws InterruptedException {
List<String> skillSource = Arrays.asList("java", "python", "javascript");
Flux<String> skills = Flux.fromIterable(skillSource)
.subscribeOn(Schedulers.parallel());
Flux<String> upperSkills = skills.map(String::toUpperCase);
Flux<String> chars = upperSkills.flatMap(skill -> Flux.fromArray(skill.split("")));
chars.subscribe(out::print);
out.println();
upperSkills.subscribe(out::println);
skills.subscribe(out::print);
Thread.sleep(5000);
}
}
由於訂閱處理在不同的執行緒中進行,在主執行緒結束後,Scheduler 安排的執行緒也會自動結束,為了能觀察,上頭使用了 Thread.sleep(5000),你會看到執行結果並不會是依訂閱的順序進行。
Reactor 提供了幾個預設的 Scheduler 實例,可以透過 Schedulers 的 static 方法取得。Schedulers.parallel() 傳回的 Scheduler 實例,會按照 CPU 核心數決定執行緒數量,因而適合計算密集式處理;single 為可重用的單一執行緒;elastic 會重用已建立的執行緒,必要時建立新執行緒,執行緒閒置過久也會自動回收。你可以查詢 API 文件來瞭解細節。
對於一些會阻斷的場合,可以使用 subscribeOn 將之變為非同步處理,例如若 findUserById 若是基於 JDBC 查詢的阻斷操作,可如下建立一個可非同步訂閱的發佈者:
Mono<User> user = Mono.fromCallable(() -> findUserById("X1234"))
.subscribeOn(Schedulers.elastic());
在 Reactor 中,有些發佈者預設就會使用某種 Scheduler,例如 Flux.interval 預設使用 parallel,可以以固定週期發佈:
Flux<Long> seconds = Flux.interval(Duration.ofSeconds(1)).log();
seconds.subscribe(out::println);
seconds.subscribe(out::println);
Thread.sleep(10000);
log 方法會自動記錄發佈、訂閱時的一些細節,有助於觀察發佈者與訂閱者的關係,像是底下的訊息中可看到使用了 parallel 的 Scheduler:
[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
0
[ INFO] (parallel-2) onNext(0)
0
[ INFO] (parallel-1) onNext(1)
1
[ INFO] (parallel-2) onNext(1)
1
...略
你也可以試著在先前或〈初試 Reactor〉中的範例上,加上 log 來觀察有什麼不同。

