之前在 StampedLock Idioms 中介紹了 JDK8 新的 StampedLock,剛好我書中也有幾個類似的例子,於是想說也整理在一起來個東施效顰好了。
首先來看個簡易的 ArrayList 實作好了:
package cc.openhome;
import java.util.Arrays;
public class ArrayList<E> {
    private Object[] elems;
    private int next;
    public ArrayList(int capacity) {
        elems = new Object[capacity];
    }
    public ArrayList() {
        this(16);
    }
    public void add(E e) {
        if(next == elems.length) {
            elems = Arrays.copyOf(elems, elems.length * 2);
        }
        elems[next++] = e;
    }
    public E get(int index) {
        return (E) elems[index];
    }
    public int size() {
        return next;
    }
}
如果將這個 ArrayList 用在只有主執行緒的環境中時,它沒有什麼問題。如果有兩個以上的執行緒同時使用它會如何?例如:
package cc.openhome;
public class ArrayListDemo {
    public static void main(String[] args) {
        ArrayList list = new ArrayList();
        new Thread(() -> {
            while (true) {
                list.add(1);
            }
        }).start();
        new Thread(() -> {
            while (true) {
                list.add(2);
            }
        }).start();
    }
}
在這個範例中建立了兩個執行緒,分別於 while 迴圈中對同一個 ArrayList 實例進行 add,如果你嘗試執行程式,「有可能」會發生 ArrayIndexOutOfBoundsException 例外:
Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 64
    at cc.openhome.ArrayList.add(ArrayList.java:21)
    at cc.openhome.ArrayListDemo.lambda$main$1(ArrayListDemo.java:15)
    at cc.openhome.ArrayListDemo$$Lambda$2/1072591677.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)
synchronized
學過多執行緒的都知道,這是機率問題,有可能發生,也有可能沒發生,就先因陣列長度過長,JVM 分配到的記憶體不夠,而發生 java.lang.OutOfMemoryError。這是多個執行緒存取同一物件相同資源時所引發的競速情況(Race condition),也知道解決的方法之一可以在 add 等方法上加上 synchronized 關鍵字。例如:
package cc.openhome;
import java.util.Arrays;
public class SynchronizedArrayList<E> {
    private Object[] elems;
    private int next;
    public SynchronizedArrayList(int capacity) {
        elems = new Object[capacity];
    }
    public SynchronizedArrayList() {
        this(16);
    }
    public synchronized void add(E e) {
        if(next == elems.length) {
            elems = Arrays.copyOf(elems, elems.length * 2);
        }
        elems[next++] = e;
    }
    public synchronized E get(int index) {
        return (E) elems[index];
    }
    public synchronized int size() {
        return next;
    }
}
這是學習 Java 多執行緒時一定會接觸到的基本概念,如果在方法上標示 synchronized,則執行方法必須取得該實例的鎖定,這是避免競速問題的作法之一。不過直接使用 synchronized 有許多限制,未取得鎖定的執行緒會直接被阻斷,如果你希望的功能是執行緒可嘗試取得鎖定,無法取得鎖定時就先作其他事,直接使用 synchronized 必須透過一些設計才可完成這個需求。
ReentrantLock
java.util.concurrent.locks 套件中提供了 ReentrantLock,,可以達到synchronized 的作用,也提供額外的功能,如果單純用來達到 synchronized 的作用,可以如下改寫方才的範例:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class ReentrantLockArrayList<E> {
    private Lock lock = new ReentrantLock();
    private Object[] elems;
    private int next;
    public ReentrantLockArrayList(int capacity) {
        elems = new Object[capacity];
    }
    public ReentrantLockArrayList() {
        this(16);
    }
    public void add(E elem) {
        lock.lock();
        try {
            if (next == elems.length) {
                elems = Arrays.copyOf(elems, elems.length * 2);
            }
            elems[next++] = elem;
        } finally {
            lock.unlock();
        }
    }
    public E get(int index) {
        lock.lock();
        try {
            return (E) elems[index];
        } finally {
            lock.unlock();
        }
    }
    public int size() {
        lock.lock();
        try {
            return next;
        } finally {
            lock.unlock();
        }
    }
}
如果有兩個執行緒都想呼叫 get 與 size 方法,由於鎖定的關係,其中一個執行緒只能等待另一執行緒解除鎖定,無法兩個執行緒同時呼叫 get 與 size,然而這兩個方法都只是讀取物件狀態,並沒有變更物件狀態,如果只是讀取操作,可允許執行緒同時並行的話,那對讀取效率將會有所改善,你可以使用兩個 Lock 物件,透過設計來達到這項需求,不過 JDK 本身提供有 ReentrantReadWriteLock 可以使用。
ReentrantReadWriteLock
ReentrantReadWriteLock 的 readLock 方法會傳回 ReentrantReadWriteLock.ReadLock 實例,writeLock 方法會傳回 ReentrantReadWriteLock.WriteLock 實例。呼叫 ReadLock 的 lock 方法時,若沒有任何 WriteLock 呼叫過 lock 方法,也就是沒有任何寫入鎖定時,就可以取得讀取鎖定。呼叫 WriteLock 的 lock 方法時,若沒有任何 ReadLock 或 WriteLock 呼叫過 lock 方法,也就是沒有任何讀取或寫入鎖定時,才可以取得寫入鎖定。
例如可使用 ReadWriteLock 改寫先前的 ArrayList,改進讀取效率:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class ReentrantReadWriteLockArrayList<E> {
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    private Object[] elems;
    private int next;
    public ReentrantReadWriteLockArrayList(int capacity) {
        elems = new Object[capacity];
    }
    public ReentrantReadWriteLockArrayList() {
        this(16);
    }
    public void add(E elem) {
        lock.writeLock().lock();
        try {
            if (next == elems.length) {
                elems = Arrays.copyOf(elems, elems.length * 2);
            }
            elems[next++] = elem;
        } finally {
            lock.writeLock().unlock();
        }
    }
    public E get(int index) {
        lock.readLock().lock();
        try {
            return (E) elems[index];
        } finally {
            lock.readLock().unlock();
        }
    }
    public int size() {
        lock.readLock().lock();
        try {
            return next;
        } finally {
            lock.readLock().unlock();
        }
    }
}
如此設計之後,若執行緒都多是在呼叫 get 或 size 方法,就比較不會因等待鎖定而進入阻斷狀態,可以增加讀取效率。
StampedLock
ReentrantReadWriteLock 在沒有任何讀取或寫入鎖定時,才可以取得寫入鎖定,這可用於實現悲觀讀取(Pessimistic Reading),如果執行緒進行讀取時,經常可能有另一執行緒有寫入需求,為了維持資料一致,ReentrantReadWriteLock 的讀取鎖定就可派上用場。
然而,如果讀取執行緒很多,寫入執行緒甚少的情況下,使用 ReentrantReadWriteLock 可能會使得寫入執行緒遭受飢餓(Starvation)問題,也就是寫入執行緒可能遲遲無法競爭到鎖定,而一直處於等待狀態。
JDK8 新增了 StampedLock 類別,可支援樂觀讀取(Optimistic Reading)實作,也就是若讀取執行緒很多,寫入執行緒甚少的情況下,你可以樂觀地認為,寫入與讀取同時發生的機會甚少,因此不悲觀地使用完全的讀取鎖定,程式可以查看資料讀取之後,是否遭到寫入執行緒的變更,再採取後續的措施(重新讀取變更後的資料,或者是拋出例外)。
假設之前的 ArrayList 範例會用於讀取執行緒多而寫入執行緒少的情況,而你想要實作樂觀讀取,如何使用 StampedLock 類別來實現:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class StampedLockArrayList<E> {
    private StampedLock lock = new StampedLock();
    private Object[] elems;
    private int next;
    public StampedLockArrayList(int capacity) {
        elems = new Object[capacity];
    }
    public StampedLockArrayList() {
        this(16);
    }
    public void add(E elem) {
        long stamp = lock.writeLock();
        try {
            if (next == elems.length) {
                elems = Arrays.copyOf(elems, elems.length * 2);
            }
            elems[next++] = elem;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
    public E get(int index) {
        long stamp = lock.tryOptimisticRead();
        Object elem = elems[index];
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                elem = elems[index];
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return (E) elem;
    }
    public int size() {
        long stamp = lock.tryOptimisticRead();
        int size = next;
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                size = next;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return size;
    }
}
範例中使用了 StampedLock,可以使用 writeLock 方法取得寫入鎖定,這會傳回一個 long 整數代表鎖定戳記(Stamp),可用於使用解除鎖定或透過 tryConvertToXXX 方法轉換為其他鎖定。
在範例 get 中示範了一種樂觀讀取的實作方式,tryOptimisticRead 不會真正執行讀取鎖定,而是傳回鎖定戳記,如果有其他排他性鎖定的話,戳記會是 0,在這之後將資料暫讀出至區域變數,validate 方法用來驗證戳記是不是被其他排他性鎖定取得了,如果是的話就傳回 false,如果戳記是 0 也會傳回 false。如果 if 驗證出戳記被其他排他性鎖定取得,重新使用 readLock 做真正的讀取鎖定,並在鎖定時更新區域變數,而後解除讀取鎖定,如 if 驗證條件不成立,只要直接傳回區域變數的值。範例中的 size 方法也是類似的實作方式。
在 validate 之後發生寫入而傳回結果不一致是有可能的,如果你在意這樣的不一致,應當採用完全的鎖定。

