Thread Pool
January 12, 2022建立執行緒需要成本,若能重複使用已建立的執行緒,而不是用完就丟,對效率可能有所助益,〈Worker Thread〉示範了預先建立執行緒,不斷地從任務來源取得任務並執行,是一種重複使用執行緒的方式。
另一種 Worker
〈Worker Thread〉範例的實現方式,需要知道協調者的存在;另一種方式是考慮將 Worker
角色轉為被動,也就是被交辦任務,而不是主動取得任務,不用知道協調者的存在,然而這個時候,協調者必須知道 Worker
是否處於空閒狀態,才能知道該不該交辦任務:
class Worker extends Thread {
private Runnable runnable;
private boolean isOnDuty = true;
boolean isIdle() {
return runnable == null;
}
void accept(Runnable runnable) {
synchronized(this) {
if(isIdle()) {
this.runnable = runnable;
notify();
}
}
}
public void run() {
while(isOnDuty) {
synchronized(this) {
runnable.run();
runnable = null;
try {
wait();
}
catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
void terminate() {
isOnDuty = false;
accept(() -> {}); // nope
}
}
上面這個 Worker
建立後,必須先交辦任務,然後啟動執行緒,當手邊的 runnable
執行完畢,執行緒進入等待狀態,若要判斷 Worker
是否符於空閒狀態,是簡單地透過 runnable
是否為 null
;範例中也簡單地實現了了〈Two-phase Termination〉中談到的,如何停止執行緒。
簡單的 ThreadPool
為了要管理 Worker
,這邊實現一個簡單的 ThreadPool
,可以在沒有 Worker
可以接任務時,建立新 Worker
,也示範了清除空閒 Worker
的簡單方式:
class ThreadPool {
private List<Worker> workers = new ArrayList<>();
synchronized void submit(Runnable runnable) {
for(var worker : workers) {
if(worker.isIdle()) {
worker.accept(runnable);
return;
}
}
// 沒有空閒的 Worker,建立新的
var worker = new Worker();
worker.accept(runnable);
worker.start();
workers.add(worker);
}
synchronized void removeIdle() {
for(var worker : workers) {
if(worker.isIdle()) {
workers.remove(worker);
worker.terminate();
}
}
}
}
以〈Thread-Per-Message〉下載網頁的範例來說,可以如下改用 ThreadPool
:
var pool = new ThreadPool();
for(var uri : uris) {
pool.submit(() -> {
try {
download(
uri,
uri.replace("https://openhome.cc/zh-tw/", "")
.replace("/", "")
.concat(".html")
);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
Java 的 Executor 框架
當然,以上只是簡單的執行緒池示範,真的需要執行緒池功能的話,建議瞭解一下現成穩固的方案,例如 Java 標準 API 提供 Executor
框架,可以依需求,透過 使用 java.util.concurrent.Executors
的 newCachedThreadPool
、newFixedThreadPool
靜態方法來建構想要的執行緒池,例如:
var pool = Executors.newCachedThreadPool();
for(var uri : uris) {
pool.submit(() -> {
try {
download(
uri,
uri.replace("https://openhome.cc/zh-tw/", "")
.replace("/", "")
.concat(".html")
);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}