Worker Thread

January 12, 2022

執行緒建立後,從任務來源處取得任務,每次執行完後,繼續取得下個任務,這是 Worker Thread 的基本概念,可應用在一些需要冗長計算或背景執行的請求。

馬不停蹄的 Worker

因為每次執行完後,執行緒要繼續取得下個任務,這表示執行緒的流程不會停止,基本上會是個任務迴圈:

import java.util.concurrent.BlockingQueue;

class Worker extends Thread {
    private BlockingQueue<Runnable> runnables;
    
    Worker(BlockingQueue<Runnable> runnables) {
        this.runnables = runnables;
    }
    
    public void run() {
        while(true) {
            try {
                runnables.take().run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

就消費任務而言,這樣的 Worker 實現了〈Producer Consumer〉中 Consumer 的角色,至於協調任務的角色,這邊直接使用 Java 標準 API 的 java.util.concurrent.BlockingQueue

因為使用的是 BlockingQueue,也就是說,BlockingQueue 沒有任務時,Worker 會進入等待狀態;另一方面,這邊並沒有考慮 Worker Thread 流程被中止時,該怎麼做善後,這會在之後的文件討論。

任務的交辦?

至於怎麼交辦任務,那是另一回事了,以〈Thread-Per-Message〉中的範例來說,或許你只需要下載網頁時,有其他流程可以代勞,主流程別被阻斷就好,這時可以改寫一下其中的 ThreadService

class ThreadService {
    private static BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
    
    static {
        // 只有一個 Worker Thread
        new Worker(runnables).start();
    }
    
    static void submit(Runnable runnable) {
        runnables.add(runnable);
    }
}

建立 Worker 時,需要指定 BlockingQueue 的實作,以便 Worker 主動取得任務,這邊使用了 Java 標準 API 的 java.util.concurrent.LinkedBlockingQueue,沒有限制任務的長度;並且只建立了一個 Worker Thread。

必要時可以建立多個 Worker Thread,或許可進一步考量,待執行任務大於 Worker Thread 數量一定程度時,建立新的 Worker Thread,也許又會想在空閒時減少 Worker Thread,或者進一步地思考,關閉 ThreadService 時,未被執行的任務該怎麼辦等問題。

既然如此,可以考慮將 Worker 角色轉為被動,也就是被交辦任務,而不是主動取得任務,不用知道 BlockingQueue 這類角色的存在,任務的交辦可以由 ThreadService 這類角色負責,這時 ThreadService 就有執行緒池管理的概念了。

執行緒池的部份,就在後續的文件中,再來探討了…

分享到 LinkedIn 分享到 Facebook 分享到 Twitter