Tiger (Java2 SE 1.5) で追加された並列プログラミング機能

作成日:2003.10.31

次期、Java2 Platform, Standard Edition 1.5 (以下、J2SE1.5) は言語仕様の拡張以外に、並列プログラミング機能の強化、JavaVM 内モニタリング機能の強化が加わった。 このページでは並列プログラミングプリミティブについて紹介してみる。

J2SE 1.5 で加わる並列プログラミング機能は JSR-166 Concurrency Utilities に基づいている。 java.util.concurrent.* パッケージに従来の Java スレッドと Java モニタ(synchronize/wait/notify) よりも強力な以下のような道具が用意される。

  1. 並列プログラムのためのコンテナクラスの拡張
  2. スレッド・プーリング(Thread Pooling)
  3. アトミック操作 (java.util.concurrent.atomic)
  4. ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)
  5. セマフォ、サイクリックバリア

JSR-166 は、その他 java.lang パッケージの SystemThreadThreadLocal などに対する機能拡張が行われている(ここ)。

1. 並列プログラムのためのコンテナクラスの拡張

java.util パッケージに FIFO (first-in-first-out) のマナーのコンテナクラスをあらわす java.util.Queue<E> インターフェイスと、その MT-safe 版 java.util.BlockingQueue<E> インターフェイスが新しく追加された。 FIFO のコンテナ自体は java.util.List<E> インターフェイスでも作成可能なのにわざわざ新たに Queue インターフェイスを設けたのは、マルチスレッド環境で要素の追加・取り出しを強化したコンテナを作りたかったためだと思われる。

Queue インターフェイスを実装したクラスはマルチスレッドへの対応度によって 3 種類に分かれる。

分類 説明 実装クラス
MT-unsafe Queue マルチスレッド時には安全に用できない Queue の実装クラス LinkedList, AbstractQueue, PriorityQueue
MT-safe Queue (blocking) ロックを使ってマルチスレッド時に安全に使用できるようにした Queue の実装クラス。 BlockingQueue インターフェイスを実装している。 DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, ArrayBlockingQueue
MT-safe Queue (wait-free) マルチスレッド時に安全に使用できるが、 ロックは使用していないもの。
BlockingQueue インターフェイスではなく、Queue インターフェイスからの実装。
ConcurrentLinkedQueue

MT-unsafe 版は java.util パッケージに、MT-safe 版は blocking も wait-free も java.util.concurrent パッケージに入っている。

まず Blocking 名前のついている MT-safe クラスは synchronized によって同期処理を行っているクラスだと考えられる。 J2SE 1.4 以前はコレクションクラスを MT-safe をするのに以下の構文を使っていたが、いくつかのクラスには専用番ができたということだ。

List list = Collections.synchronizedList(new LinkedList(...));

一方、名前に Concurrent を含む wait-free 版 ConcurrentLinkedQueue は、排他制御に Java モニターを用いずにアトミック操作 を使ったキューである。 Blocking 版の場合 競合が発生するとスレッドが待機(サスペンド)してしまうが、Wait-free 版はアトミック操作を用いることで、スレッドが待機状態に落ちにくいように実装してある(アルゴリズムの解説はここ)。

Wait-free は排他制御の範囲が狭く、競合があまり頻発しない場所で使うと応答性が良くなり処理が高速化する。 しかし使いどころを誤ると CPU 使用率が高くなり、性能低下を招くことになる。

Wait-free 版は Queue インターフェイスを実装した ConcurrentLinkedQueue 以外にも、ConcurrentHashMap がある。 今後の Concurrency Utilities においては、blocking 版と wait-free 版の使い分けが重要になると思われる。

Queue の実装クラスについて簡単にメモしておく。

その他、並列プログラムのために java.util.concurrent パッケージには CopyOnWriteArrayListCopyOnWriteArraySet クラスが提供された。 サンプルがないので内容を把握仕切れていないが、これらのクラスはイテレータを使ってコンテナクラス内を走査している時は、別のスレッドが remove、set、add の操作を行ったとしてもイテレーション開始時のコンテナの状態が見え続けるクラスのようだ。 (イテレータがガーベージになると同期が取られるみたい)。

2. スレッド・プーリング(Thread Pooling)

java.util.concurrent パッケージ内では、 スレッド・プーリングのためのフレームワークが提供されている。

java.util.concurrent.ThreadPoolExecutor クラスがプールされているスレッドと処理すべきタスクのリストを管理し、スケジューリングしてゆく。 ThreadPoolExecutor の最も基本的なコンストラクタは、以下の表の引数を取る。

引数意味
int corePoolSizeプールしておく最小のスレッド数
int maximumPoolSize最大スレッド数
long keepAliveTime 仕事がない場合にスレッドをプールから外すしてゆくが、 仕事がなかったと判断されるまでの待ち時間。
TimeUnit unitkeepAliveTime の単位を決める。
BlockingQueue<java.lang.Runnable> workQueue ThreadPoolExecutor が処理される前のタスクをキューイングするために使うキュー

workQueueThreadPoolExecutor のもっとも特徴的な引数で、BlockingQueue<java.lang.Runnable> 型の引数を取り、実際には 1. で登場した LinkedBlockingQueueArrayBlockingQueueSynchronousQueuePriorityBlockingQueue を使うことができ、型によってキューイングの特徴がいろいろ変わってくる。

処理したいタスクは java.lang.Runnable インターフェイスを実装したオブジェクトとし、ThreadPoolExecutor.execute(java.lang.Runnable command) で work queue につなげる。

スレッド・プーリングにはもっと色々な機能が存在しているが、ここでは省略する。

3. アトミック操作 (java.util.concurrent.atomic)

java.util.concurrent.atomic パッケージでは、Java でアトミック操作を行うライブラリを提供する。

アトミック操作は最も基本的でローレベルな排他制御命令。 本来は CPU に命令として実装されるもので、変数の読み込み、値の比較、書き込みなど命令のシーケンスが、途中で割り込まれずに実行されることを保証する。

そのため、ユーザープログラム側でアトミック操作を直接触ることは少ないと思われる。 java.util.concurrent.atomic パッケージでは以下の 3 種類のアトミック命令が使えるようになっている。

swap (exchange)
「メモリ上の値 V を読み込み、その値を新しい値 U に置き換える」 という操作を不可分に行うことができる。
java.util.concurrent.atomic パッケージでは getAndSet メソッドという名前で提供される。

fetch-and-add
「メモリ上の値 V を読み込み、その値に D を足し、V + D に置き換える」 という操作を不可分に行うことができる。
java.util.concurrent.atomic パッケージでは addAndSetgetAndAddgetAndIncrementgetAndDecrementincrementAndGetdecrementAndSet メソッドという名前で提供される。

compare-and-swap
「メモリ上の値 V を読み込み、その値を値 C と比較する。 V と C が一致しなければ操作は失敗。 一致すれば新しい値 U をメモリ上に書き込む」 という一連の操作を不可分に行うことができる。 「失敗」があるのがこの処理の特徴。
java.util.concurrent.atomic パッケージでは compareAndSetweakCompareAndSet メソッドという名前で提供される。

使い方を例をあげてみる。

class Sequencer1 {
    private long sequenceNumber = 0;

    public long next() {
        return sequenceNumber++;
}}

class Sequencer2 {
    private long sequenceNumber = 0;
    private Object lock = new Object();

    public long next() {
        synchronized(lock){
        return sequenceNumber++;
}}}

class Sequencer3 {
    // Long 型のアトミック変数を用意。初期値は 0
    private AtomicLong sequenceNumber = new AtomicLong(0);

    public long next() {
        return sequenceNumber.getAndIncrement();
}}

Sequencer1 Sequencer2 Sequencer3 はいずれも next を呼ぶ度に 0 → 1 → 2 → 3 と順番に値を取得できるプログラム。 しかし Sequencer1 はマルチスレッドに対応していないので、複数のスレッドから呼び出されると同じ値を 2 度返したり、飛ばしたりする問題がある。

Sequencer2 は synchronized を使うことによってこの問題を解決したが、この程度のプログラムでありながら複数のスレッドから同時に next が呼ばれた場合、ロックの所有者(オーナー)になれなかったスレッドは CPU 実行権を手放し待機状態に入ってしまう。next が頻繁に呼ばれる場合効率が悪い。

Sequencer3 は Concurrency Utilities を使ったもので AtomicLong クラスの中に long 型の変数が入っている。 getAndIncrement メソッドを呼ぶと「値に1を足し、元の値を返す」という操作不可分に行う。複数のスレッドが同時に next を呼び出し競合が発生したとしてもCPU とバスが調停してくれるので、後からきたスレッドはブロックされることもなく、前のスレッドが終わったらすぐにgetAndIncrement の操作を実行することができる。

JSR-166 Concurrency Utilities のアトミック操作できるデータは、AtomicLong のように java.util.concurrent.atomic 内で定義されているクラスをインスタンス化したもののだけである。 すでに作成したしまったインスタンスは Concurrency Utilities で排他制御することができない。 これは少し不満な部分である。

J2SE 1.4 から導入された Buffer(DirectBuffer) は、Java ヒープの外にあるメモリ領域を Java の中から扱えるようになっている。 Buffer 領域のメモリにたいしてアトミック操作を行えれば C/C++ で書かれたライブラリとの連係がやりやすいのだが現在はそうなっていないようだ。 残念無念。

4. ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)

java.util.concurrent.locks パッケージでは、ミューテックス、コンディションバリアブル(条件変数)、リーダーライターロックなどの同期オブジェクトが追加された。

インターフェイスとして Lock (ミューテックス)、Condition (条件変数)、ReadWriteLock (リーダーライターロック) が用意され、その実装クラスである ReentrantLockReentrantReadWriteLock などが提供される。

これまで Object クラスの notifywait では不十分だった部分が一挙にパワーアップ。

Lock の使いかたは、当然以下のようになる。

Lock l = ...;
l.lock();
try {
    // ここに排他制御したい内容を
} finally {
    l.unlock();
}

面白いのは Conditionの扱い。 このメモで書いたように、条件変数はミューテックスと一緒に使う必要があるのだが、Condition は赤字の部分のように Lock から newCondition() メソッドを使って取り出されるので、生成された時点から関係付けられたミューテックスが存在することになる。

この例では実装クラスReentrantLock から newCondition() されたので、その実体は内部クラス ReentrantLock.ConditionObject になる。

class BoundedBuffer {
  Lock lock           = new ReentrantLock(); // ミューテックスの生成
  final Condition cv  = lock.newCondition(); // 条件変数の生成

  public void condtion_wait() throws InterruptedException {
    lock.lock();
    try {
      cv.wait();
    } finally {
      lock.unlock();
    }
  }

  public void condition_signal() throws InterruptedException {
    lock.lock();
    try {
      cv.signal();
    } finally {
      lock.unlock();
    }
  }
}

5. セマフォ、サイクリックバリア

java.util.concurrent パッケージ内では、同期のための便利なデータ構造が提供されている。 3. 4. の構造が同期プログラムのためのプリミティブだとしたら、5. はマルチスレッドプログラムの骨格を決めるデザイン・パターンのようなもの提供する。

Semaphore
いわゆるセマフォ(semaphore) を提供するデータ構造。 セマフォは内部にカウンターを持っていて最大何スレッドから、 自分を獲得できるかを設定できる。 そのため有限個数の共有資源を管理するのに セマフォを使うことが多い。
セマフォを獲得したいスレッドは Semaphore.acquire() を呼びだす。 セマフォ内部のカウンタが 0 でない場合には スレッドは動作を許可される。同時にカウンタは 1 減る。 セマフォを使用し終わったら Semaphore.release() を呼び出す。 カウンタは 1 増え元に戻る。
内部のカウンタが 0 の場合には、 他のスレッドが release を行うまでブロックされる。
Semaphore sema = new Semaphore(5);

int foo() {
    sema.acquire();
    // この部分は同時に 5 スレッドしか動作できない。
    sema.release();
}
カウンタ 1 のセマフォはようするにミューテックス(mutex)です。

FairSemaphore
Semaphore 同様のセマフォを機能を提供するが、 実行権限の獲得ルールが Semaphore と異なり fair になっている。

セマフォが複数のスレッドが待機させているとき、 別のスレッドが release を実行して 実行権限が戻ってきたとする。
FairSemaphore は 現在 待機中のスレッドのうち 最初に aquire を実行したものに、 実行権限を渡す。
Semaphore はこのような順序を保証しない。 それどころか release で回収された実行権限を 待機中のスレッドに渡すということも保証しない。 そのためrelease をしたスレッドが直後に aquire を実行すると、 そのまま実行権限を手に入ってしまうという ことがありえる。 このような場合、 スレッドスケジューリングのタイミングによっては いったんセマフォでブロックされたスレッドは プログラムが終わるまで待機し続けるという危険性がある。

CountDownLatch
スレッドのランデブーポイントを作成するのに便利なクラス。
複数のワーキングスレッドの終了を待つような場合に便利なデータ構造で、 大雑把にいって下のようなはたらきをする。
class CountDownLatch {
  Object lock = new Object();
  int    count;
  CountDownLatch(int cnt){
    this.count = cnt;
  }
  synchronized void countDown() {
    count--
    if( count == 0 ){
      lock.notifyAll();
    }
  }
  
  void await() throws InterruptedException {
    lock.wait();
  }
}

CyclicBarrier
CyclicBarrier は バリアポイントが複数あるような マルチスレッドプログラムで有効な同期機構。
CyclicBarrier のコンストラクターは、 CyclicBarrier(int parties, java.lang.Runnable barrierAction) となっている。 parties でバリアを行うワーカースレッドの数を決定する。 parties 数のワーカースレッドが CyclicBarrier.await() を呼び出せば ワーカースレッドのランデブーが完了する。 ランデブー完了後に barrierAction が実行される。
class Controller {
  final int MAX_CPUS  = ...;
  final CyclicBarrier barrier;

  public Controller() {
    barrier = new CyclicBarrier(MAX_CPUS,
        new Runnable() {
          public void run() {
            // すべてのワーカースレッドが await によって待機した後に、
            // この Runnable が実行される。
        }});
    for (int i = 0; i < MAX_CPUS; ++i)
      new Thread(new Worker(i)).start();
    waitUntilDone();
  }

  // ワーカースレッド
  class Worker implements Runnable {
    int index;

    Worker(int i) {
      index = i;
    }

    public void run() {
      while (!done()) {
        // ここに1回の処理内容を書く
        try {
          barrier.await(); // ここでランデブー
        } catch (InterruptedException ex) {
          return;
        } catch (BrokenBarrierException ex) {
          return;
        }
      }
    }
  }
}

Exchanger<V>
Exchanger<V> は 2つのスレッドがランデブーポイントで データを交換しあう同期機構。
下の例ではユーザー定義の Data 型のオブジェクトを 交換することになる。
Exchanger<Data> exchanger = new Exchanger();

// 2つのスレッドによって実行されるメソッド
int foo() {
    // ここで処理を行い自分が生成したデータを myData にいれておく

    // ランデブーポイント。別のスレッドが来るまでここまで待機
    Data otherThreadsData = exchanger.exchange(myData);
}
exchanger.exchange(myData)に自スレッドのデータを引数として渡すと、対応する相手スレッドのデータが戻り値として返ってくる。

その他のパッケージに対する変更

Concurrency Utilities の導入と共に、java.lang. 以下のパッケージも変更を受けたものがある。

java.lang.System
java.lang.System にはシステムの持つナノ秒のクロックを返すメソッドが導入された。 時間測定に使える解像度が上がったようだ。
class java.lang.System {
    public static long nanoTime()
このタイマーは CPU のリアルタイムクロックを (定数で乗除算して) そのまま返すことを狙っているので、 nanoTime メソッドの返す値の絶対値には意味がない。 2つの nanoTime の差だけが意味を持つ。
この点は、 currentTimeMillis が 基準時刻(1970 年1月1日午前0時)との差になっているのとは対象的。
UNIX タイムのように基準時刻からの時間というような意味のある値にはなっていない。

java.lang.Thread
java.lang.Thread には 以下のようなメソッドが追加されている。
public class Thread implements java.lang.Runnable {
    // スレッドのスタックトレースが取れる
    public java.lang.StackTraceElement[] getStackTrace()

    // すべてのスレッドのスタックトレース
    public static Map<K,V> getAllStackTraces()

    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler()
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

    public static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)
    public static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler()
    // システム依存のスレッドの ID
    public long getId()
}
  1. getStackTracegetAllStackTraces メソッドが追加され、 ユーザープログラム内から普通にスタックトレースが取れるようになった。 無論、このメソッドを呼び出すためにはセキュリティーマネージャーの許可が必要。

  2. getUncaughtExceptionHandlersetUncaughtExceptionHandler メソッドは キャッチされなかった例外を最終的にキャッチする網を、 Java スレッド毎に登録することのできるメソッド。 getDefaultUncaughtExceptionHandlersetDefaultUncaughtExceptionHandler によって 全 Java スレッドの未捕捉例外を捕捉する網を張ることも可能。 J2SE 1.4 までは、 キャッチされない例外は Java スレッドのスレッドグループ(ThreadGroup) に伝達されていた。 未捕捉例外を強制的にキャッチしたければ ThreadGroupuncaughtException メソッドを オーバーライドする必要があったが、 J2SE 1.5 のハンドラ登録の方が簡単になった。

  3. システムに依存するスレッド ID を返す getId メソッドが追加された。 初期の JavaVM は green thread 方式を取り JavaVM 内で閉じたユーザースレッドを提供していたが、 現行の JavaVM のほとんどは Java スレッドを OS/システムのスレッドと 一対一に割り当てる native thread 方式を取っている。 getId メソッドは OS/システムのスレッドの識別子を返す ローレベル API である。

java.lang.ThreadLocal
Java スレッド毎に固有のデータを保存する ThreadLocal クラスは、汎型(generic type) の導入によってThreadLocal<T> に変わっている。
これまでの ThreadLocalObject 型のみを入れられていたが、generic によって自分の好きなオブジェクトの型が入れられるようになった。
また、一旦登録したオブジェクトを削除する remove メソッドが追加されている。
public class ThreadLocal<T> {
    T              get()
    protected  T   initialValue()
    void           remove()
    void           set(T value)
}

まとめ

JSR-166 Concurrency Utilities では、ハードウェアのローレベルな機能を呼び出す API がバシバシ追加された。 ユーザーが永らく望んでいた機能だが、ちょっと恐いのは実装の自由度の問題。

追加された機能はあくまでも JDK のクラスライブラリレベルのものなので、実装者がどう実装するかはまったくの自由。 商用 JavaVM メーカーはランタイム、JIT コンパイラ、クラスライブラリの協調によって、java.util.concurrent. 以下のクラスで CPU や OS のネイティブな機能を使えるように最適化するはず。

だが手を抜いた実装もあると思われる。 java.util.concurrent.atomic は Java モニタを利用してベタに実装しても構わないし、System.nanoTime()System.currentTimeMillis() を 単純に 1,000 倍した値を返しても構わない。 手を抜かなくても、どの方向に最適化するかという性能を追求する方向の違いもある。

同期プリミティブはライブラリの仕様が等しくても、タイミングやポリシーの差によって動作・挙動がまったく変わってくることが結構ある(例えば、ロックが fair なのか unfair なのかなど)。

Concurrency Utilities を使うユーザーは性能を追求した並列プログラムを書きたいわけだが、それが JavaVM のメーカーやバージョンの違いによって大幅に動作が変わってくるとしたら堪らない。 最終的にはプログラムの先頭で JavaVM の情報を得て、Sun JVM の場合にはこっちのメソッドを IBM JVM の場合にはあっちのメソッドを呼ぶなんてことになりそうな気がする。

もっとも、Tiger が出ても、 Concurrency Utilities は 簡単には普及しないかもしれないな〜〜〜

関連ページ

コメント

トラックバック   [Trackback URL: http://www.nminoru.jp/cgi-bin/tb.cgi/java__j2se15_concurrent]
[りっくでぃあすの日記] [Java]Java5.0の新機能。 2007-06-17 (Sun) 03:20:56
中村実氏のブログより。 Tiger (Java2 SE 1.5) で追加された並列プログラミング機能 今更、気づきました。。。 確かに 1.5 になったら java.util.concurrent というパッケージが増えてた。 java.util.concurrent (Java 2 Platform SE 5.0) java.util.concurrent.atomic (Jav
コメントを書き込む

TOP    掲示板    戻る
Written by NAKAMURA Minoru, Twitter:@nminoru_jp