Effective Javaを勉強します【第10章】

項目66. 共有された可変データへのアクセスを同期する

synchronized 予約語

Java ではマルチスレッドを取り扱うことができる。しかし複数のスレッドが同じオブジェクトを 同時に操作すると、プログラムが意図しない動作をする可能性がある。この問題を解決するのが synchronized 予約語である。

synchronized 予約語を利用することで、メソッドやブロックがある時点で1つのスレッドのみで 実行されていることを保証することができる。

変数の読み書きの同期

Java の言語仕様は longdouble 型以外の変数については、アトミックであることを保証している。 すなわち、複数のスレッドにより同期なしでその変数が並行して変更されたとしても、どれかのスレッドにより その変数に保存された値を返すことが保証されている。

ここで注意しなければならないのは、あるスレッドが書き込んだ値が、他のスレッドからも見えることは 保証されていないことである。

例えば、以下のコードについて考えてみる。プログラムが約1秒動作し、その後メインスレッドが stopRequestedtrue に設定することで backgroundThread が終了するように思えるが、実際はスレッドが止まることはない。

public class StopThread {
    private static boolean stopRequested;

    public static void main(String[] args) 
            throws InterruptedException {
        Thread backgroundThread = new Thread(new Runnable() {
            public void run() {
                int i = 0;
                while (!stopRequested) {
                    i++;
                }
            }
        });
        backgroundThread.start();

        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}

スレッドが止まらない理由は、JVMの最適化により run メソッド内部の while ループが 以下のように書き換えられるためである。

if (!stopRequested) {
    while (true) {
        i++;
    }
}

上記を防ぐためには、書き込みメソッドと読み込みメソッドの両方を同期する必要がある。 上記のコードは以下のように修正する。

public class StopThread {
    private static boolean stopRequested;
    private static synchronized void requestStop() {
        stopRequested = true;
    }
    private static synchronized boolean stopRequested() {
        return stopRequested;
    } 

    public static void main(String[] args) 
            throws InterruptedException {
        Thread backgroundThread = new Thread(new Runnable() {
            public void run() {
                int i = 0;
                while (!stopRequested()) {
                    i++;
                }
            }
        });
        backgroundThread.start();

        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}

まとめると、複数スレッドで可変データを共有する場合は、longdouble 以外であっても、 synchronized で読み書きするスレッドを同期しなければならない。

volitile 修飾子

「変数の読み書きの同期」に記載したコードでは synchronized が使われているものの、 その目的は相互排他のためではなく、スレッドが最新の値を見ることを指示するために使われている。 (stopRequested を2つのスレッドが更新するわけではない)

このような時は、synchronized ではなく volatile 修飾子を使うことも可能である。修正方法は以下。 volatile 修飾子は相互排他を行わないが、フィールドを読み込むスレッドが最後に書き込まれた値を見ることを保証している。

public class StopThread {
    private static volatile boolean stopRequested;

    public static void main(String[] args) 
            throws InterruptedException {
        Thread backgroundThread = new Thread(new Runnable() {
            public void run() {
                int i = 0;
                while (!stopRequested) {
                    i++;
                }
            }
        });
        backgroundThread.start();

        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}

項目67. 過剰な同期は避ける

同期されたメソッドやブロック内で決して制御をクライアントに譲ってはいけない。

言い換えると、オーバーライドされるように設計されているメソッドや、関数オブジェクトの形式でクライアントから受け取ったメソッドを同期された領域内で呼び出してはいけない。そのようなメソッドはどのように動作するか分からないので制御することができず、場合によっては例外やデッドロック、データ破壊が発生する可能性があるため。

オープンコール

同期された領域から異質なメソッド(オープンコール)を呼び出してはいけない。 以下の ObservableSet は上記を守っていないために、多くの問題を抱えている。

public class ObservableSet<E> extends ForwardingSet<E> {
    interface SetObserver<E> {
        void added(ObservableSet set, E element);
    }

    public ObservableSet(Set<E> set) {
        super(set);
    }

    private final List<SetObserver<E>> observers = new ArrayList<>();

    public void addObserver(SetObserver<E> observer) {
        synchronized (observers) {
            observers.add(observer);
        }
    }

    public boolean removeObserver(SetObserver<E> observer) {
        synchronized (observers) {
            return observers.remove(observer);
        }
    }

    private void notifyElementAdded(E element) {
        synchronized (observers) {
            for (SetObserver<E> observer: observers) {
                observer.added(this, element);
            }
        }
    }

    @Override
    public boolean add(E element) {
        boolean added = super.add(element);
        if (added) {
            notifyElementAdded(element);
        }
        return added;
    }
}

上記のコードは要素がセットに追加された時にクライアントが通知を受け取ることを可能にしている。 オブザーバーは addObserver メソッドを呼び出すことで通知を受けるようになり、 removeObserver メソッドを呼び出すことで通知を受けることを解除する。

この時、下記のように値を順にセットに追加し。23になったら自分を取り除くことを意図するとどうなるだろうか。

public static void main(String[] args) {
    ObservableSet<Integer> set = new ObservableSet<Integer>(new HashSet<Integer>());

    set.addObserver(new SetObserver<Integer>() {
        public void added(ObservableSet<Integer> s, Integer e) {
            System.out.println(e);
            if (e == 23) {
                s.removeObserver();
            }
        }
    });
    for (int i = 0;i < 100;i++) {
        set.add(i);
    }
}

実際には数字0から23までを表示して、その後に ConcurrentModificationException がスローされる。 これは notifyElementAdded メソッドが synchronized ブロック内で SetObserver.added (オープンコール)を 呼び出すことで生じている。

SetObserver.added メソッドは removeObserver メソッドを呼び出し、 さらにその中で observers.remove メソッドを呼び出しているため、 observers のイテレート中にその要素を削除しようとすることになり例外が発生してしまう。

この場合は例外が発生しただけだが、デッドロックが発生する場合もある。このように、同期された メソッドやブロックの中でオープンコールを呼んでは行けない。

一般的に、同期された領域内ではできる限り少ない処理を行うべきであり、必要最小限の処理の間でのみロックを獲得し、 その後は速やかに解放すべきである。

スレッドセーフな設計

クラスが並行して使用されるのであれば、可変クラスをスレッドセーフにすべきである。 また内部的に同期を行うことで、高い並行生を達成することが可能となる。 内部的に同期するための技法としては、次のようなものがある。

クラスを内部的に同期させる場合は、そのことをドキュメント化すること。

項目68. スレッドよりエグゼキューターとタスクを選ぶ

java.util.concurrent パッケージにはエグゼキューターフレームワークという マルチスレッド処理を行うための仕組みが整っている。

まずは別スレッドにしたい処理を、Runnable インタフェースを実装したクラスの run メソッドに実装する。

public class TestRunnable implements Runnable {

    public void run() {
        // 実行したい処理
        ...
    }
}

あとは以下のようにスレッドを生成、実行すればよい。

ExecutorSerivice executor = Executors.newSingleThreadExecutor();
executor.execute(new TestRunnable());

処理が終わったら以下のコードでエグゼキューターを終了する。

executorService.shutdown();

スレッドプール

キューを実行するスレッドが2つ以上必要な場合は、スレッドプールを利用することが可能。

小さなプログラムや軽い負荷のサーバー処理には Executors.newCachedThreadPool を利用すると良い。 ただし Executors.newCachedThreadPool を利用すると、タスクはキューに入れられることなく直ちにスレッドに渡されるため、 利用可能なスレッドがなければ次々と新たなスレッドが生成されてしまう。

したがって高負荷のサーバーでは、固定数のスレッドを持つ Executors.newFixedThreadPool を利用したほうがよい。

タスク

エグゼキューターはスレッドを実行する機構であり、実行する機能はタスクと呼ばれる Runnable インタフェースや Callable インタフェースに記述する。従来の Thread は処理の単位と処理を実行する機構が混在していたが、エグゼキューターとタスクによりそれらが分離され、より柔軟にスレッドを扱うことができるようになった。

項目69. wait と notify よりコンカレンシーユーティリティを選ぶ

Java 5以降では、wait と notify を使用するよりも、用意されているコンカレンシーユーティリティ(java.util.concurrent)を利用したほうが良い。

java.util.concurrent 内の高レベルのユーティリティは、エグゼキューターフレームワーク、コンカレントコレクション、シンクロナイザーの3つに分類され、本項目では後者2項目を説明する。

コンカレントコレクション

List、Queue、Map などの標準コレクションインタフェースの高パフォーマンスな並行実装を提供している。これらには独自の同期管理の仕組みが実装されているため、並行な活動を排除することは不可能である。すなわち、コンカレントコレクションをロックしても効果はなく、プログラムを遅くするだけなので注意すること。

コンカレントコレクションをアトミックに操作するために、いくつかのメソッドが用意されている。例えば、ConcurrentMapputIfAbsent メソッドなど。

private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();

public static String intern(String s) {
    // 毎回 putIfAbsent を呼び出すより、get で判定したほうがパフォーマンスが良い
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null) {
            result = s;
        }
    }
    return result;
} 

コレクションインタフェースによっては、ブロックする(操作が完了するまで待つ)操作が用意されている。 例えば BlockingQueue は、キューが空なら待つ take メソッドが用意されている。

シンクロナイザー

シンクロナイザーは、スレッドが他のスレッドを待つことを可能にするオブジェクトである。 CountDownLatchSemaphoreCyclicBarrierExchanger の4種類がある。

以下は CountDownLatch を用いたアクションの並列実行処理時間を計測するプログラムの例である。 waitnotify を利用して同様の実装をするより簡単である。

// executor: アクションを実行するエグゼキューター
// concurrency: 同時実行スレッド数
// action: 計測対象のアクション
public static long time(Executor executor, int concurrency, final Runnable action)
        throws InterruptedException {
    final CountDownLatch ready = new CountDownLatch(concurrency);
    final CountDownLatch start = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(concurrency);
    for (int i = 0; i < concurrency; i++) {
        executor.execute(new Runnable() {
            @Override public void run() {
                // スレッドが実行可能になったら通知する
                ready.countDown();
                try {
                    // start.countDown()が1回実行されるまで待つ
                    start.await();
                    action.run();
                } catch(InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // スレッドの処理が終了したら通知する
                    done.countDown();
                }
            }
        });
    }
    // すべてのスレッドが実行可能になるまで待つ
    // すなわち concurrency 回 ready.countDown() が呼ばれるまで先には進まない
    ready.await();
    long start = System.nanoTime();
    start.countDown();
    // すべてのスレッドの処理が終了するまで待つ
    // すなわち concurrency 回 done.countDown() が呼ばれるまで先には進まない
    done.await();
    return System.nanoTime() - start;
}

waitnotify の使い方については、今後は直接使用する機会は少ないと思うので割愛。

項目70. スレッド安全性を文書化する

クラスのインスタンスstatic のメソッドが並行して使用された場合に、そのクラスはどのように振る舞うか文書化しなければならない。

文書化するときの注意点としては、以下のようなものが挙げられる。

  • synchronized 修飾子は実装の詳細であるため、Javadoc に含むべきではない。
  • スレッド安全性に関しては、以下のサポートするスレッド安全性のレベルを明確に文書化しなければならない。
    • 不変(immutable): このクラスのインスタンスは定数のように振る舞い、外部の同期は必要ない
    • 無条件スレッドセーフ(unconditionally thread-safe): このクラスのインスタンスは可変だが、外部同期は不要であり、並列実行可能な内部同期の仕組みが実装されている
    • 条件付きスレッドセーフ(conditionally thread-safe): 安全に並列実行するために、いくつかのメソッドは外部動機が必要
    • スレッドセーフではない(not thread-safe): このクラスのインスタンスは可変であり、ここのメソッド呼び出しはクライアントにより外部同期する必要がある
    • スレッド敵対(thread-hostile): このクラスは、たとえすべてのメソッドが外部同期されたとしても、並列実行すべきではない
  • 条件付きスレッドセーフのクラスを文書化する際は、外部同期が必要なメソッドと、そのために獲得すべきロックを明記する必要がある。
  • 誰でもアクセス可能なロックをクラスが使用すると、ロックを長期間保持することによるサービス拒否攻撃を受ける可能性がある。これに対して、無条件スレッドセーフのクラスの場合、以下のようなプライベートロックオブジェクトを利用することができる。継承のために設計したクラスにおいては、サブクラスとスーパークラスの間の干渉を防ぐことができるため、特に有用。
    private final Object lock = new Object();
    public void foo() {
        synchronized(lock) {
            ...
        }
    }

項目71. 遅延初期化を注意して使用する

遅延初期化は、フィールドの値が必要となるまで、そのフィールドの初期化を遅らせること。 ほとんどの状況では遅延初期化よりは普通の初期化が望ましい。

特に複数スレッドで遅延初期化を使用する場合、フィールドの初期化時にロックが必要となるためアクセスコストが増加し、 また適切に同期が行われない場合深刻なバグとなる可能性があるため、注意しなければならない。

同期されたアクセッサー

複数スレッドから遅延初期化を使用する場合、一般的には以下のような同期されたアクセッサーを使用する。

    private FieldType field;
    synchronized FieldType getField() {
        if (field == null) {
            field = computeFieldValue();
        }
        return field;
    }

遅延初期化ホルダークラスイデオム

static フィールドに対するパフォーマンスのために遅延初期化を使用する場合は、遅延初期化ホルダークラスイデオムを使用する。 これは「クラスが利用されるまでクラスが初期化されないことが保証される」という言語仕様を利用している。

    private static class FieldHolder {
        static final FieldType field = computeFieldValue();
    }
    static public FieldType getField() {
        return FieldHolder.field;
    }

上記の場合、初めて getField メソッドが呼び出されるときに、初めて FieldHolder クラスが初期化される。

二重チェックイデオム

インスタンスフィールドに対するパフォーマンスのために遅延初期化を使用する場合は、二重チェックイデオムを使用する。 このイデオムは、フィールドの初期化が行われた後にアクセスされた場合のロックのコストを回避する。

    private volatile FieldType field;
    public FieldType getField() {
        FieldType result = field;
        if (result == null) { //1回目検査
            synchronized(this) {
                result = field;
                if (result == null) { //2回目検査
                    field = result = computeFieldValue();
                }
            }
        }
        return result;
    }

上記では1回目の検査で、初期化されているかをチェックしている。 すでに初期化されている場合そのまま result を返すため、ロックによるコストを回避することができる。

初期化されていない場合、synchronized でロックをして2回目の検査を実施した上で、初期化を実施する。 (このため二重チェックイデオムと呼ばれる)

変数 fieldsynchronized 修飾子の内外からアクセスされるため、volatile 宣言する必要があることに注意すること。

項目72. スレッドスケジューラに依存しない

複数スレッドが実行可能な場合、スレッドスケジューラにより各スレッドの実行時間が決定されるが、この決定方法の詳細はオペレーティングシステムにより異なる可能性がある。 複数スレッドを扱うプログラムを書く際は、パフォーマンスや振る舞いが、あるオペレーティングシステムのスレッドスケジューラに依存しないよう注意する必要がある。

スレッドスケジューラに依存しないようにするためには、実行可能なスレッドの平均数がプロセッサの数を大きく超えないことを保証すればよい。 そのためには、有益な処理をしていないスレッドは動作しないようにする必要がある。

エグゼキュータフレームワークであれば、スレッドプールを適切な大きさにして、タスクを適度に小さくし、各スレッドが独立するよう設計すること。

項目73. スレッドグループを避ける

スレッドグループは、もともとセキュリティ向上のためにアプレットを隔離する仕組みとして考案されたが、今日では利用する機会はない。 もしスレッドの論理的なグループを扱うクラスを設計するのであれば、スレッドプールエグゼキューターの利用などを検討すること。