Producer Consumer
January 10, 2022若有一些執行緒負責生產資源,另一些執行緒會消費資源,那就會有生產/消費間的協調問題,例如,最常見的就是協調生產速度與消費速度的問題。
協調生產/消費
如果你讓生產者/消費者直接處理等待、通知的話,會產生複雜的同步問題,沒有搞好的話,還可能產生死結;不如將等待、通知的職責,交由一個居中的協調者。
例如,生產者可將產品交給店員,消費者可從店員處取走產品,假設產品是整數好了:
class Producer implements Runnable {
private Clerk clerk;
Producer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for(var product = 1; product <= 10; product++) {
try {
clerk.setProduct(product);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
class Consumer implements Runnable {
private Clerk clerk;
Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for(var i = 1; i <= 10; i++) {
try {
clerk.getProduct();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
店員可維持一定數量的產品,若生產者速度較快,店員處的產品量已滿,店員叫生產者等一下,若有空位放產品再通知生產者繼續生產;如果消費者速度較快,店員手上沒有產品,店員告訴消費者等一下,有產品了再通知消費者前來消費。
class Clerk {
private final int EMPTY = 0;
private int product = EMPTY;
synchronized void setProduct(int product)
throws InterruptedException {
waitIfFull();
this.product = product;
notify();
}
private synchronized void waitIfFull() throws InterruptedException {
while(this.product != EMPTY) {
wait();
}
}
synchronized int getProduct() throws InterruptedException {
waitIfEmpty();
var p = this.product;
this.product = EMPTY;
notify();
return p;
}
private synchronized void waitIfEmpty() throws InterruptedException {
while(this.product == EMPTY) {
wait();
}
}
}
可以使用以下的程式來使用 Producer
、Consumer
與 Clerk
:
var clerk = new Clerk();
new Thread(new Producer(clerk)).start();
new Thread(new Consumer(clerk)).start();
BlockingQueue
程式語言環境若有多執行緒能力,標準 API 通常就會提供具有 Producer Consumer 概念的實作,例如,Java 的 java.util.concurrent.BlockingQueue
,可以用它來改寫一下以上的例子:
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private BlockingQueue<Integer> productQueue;
Producer(BlockingQueue<Integer> productQueue) {
this.productQueue = productQueue;
}
public void run() {
for(var product = 1; product <= 10; product++) {
try {
productQueue.put(product);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> productQueue;
Consumer(BlockingQueue<Integer> productQueue) {
this.productQueue = productQueue;
}
public void run() {
for(var i = 1; i <= 10; i++) {
try {
var product = productQueue.take();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
BlockingQueue
的實作之一是 ArrayBlockingQueue
類別,可以指定容量,例如:
var queue = new ArrayBlockingQueue<Integer>(1); // 容量為1
new Thread(new Producer3(queue)).start();
new Thread(new Consumer3(queue)).start();
Producer Consumer 是蠻常見的模式,有些伺服器或甚至語言本身,提供 channel 之類的概念,可能就是這個模式的實現。