次期、Java2 Platform, Standard Edition 1.5 (以下、J2SE1.5) は言語仕様の拡張以外に、並列プログラミング機能の強化、JavaVM 内モニタリング機能の強化が加わった。 このページでは並列プログラミングプリミティブについて紹介してみる。
J2SE 1.5 で加わる並列プログラミング機能は JSR-166 Concurrency Utilities に基づいている。
java.util.concurrent.*
パッケージに従来の Java スレッドと Java モニタ(synchronize/wait/notify) よりも強力な以下のような道具が用意される。
- 並列プログラムのためのコンテナクラスの拡張
- スレッド・プーリング(Thread Pooling)
- アトミック操作 (java.util.concurrent.atomic)
- ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)
- セマフォ、サイクリックバリア
JSR-166 は、その他 java.lang
パッケージの System
、Thread
、ThreadLocal
などに対する機能拡張が行われている(ここ)。
1. 並列プログラムのためのコンテナクラスの拡張
java.util
パッケージに FIFO (first-in-first-out) のマナーのコンテナクラスをあらわす java.util.Queue<E>
インターフェイスと、その MT-safe 版 java.util.BlockingQueue<E>
インターフェイスが新しく追加された。
FIFO のコンテナ自体は java.util.List<E>
インターフェイスでも作成可能なのにわざわざ新たに Queue
インターフェイスを設けたのは、マルチスレッド環境で要素の追加・取り出しを強化したコンテナを作りたかったためだと思われる。
Queue
インターフェイスを実装したクラスはマルチスレッドへの対応度によって 3 種類に分かれる。
分類 | 説明 | 実装クラス |
---|---|---|
MT-unsafe Queue | マルチスレッド時には安全に用できない Queue の実装クラス | LinkedList, AbstractQueue, PriorityQueue |
MT-safe Queue (blocking) | ロックを使ってマルチスレッド時に安全に使用できるようにした Queue の実装クラス。 BlockingQueue インターフェイスを実装している。 | DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, ArrayBlockingQueue |
MT-safe Queue (wait-free) | マルチスレッド時に安全に使用できるが、
ロックは使用していないもの。 BlockingQueue インターフェイスではなく、Queue インターフェイスからの実装。 |
ConcurrentLinkedQueue |
MT-unsafe 版は java.util
パッケージに、MT-safe 版は blocking も wait-free も java.util.concurrent
パッケージに入っている。
まず Blocking 名前のついている MT-safe クラスは synchronized
によって同期処理を行っているクラスだと考えられる。
J2SE 1.4 以前はコレクションクラスを MT-safe をするのに以下の構文を使っていたが、いくつかのクラスには専用番ができたということだ。
List list = Collections.synchronizedList(new LinkedList(...));
一方、名前に Concurrent を含む wait-free 版 ConcurrentLinkedQueue
は、排他制御に Java モニターを用いずにアトミック操作 を使ったキューである。
Blocking 版の場合 競合が発生するとスレッドが待機(サスペンド)してしまうが、Wait-free 版はアトミック操作を用いることで、スレッドが待機状態に落ちにくいように実装してある(アルゴリズムの解説はここ)。
Wait-free は排他制御の範囲が狭く、競合があまり頻発しない場所で使うと応答性が良くなり処理が高速化する。 しかし使いどころを誤ると CPU 使用率が高くなり、性能低下を招くことになる。
Wait-free 版は Queue
インターフェイスを実装した ConcurrentLinkedQueue
以外にも、ConcurrentHashMap
がある。
今後の Concurrency Utilities においては、blocking 版と wait-free 版の使い分けが重要になると思われる。
Queue
の実装クラスについて簡単にメモしておく。
AbstractQueue<E>
は普通のキュー。LinkedQueue<E>
、LinkedBlockingQueue<E>
は、J2SE 1.4 以前にも存在する既存のクラス。 もともとList<E>
インターフェイスを実装していたが、Queue<E>
インターフェイスにも対応した。PriorityQueue<E>
、PriorityBlockingQueue<E>
は優先順位付きキューで、コンストラクターの引数に comparator を渡すとエレメント間に優先順位を付けれるようになる。 つまり同じ優先順位では FIFO だが、優先順位に差があると優先度が高い方が先に出てくるキュー。ArrayQueue<E>
は固定サイズのキュー。SynchronousQueue<E>
は実質的にサイズが 0 のキューで、要素を挿入しようとするスレッドと要素を取り出そうとするスレッドをリンクして要素を橋渡しして転送する。DelayQueue
は挿入した要素が一定時間が経過するまで取り出せないキュー。
その他、並列プログラムのために java.util.concurrent
パッケージには CopyOnWriteArrayList
と CopyOnWriteArraySet
クラスが提供された。
サンプルがないので内容を把握仕切れていないが、これらのクラスはイテレータを使ってコンテナクラス内を走査している時は、別のスレッドが remove、set、add の操作を行ったとしてもイテレーション開始時のコンテナの状態が見え続けるクラスのようだ。
(イテレータがガーベージになると同期が取られるみたい)。
2. スレッド・プーリング(Thread Pooling)
java.util.concurrent
パッケージ内では、
スレッド・プーリングのためのフレームワークが提供されている。
java.util.concurrent.ThreadPoolExecutor
クラスがプールされているスレッドと処理すべきタスクのリストを管理し、スケジューリングしてゆく。
ThreadPoolExecutor
の最も基本的なコンストラクタは、以下の表の引数を取る。
引数 | 意味 |
---|---|
int corePoolSize | プールしておく最小のスレッド数 |
int maximumPoolSize | 最大スレッド数 |
long keepAliveTime | 仕事がない場合にスレッドをプールから外すしてゆくが、 仕事がなかったと判断されるまでの待ち時間。 |
TimeUnit unit | keepAliveTime の単位を決める。 |
BlockingQueue<java.lang.Runnable> workQueue | ThreadPoolExecutor が処理される前のタスクをキューイングするために使うキュー |
workQueue
は ThreadPoolExecutor
のもっとも特徴的な引数で、BlockingQueue<java.lang.Runnable>
型の引数を取り、実際には 1. で登場した LinkedBlockingQueue
、ArrayBlockingQueue
、SynchronousQueue
、PriorityBlockingQueue
を使うことができ、型によってキューイングの特徴がいろいろ変わってくる。
処理したいタスクは java.lang.Runnable
インターフェイスを実装したオブジェクトとし、ThreadPoolExecutor.execute(java.lang.Runnable command)
で work queue につなげる。
スレッド・プーリングにはもっと色々な機能が存在しているが、ここでは省略する。
3. アトミック操作 (java.util.concurrent.atomic)
java.util.concurrent.atomic
パッケージでは、Java でアトミック操作を行うライブラリを提供する。
アトミック操作は最も基本的でローレベルな排他制御命令。 本来は CPU に命令として実装されるもので、変数の読み込み、値の比較、書き込みなど命令のシーケンスが、途中で割り込まれずに実行されることを保証する。
そのため、ユーザープログラム側でアトミック操作を直接触ることは少ないと思われる。
java.util.concurrent.atomic
パッケージでは以下の 3 種類のアトミック命令が使えるようになっている。
- swap (exchange)
- 「メモリ上の値 V を読み込み、その値を新しい値 U に置き換える」
という操作を不可分に行うことができる。
java.util.concurrent.atomic
パッケージではgetAndSet
メソッドという名前で提供される。
- fetch-and-add
- 「メモリ上の値 V を読み込み、その値に D を足し、V + D に置き換える」
という操作を不可分に行うことができる。
java.util.concurrent.atomic
パッケージではaddAndSet
、getAndAdd
、getAndIncrement
、getAndDecrement
、incrementAndGet
、decrementAndSet
メソッドという名前で提供される。
- compare-and-swap
- 「メモリ上の値 V を読み込み、その値を値 C と比較する。
V と C が一致しなければ操作は失敗。
一致すれば新しい値 U をメモリ上に書き込む」
という一連の操作を不可分に行うことができる。
「失敗」があるのがこの処理の特徴。
java.util.concurrent.atomic
パッケージではcompareAndSet
、weakCompareAndSet
メソッドという名前で提供される。
使い方を例をあげてみる。
class Sequencer1 { private long sequenceNumber = 0; public long next() { return sequenceNumber++; }} class Sequencer2 { private long sequenceNumber = 0; private Object lock = new Object(); public long next() { synchronized(lock){ return sequenceNumber++; }}} class Sequencer3 { // Long 型のアトミック変数を用意。初期値は 0 private AtomicLong sequenceNumber = new AtomicLong(0); public long next() { return sequenceNumber.getAndIncrement(); }}
Sequencer1
Sequencer2
Sequencer3
はいずれも next
を呼ぶ度に 0 → 1 → 2 → 3 と順番に値を取得できるプログラム。
しかし Sequencer1
はマルチスレッドに対応していないので、複数のスレッドから呼び出されると同じ値を 2 度返したり、飛ばしたりする問題がある。
Sequencer2
は synchronized を使うことによってこの問題を解決したが、この程度のプログラムでありながら複数のスレッドから同時に next
が呼ばれた場合、ロックの所有者(オーナー)になれなかったスレッドは CPU 実行権を手放し待機状態に入ってしまう。next
が頻繁に呼ばれる場合効率が悪い。
Sequencer3
は Concurrency Utilities を使ったもので AtomicLong
クラスの中に long 型の変数が入っている。
getAndIncrement
メソッドを呼ぶと「値に1を足し、元の値を返す」という操作不可分に行う。複数のスレッドが同時に next
を呼び出し競合が発生したとしてもCPU とバスが調停してくれるので、後からきたスレッドはブロックされることもなく、前のスレッドが終わったらすぐにgetAndIncrement の操作を実行することができる。
JSR-166 Concurrency Utilities のアトミック操作できるデータは、AtomicLong のように java.util.concurrent.atomic
内で定義されているクラスをインスタンス化したもののだけである。
すでに作成したしまったインスタンスは Concurrency Utilities で排他制御することができない。
これは少し不満な部分である。
J2SE 1.4 から導入された Buffer
(DirectBuffer
) は、Java ヒープの外にあるメモリ領域を Java の中から扱えるようになっている。
Buffer
領域のメモリにたいしてアトミック操作を行えれば C/C++ で書かれたライブラリとの連係がやりやすいのだが現在はそうなっていないようだ。
残念無念。
4. ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)
java.util.concurrent.locks
パッケージでは、ミューテックス、コンディションバリアブル(条件変数)、リーダーライターロックなどの同期オブジェクトが追加された。
インターフェイスとして Lock
(ミューテックス)、Condition
(条件変数)、ReadWriteLock
(リーダーライターロック) が用意され、その実装クラスである ReentrantLock
、ReentrantReadWriteLock
などが提供される。
これまで Object
クラスの notify
、wait
では不十分だった部分が一挙にパワーアップ。
Lock
の使いかたは、当然以下のようになる。
Lock l = ...; l.lock(); try { // ここに排他制御したい内容を } finally { l.unlock(); }
面白いのは Condition
の扱い。
このメモで書いたように、条件変数はミューテックスと一緒に使う必要があるのだが、Condition
は赤字の部分のように Lock
から newCondition()
メソッドを使って取り出されるので、生成された時点から関係付けられたミューテックスが存在することになる。
この例では実装クラスReentrantLock
から newCondition()
されたので、その実体は内部クラス ReentrantLock.ConditionObject
になる。
class BoundedBuffer { Lock lock = new ReentrantLock(); // ミューテックスの生成 final Condition cv = lock.newCondition(); // 条件変数の生成 public void condtion_wait() throws InterruptedException { lock.lock(); try { cv.wait(); } finally { lock.unlock(); } } public void condition_signal() throws InterruptedException { lock.lock(); try { cv.signal(); } finally { lock.unlock(); } } }
5. セマフォ、サイクリックバリア
java.util.concurrent
パッケージ内では、同期のための便利なデータ構造が提供されている。
3. 4. の構造が同期プログラムのためのプリミティブだとしたら、5. はマルチスレッドプログラムの骨格を決めるデザイン・パターンのようなもの提供する。
Semaphore
- いわゆるセマフォ(semaphore) を提供するデータ構造。
セマフォは内部にカウンターを持っていて最大何スレッドから、
自分を獲得できるかを設定できる。
そのため有限個数の共有資源を管理するのに
セマフォを使うことが多い。
セマフォを獲得したいスレッドはSemaphore.acquire()
を呼びだす。 セマフォ内部のカウンタが 0 でない場合には スレッドは動作を許可される。同時にカウンタは 1 減る。 セマフォを使用し終わったらSemaphore.release()
を呼び出す。 カウンタは 1 増え元に戻る。
内部のカウンタが 0 の場合には、 他のスレッドがrelease
を行うまでブロックされる。
Semaphore sema = new Semaphore(5); int foo() { sema.acquire(); // この部分は同時に 5 スレッドしか動作できない。 sema.release(); }
FairSemaphore
-
Semaphore
同様のセマフォを機能を提供するが、 実行権限の獲得ルールがSemaphore
と異なり fair になっている。
セマフォが複数のスレッドが待機させているとき、 別のスレッドがrelease
を実行して 実行権限が戻ってきたとする。
FairSemaphore
は 現在 待機中のスレッドのうち 最初にaquire
を実行したものに、 実行権限を渡す。
Semaphore
はこのような順序を保証しない。 それどころかrelease
で回収された実行権限を 待機中のスレッドに渡すということも保証しない。 そのためrelease
をしたスレッドが直後にaquire
を実行すると、 そのまま実行権限を手に入ってしまうという ことがありえる。 このような場合、 スレッドスケジューリングのタイミングによっては いったんセマフォでブロックされたスレッドは プログラムが終わるまで待機し続けるという危険性がある。
CountDownLatch
-
スレッドのランデブーポイントを作成するのに便利なクラス。
複数のワーキングスレッドの終了を待つような場合に便利なデータ構造で、 大雑把にいって下のようなはたらきをする。
class CountDownLatch { Object lock = new Object(); int count; CountDownLatch(int cnt){ this.count = cnt; } synchronized void countDown() { count-- if( count == 0 ){ lock.notifyAll(); } } void await() throws InterruptedException { lock.wait(); } }
CyclicBarrier
-
CyclicBarrier
は バリアポイントが複数あるような マルチスレッドプログラムで有効な同期機構。
CyclicBarrier
のコンストラクターは、CyclicBarrier(int parties, java.lang.Runnable barrierAction)
となっている。parties
でバリアを行うワーカースレッドの数を決定する。parties
数のワーカースレッドがCyclicBarrier.await()
を呼び出せば ワーカースレッドのランデブーが完了する。 ランデブー完了後にbarrierAction
が実行される。
class Controller { final int MAX_CPUS = ...; final CyclicBarrier barrier; public Controller() { barrier = new CyclicBarrier(MAX_CPUS, new Runnable() { public void run() { // すべてのワーカースレッドが await によって待機した後に、 // この Runnable が実行される。 }}); for (int i = 0; i < MAX_CPUS; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } // ワーカースレッド class Worker implements Runnable { int index; Worker(int i) { index = i; } public void run() { while (!done()) { // ここに1回の処理内容を書く try { barrier.await(); // ここでランデブー } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } }
Exchanger<V>
Exchanger<V>
は 2つのスレッドがランデブーポイントで データを交換しあう同期機構。
下の例ではユーザー定義のData
型のオブジェクトを 交換することになる。
Exchanger<Data> exchanger = new Exchanger(); // 2つのスレッドによって実行されるメソッド int foo() { // ここで処理を行い自分が生成したデータを myData にいれておく // ランデブーポイント。別のスレッドが来るまでここまで待機 Data otherThreadsData = exchanger.exchange(myData); }
exchanger.exchange(myData)に自スレッドのデータを引数として渡すと、対応する相手スレッドのデータが戻り値として返ってくる。
その他のパッケージに対する変更
Concurrency Utilities の導入と共に、java.lang. 以下のパッケージも変更を受けたものがある。
- java.lang.System
-
java.lang.System にはシステムの持つナノ秒のクロックを返すメソッドが導入された。
時間測定に使える解像度が上がったようだ。
class java.lang.System { public static long nanoTime()
nanoTime
メソッドの返す値の絶対値には意味がない。 2つのnanoTime
の差だけが意味を持つ。
この点は、currentTimeMillis
が 基準時刻(1970 年1月1日午前0時)との差になっているのとは対象的。
UNIX タイムのように基準時刻からの時間というような意味のある値にはなっていない。
- java.lang.Thread
-
java.lang.Thread
には 以下のようなメソッドが追加されている。
public class Thread implements java.lang.Runnable { // スレッドのスタックトレースが取れる public java.lang.StackTraceElement[] getStackTrace() // すべてのスレッドのスタックトレース public static Map<K,V> getAllStackTraces() public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) public static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) public static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() // システム依存のスレッドの ID public long getId() }
getStackTrace
、getAllStackTraces
メソッドが追加され、 ユーザープログラム内から普通にスタックトレースが取れるようになった。 無論、このメソッドを呼び出すためにはセキュリティーマネージャーの許可が必要。
getUncaughtExceptionHandler
、setUncaughtExceptionHandler
メソッドは キャッチされなかった例外を最終的にキャッチする網を、 Java スレッド毎に登録することのできるメソッド。getDefaultUncaughtExceptionHandler
、setDefaultUncaughtExceptionHandler
によって 全 Java スレッドの未捕捉例外を捕捉する網を張ることも可能。 J2SE 1.4 までは、 キャッチされない例外は Java スレッドのスレッドグループ(ThreadGroup) に伝達されていた。 未捕捉例外を強制的にキャッチしたければThreadGroup
のuncaughtException
メソッドを オーバーライドする必要があったが、 J2SE 1.5 のハンドラ登録の方が簡単になった。
- システムに依存するスレッド ID を返す
getId
メソッドが追加された。 初期の JavaVM は green thread 方式を取り JavaVM 内で閉じたユーザースレッドを提供していたが、 現行の JavaVM のほとんどは Java スレッドを OS/システムのスレッドと 一対一に割り当てる native thread 方式を取っている。getId
メソッドは OS/システムのスレッドの識別子を返す ローレベル API である。
- java.lang.ThreadLocal
-
Java スレッド毎に固有のデータを保存する
ThreadLocal
クラスは、汎型(generic type) の導入によってThreadLocal<T>
に変わっている。
これまでのThreadLocal
はObject
型のみを入れられていたが、generic によって自分の好きなオブジェクトの型が入れられるようになった。
また、一旦登録したオブジェクトを削除するremove
メソッドが追加されている。
public class ThreadLocal<T> { T get() protected T initialValue() void remove() void set(T value) }
まとめ
JSR-166 Concurrency Utilities では、ハードウェアのローレベルな機能を呼び出す API がバシバシ追加された。 ユーザーが永らく望んでいた機能だが、ちょっと恐いのは実装の自由度の問題。
追加された機能はあくまでも JDK のクラスライブラリレベルのものなので、実装者がどう実装するかはまったくの自由。
商用 JavaVM メーカーはランタイム、JIT コンパイラ、クラスライブラリの協調によって、java.util.concurrent.
以下のクラスで CPU や OS のネイティブな機能を使えるように最適化するはず。
だが手を抜いた実装もあると思われる。
java.util.concurrent.atomic
は Java モニタを利用してベタに実装しても構わないし、System.nanoTime()
も System.currentTimeMillis()
を 単純に 1,000 倍した値を返しても構わない。
手を抜かなくても、どの方向に最適化するかという性能を追求する方向の違いもある。
同期プリミティブはライブラリの仕様が等しくても、タイミングやポリシーの差によって動作・挙動がまったく変わってくることが結構ある(例えば、ロックが fair なのか unfair なのかなど)。
Concurrency Utilities を使うユーザーは性能を追求した並列プログラムを書きたいわけだが、それが JavaVM のメーカーやバージョンの違いによって大幅に動作が変わってくるとしたら堪らない。 最終的にはプログラムの先頭で JavaVM の情報を得て、Sun JVM の場合にはこっちのメソッドを IBM JVM の場合にはあっちのメソッドを呼ぶなんてことになりそうな気がする。
もっとも、Tiger が出ても、 Concurrency Utilities は 簡単には普及しないかもしれないな~~~
関連ページ
- JSR-000166 Concurrency Utilities Specification 1.0 Public Review Draft
仕様はここ から ダウンロード可能。
- JSR-166 の提唱者のページ
(PDF 資料)