次期、Java2 Platform, Standard Edition 1.5 (以下、J2SE1.5) は言語仕様の拡張以外に、並列プログラミング機能の強化、JavaVM 内モニタリング機能の強化が加わった。 このページでは並列プログラミングプリミティブについて紹介してみる。
J2SE 1.5 で加わる並列プログラミング機能は JSR-166 Concurrency Utilities に基づいている。
java.util.concurrent.* パッケージに従来の Java スレッドと Java モニタ(synchronize/wait/notify) よりも強力な以下のような道具が用意される。
- 並列プログラムのためのコンテナクラスの拡張
- スレッド・プーリング(Thread Pooling)
- アトミック操作 (java.util.concurrent.atomic)
- ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)
- セマフォ、サイクリックバリア
JSR-166 は、その他 java.lang パッケージの System、Thread、ThreadLocal などに対する機能拡張が行われている(ここ)。
1. 並列プログラムのためのコンテナクラスの拡張
java.util パッケージに FIFO (first-in-first-out) のマナーのコンテナクラスをあらわす java.util.Queue<E> インターフェイスと、その MT-safe 版 java.util.BlockingQueue<E> インターフェイスが新しく追加された。
FIFO のコンテナ自体は java.util.List<E> インターフェイスでも作成可能なのにわざわざ新たに Queue インターフェイスを設けたのは、マルチスレッド環境で要素の追加・取り出しを強化したコンテナを作りたかったためだと思われる。
Queue インターフェイスを実装したクラスはマルチスレッドへの対応度によって 3 種類に分かれる。
分類 説明 実装クラス MT-unsafe Queue マルチスレッド時には安全に用できない Queue の実装クラス LinkedList, AbstractQueue, PriorityQueue MT-safe Queue (blocking) ロックを使ってマルチスレッド時に安全に使用できるようにした Queue の実装クラス。 BlockingQueue インターフェイスを実装している。 DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, ArrayBlockingQueue MT-safe Queue (wait-free) マルチスレッド時に安全に使用できるが、 ロックは使用していないもの。
BlockingQueue インターフェイスではなく、Queue インターフェイスからの実装。ConcurrentLinkedQueue
MT-unsafe 版は java.util パッケージに、MT-safe 版は blocking も wait-free も java.util.concurrent パッケージに入っている。
まず Blocking 名前のついている MT-safe クラスは synchronized によって同期処理を行っているクラスだと考えられる。
J2SE 1.4 以前はコレクションクラスを MT-safe をするのに以下の構文を使っていたが、いくつかのクラスには専用番ができたということだ。
List list = Collections.synchronizedList(new LinkedList(...));
一方、名前に Concurrent を含む wait-free 版 ConcurrentLinkedQueue は、排他制御に Java モニターを用いずにアトミック操作 を使ったキューである。
Blocking 版の場合 競合が発生するとスレッドが待機(サスペンド)してしまうが、Wait-free 版はアトミック操作を用いることで、スレッドが待機状態に落ちにくいように実装してある(アルゴリズムの解説はここ)。
Wait-free は排他制御の範囲が狭く、競合があまり頻発しない場所で使うと応答性が良くなり処理が高速化する。 しかし使いどころを誤ると CPU 使用率が高くなり、性能低下を招くことになる。
Wait-free 版は Queue インターフェイスを実装した ConcurrentLinkedQueue 以外にも、ConcurrentHashMap がある。
今後の Concurrency Utilities においては、blocking 版と wait-free 版の使い分けが重要になると思われる。
Queue の実装クラスについて簡単にメモしておく。
AbstractQueue<E>は普通のキュー。LinkedQueue<E>、LinkedBlockingQueue<E>は、J2SE 1.4 以前にも存在する既存のクラス。 もともとList<E>インターフェイスを実装していたが、Queue<E>インターフェイスにも対応した。PriorityQueue<E>、PriorityBlockingQueue<E>は優先順位付きキューで、コンストラクターの引数に comparator を渡すとエレメント間に優先順位を付けれるようになる。 つまり同じ優先順位では FIFO だが、優先順位に差があると優先度が高い方が先に出てくるキュー。ArrayQueue<E>は固定サイズのキュー。SynchronousQueue<E>は実質的にサイズが 0 のキューで、要素を挿入しようとするスレッドと要素を取り出そうとするスレッドをリンクして要素を橋渡しして転送する。DelayQueueは挿入した要素が一定時間が経過するまで取り出せないキュー。
その他、並列プログラムのために java.util.concurrent パッケージには CopyOnWriteArrayList と CopyOnWriteArraySet クラスが提供された。
サンプルがないので内容を把握仕切れていないが、これらのクラスはイテレータを使ってコンテナクラス内を走査している時は、別のスレッドが remove、set、add の操作を行ったとしてもイテレーション開始時のコンテナの状態が見え続けるクラスのようだ。
(イテレータがガーベージになると同期が取られるみたい)。
2. スレッド・プーリング(Thread Pooling)
java.util.concurrent パッケージ内では、
スレッド・プーリングのためのフレームワークが提供されている。
java.util.concurrent.ThreadPoolExecutor クラスがプールされているスレッドと処理すべきタスクのリストを管理し、スケジューリングしてゆく。
ThreadPoolExecutor の最も基本的なコンストラクタは、以下の表の引数を取る。
引数 意味 int corePoolSize プールしておく最小のスレッド数 int maximumPoolSize 最大スレッド数 long keepAliveTime 仕事がない場合にスレッドをプールから外すしてゆくが、 仕事がなかったと判断されるまでの待ち時間。 TimeUnit unit keepAliveTime の単位を決める。 BlockingQueue<java.lang.Runnable> workQueue ThreadPoolExecutor が処理される前のタスクをキューイングするために使うキュー
workQueue は ThreadPoolExecutor のもっとも特徴的な引数で、BlockingQueue<java.lang.Runnable> 型の引数を取り、実際には 1. で登場した LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、PriorityBlockingQueue を使うことができ、型によってキューイングの特徴がいろいろ変わってくる。
処理したいタスクは java.lang.Runnable インターフェイスを実装したオブジェクトとし、ThreadPoolExecutor.execute(java.lang.Runnable command) で work queue につなげる。
スレッド・プーリングにはもっと色々な機能が存在しているが、ここでは省略する。
3. アトミック操作 (java.util.concurrent.atomic)
java.util.concurrent.atomic パッケージでは、Java でアトミック操作を行うライブラリを提供する。
アトミック操作は最も基本的でローレベルな排他制御命令。 本来は CPU に命令として実装されるもので、変数の読み込み、値の比較、書き込みなど命令のシーケンスが、途中で割り込まれずに実行されることを保証する。
そのため、ユーザープログラム側でアトミック操作を直接触ることは少ないと思われる。
java.util.concurrent.atomic パッケージでは以下の 3 種類のアトミック命令が使えるようになっている。
- swap (exchange)
- 「メモリ上の値 V を読み込み、その値を新しい値 U に置き換える」
という操作を不可分に行うことができる。
java.util.concurrent.atomicパッケージではgetAndSetメソッドという名前で提供される。
- fetch-and-add
- 「メモリ上の値 V を読み込み、その値に D を足し、V + D に置き換える」
という操作を不可分に行うことができる。
java.util.concurrent.atomicパッケージではaddAndSet、getAndAdd、getAndIncrement、getAndDecrement、incrementAndGet、decrementAndSetメソッドという名前で提供される。
- compare-and-swap
- 「メモリ上の値 V を読み込み、その値を値 C と比較する。
V と C が一致しなければ操作は失敗。
一致すれば新しい値 U をメモリ上に書き込む」
という一連の操作を不可分に行うことができる。
「失敗」があるのがこの処理の特徴。
java.util.concurrent.atomicパッケージではcompareAndSet、weakCompareAndSetメソッドという名前で提供される。
使い方を例をあげてみる。
class Sequencer1 {
private long sequenceNumber = 0;
public long next() {
return sequenceNumber++;
}}
class Sequencer2 {
private long sequenceNumber = 0;
private Object lock = new Object();
public long next() {
synchronized(lock){
return sequenceNumber++;
}}}
class Sequencer3 {
// Long 型のアトミック変数を用意。初期値は 0
private AtomicLong sequenceNumber = new AtomicLong(0);
public long next() {
return sequenceNumber.getAndIncrement();
}}
Sequencer1 Sequencer2 Sequencer3 はいずれも next を呼ぶ度に 0 → 1 → 2 → 3 と順番に値を取得できるプログラム。
しかし Sequencer1 はマルチスレッドに対応していないので、複数のスレッドから呼び出されると同じ値を 2 度返したり、飛ばしたりする問題がある。
Sequencer2 は synchronized を使うことによってこの問題を解決したが、この程度のプログラムでありながら複数のスレッドから同時に next が呼ばれた場合、ロックの所有者(オーナー)になれなかったスレッドは CPU 実行権を手放し待機状態に入ってしまう。next が頻繁に呼ばれる場合効率が悪い。
Sequencer3 は Concurrency Utilities を使ったもので AtomicLong クラスの中に long 型の変数が入っている。
getAndIncrement メソッドを呼ぶと「値に1を足し、元の値を返す」という操作不可分に行う。複数のスレッドが同時に next を呼び出し競合が発生したとしてもCPU とバスが調停してくれるので、後からきたスレッドはブロックされることもなく、前のスレッドが終わったらすぐにgetAndIncrement の操作を実行することができる。
JSR-166 Concurrency Utilities のアトミック操作できるデータは、AtomicLong のように java.util.concurrent.atomic 内で定義されているクラスをインスタンス化したもののだけである。
すでに作成したしまったインスタンスは Concurrency Utilities で排他制御することができない。
これは少し不満な部分である。
J2SE 1.4 から導入された Buffer(DirectBuffer) は、Java ヒープの外にあるメモリ領域を Java の中から扱えるようになっている。
Buffer 領域のメモリにたいしてアトミック操作を行えれば C/C++ で書かれたライブラリとの連係がやりやすいのだが現在はそうなっていないようだ。
残念無念。
4. ミューテックス、条件変数、リーダーライターロック (java.util.concurrent.locks)
java.util.concurrent.locks パッケージでは、ミューテックス、コンディションバリアブル(条件変数)、リーダーライターロックなどの同期オブジェクトが追加された。
インターフェイスとして Lock (ミューテックス)、Condition (条件変数)、ReadWriteLock (リーダーライターロック) が用意され、その実装クラスである ReentrantLock、ReentrantReadWriteLock などが提供される。
これまで Object クラスの notify、wait では不十分だった部分が一挙にパワーアップ。
Lock の使いかたは、当然以下のようになる。
Lock l = ...;
l.lock();
try {
// ここに排他制御したい内容を
} finally {
l.unlock();
}
面白いのは Conditionの扱い。
このメモで書いたように、条件変数はミューテックスと一緒に使う必要があるのだが、Condition は赤字の部分のように Lock から newCondition() メソッドを使って取り出されるので、生成された時点から関係付けられたミューテックスが存在することになる。
この例では実装クラスReentrantLock から newCondition() されたので、その実体は内部クラス ReentrantLock.ConditionObject になる。
class BoundedBuffer {
Lock lock = new ReentrantLock(); // ミューテックスの生成
final Condition cv = lock.newCondition(); // 条件変数の生成
public void condtion_wait() throws InterruptedException {
lock.lock();
try {
cv.wait();
} finally {
lock.unlock();
}
}
public void condition_signal() throws InterruptedException {
lock.lock();
try {
cv.signal();
} finally {
lock.unlock();
}
}
}
5. セマフォ、サイクリックバリア
java.util.concurrent パッケージ内では、同期のための便利なデータ構造が提供されている。
3. 4. の構造が同期プログラムのためのプリミティブだとしたら、5. はマルチスレッドプログラムの骨格を決めるデザイン・パターンのようなもの提供する。
Semaphore- いわゆるセマフォ(semaphore) を提供するデータ構造。
セマフォは内部にカウンターを持っていて最大何スレッドから、
自分を獲得できるかを設定できる。
そのため有限個数の共有資源を管理するのに
セマフォを使うことが多い。
セマフォを獲得したいスレッドはSemaphore.acquire()を呼びだす。 セマフォ内部のカウンタが 0 でない場合には スレッドは動作を許可される。同時にカウンタは 1 減る。 セマフォを使用し終わったらSemaphore.release()を呼び出す。 カウンタは 1 増え元に戻る。
内部のカウンタが 0 の場合には、 他のスレッドがreleaseを行うまでブロックされる。
カウンタ 1 のセマフォはようするにミューテックス(mutex)です。Semaphore sema = new Semaphore(5); int foo() { sema.acquire(); // この部分は同時に 5 スレッドしか動作できない。 sema.release(); }
FairSemaphore-
Semaphore同様のセマフォを機能を提供するが、 実行権限の獲得ルールがSemaphoreと異なり fair になっている。
セマフォが複数のスレッドが待機させているとき、 別のスレッドがreleaseを実行して 実行権限が戻ってきたとする。
FairSemaphoreは 現在 待機中のスレッドのうち 最初にaquireを実行したものに、 実行権限を渡す。
Semaphoreはこのような順序を保証しない。 それどころかreleaseで回収された実行権限を 待機中のスレッドに渡すということも保証しない。 そのためreleaseをしたスレッドが直後にaquireを実行すると、 そのまま実行権限を手に入ってしまうという ことがありえる。 このような場合、 スレッドスケジューリングのタイミングによっては いったんセマフォでブロックされたスレッドは プログラムが終わるまで待機し続けるという危険性がある。
CountDownLatch-
スレッドのランデブーポイントを作成するのに便利なクラス。
複数のワーキングスレッドの終了を待つような場合に便利なデータ構造で、 大雑把にいって下のようなはたらきをする。
class CountDownLatch { Object lock = new Object(); int count; CountDownLatch(int cnt){ this.count = cnt; } synchronized void countDown() { count-- if( count == 0 ){ lock.notifyAll(); } } void await() throws InterruptedException { lock.wait(); } } CyclicBarrier-
CyclicBarrierは バリアポイントが複数あるような マルチスレッドプログラムで有効な同期機構。
CyclicBarrierのコンストラクターは、CyclicBarrier(int parties, java.lang.Runnable barrierAction)となっている。partiesでバリアを行うワーカースレッドの数を決定する。parties数のワーカースレッドがCyclicBarrier.await()を呼び出せば ワーカースレッドのランデブーが完了する。 ランデブー完了後にbarrierActionが実行される。
class Controller { final int MAX_CPUS = ...; final CyclicBarrier barrier; public Controller() { barrier = new CyclicBarrier(MAX_CPUS, new Runnable() { public void run() { // すべてのワーカースレッドが await によって待機した後に、 // この Runnable が実行される。 }}); for (int i = 0; i < MAX_CPUS; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } // ワーカースレッド class Worker implements Runnable { int index; Worker(int i) { index = i; } public void run() { while (!done()) { // ここに1回の処理内容を書く try { barrier.await(); // ここでランデブー } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } } Exchanger<V>Exchanger<V>は 2つのスレッドがランデブーポイントで データを交換しあう同期機構。
下の例ではユーザー定義のData型のオブジェクトを 交換することになる。
Exchanger<Data> exchanger = new Exchanger(); // 2つのスレッドによって実行されるメソッド int foo() { // ここで処理を行い自分が生成したデータを myData にいれておく // ランデブーポイント。別のスレッドが来るまでここまで待機 Data otherThreadsData = exchanger.exchange(myData); }exchanger.exchange(myData)に自スレッドのデータを引数として渡すと、対応する相手スレッドのデータが戻り値として返ってくる。
その他のパッケージに対する変更
Concurrency Utilities の導入と共に、java.lang. 以下のパッケージも変更を受けたものがある。
- java.lang.System
-
java.lang.System にはシステムの持つナノ秒のクロックを返すメソッドが導入された。
時間測定に使える解像度が上がったようだ。
このタイマーは CPU のリアルタイムクロックを (定数で乗除算して) そのまま返すことを狙っているので、
class java.lang.System { public static long nanoTime()nanoTimeメソッドの返す値の絶対値には意味がない。 2つのnanoTimeの差だけが意味を持つ。
この点は、currentTimeMillisが 基準時刻(1970 年1月1日午前0時)との差になっているのとは対象的。
UNIX タイムのように基準時刻からの時間というような意味のある値にはなっていない。
- java.lang.Thread
-
java.lang.Threadには 以下のようなメソッドが追加されている。
public class Thread implements java.lang.Runnable { // スレッドのスタックトレースが取れる public java.lang.StackTraceElement[] getStackTrace() // すべてのスレッドのスタックトレース public static MapgetAllStackTraces() public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) public static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) public static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() // システム依存のスレッドの ID public long getId() } getStackTrace、getAllStackTracesメソッドが追加され、 ユーザープログラム内から普通にスタックトレースが取れるようになった。 無論、このメソッドを呼び出すためにはセキュリティーマネージャーの許可が必要。
getUncaughtExceptionHandler、setUncaughtExceptionHandlerメソッドは キャッチされなかった例外を最終的にキャッチする網を、 Java スレッド毎に登録することのできるメソッド。getDefaultUncaughtExceptionHandler、setDefaultUncaughtExceptionHandlerによって 全 Java スレッドの未捕捉例外を捕捉する網を張ることも可能。 J2SE 1.4 までは、 キャッチされない例外は Java スレッドのスレッドグループ(ThreadGroup) に伝達されていた。 未捕捉例外を強制的にキャッチしたければThreadGroupのuncaughtExceptionメソッドを オーバーライドする必要があったが、 J2SE 1.5 のハンドラ登録の方が簡単になった。
- システムに依存するスレッド ID を返す
getIdメソッドが追加された。 初期の JavaVM は green thread 方式を取り JavaVM 内で閉じたユーザースレッドを提供していたが、 現行の JavaVM のほとんどは Java スレッドを OS/システムのスレッドと 一対一に割り当てる native thread 方式を取っている。getIdメソッドは OS/システムのスレッドの識別子を返す ローレベル API である。
- java.lang.ThreadLocal
-
Java スレッド毎に固有のデータを保存する code>ThreadLocal クラスは、汎型(generic type) の導入によって
ThreadLocal<T>に変わっている。
これまでのThreadLocalはObject型のみを入れられていたが、generic によって自分の好きなオブジェクトの型が入れられるようになった。
また、一旦登録したオブジェクトを削除するremoveメソッドが追加されている。
public class ThreadLocal<T> { T get() protected T initialValue() void remove() void set(T value) }
まとめ
JSR-166 Concurrency Utilities では、ハードウェアのローレベルな機能を呼び出す API がバシバシ追加された。 ユーザーが永らく望んでいた機能だが、ちょっと恐いのは実装の自由度の問題。
追加された機能はあくまでも JDK のクラスライブラリレベルのものなので、実装者がどう実装するかはまったくの自由。
商用 JavaVM メーカーはランタイム、JIT コンパイラ、クラスライブラリの協調によって、java.util.concurrent. 以下のクラスで CPU や OS のネイティブな機能を使えるように最適化するはず。
だが手を抜いた実装もあると思われる。
java.util.concurrent.atomic は Java モニタを利用してベタに実装しても構わないし、System.nanoTime() も System.currentTimeMillis() を 単純に 1,000 倍した値を返しても構わない。
手を抜かなくても、どの方向に最適化するかという性能を追求する方向の違いもある。
同期プリミティブはライブラリの仕様が等しくても、タイミングやポリシーの差によって動作・挙動がまったく変わってくることが結構ある(例えば、ロックが fair なのか unfair なのかなど)。
Concurrency Utilities を使うユーザーは性能を追求した並列プログラムを書きたいわけだが、それが JavaVM のメーカーやバージョンの違いによって大幅に動作が変わってくるとしたら堪らない。 最終的にはプログラムの先頭で JavaVM の情報を得て、Sun JVM の場合にはこっちのメソッドを IBM JVM の場合にはあっちのメソッドを呼ぶなんてことになりそうな気がする。
もっとも、Tiger が出ても、 Concurrency Utilities は 簡単には普及しないかもしれないな〜〜〜
関連ページ
- JSR-000166 Concurrency Utilities Specification 1.0 Public Review Draft
仕様はここ から
ダウンロード可能。
- JSR-166 の提唱者のページ
(PDF 資料)
コメント
トラックバック
[Trackback URL: http://www.nminoru.jp/cgi-bin/tb.cgi/java__j2se15_concurrent]
[りっくでぃあすの日記] [Java]Java5.0の新機能。 2007-06-17 (Sun) 03:20:56 中村実氏のブログより。 Tiger (Java2 SE 1.5) で追加された並列プログラミング機能 今更、気づきました。。。 確かに 1.5 になったら java.util.concurrent というパッケージが増えてた。 java.util.concurrent (Java 2 Platform SE 5.0) java.util.concurrent.atomic (Jav
コメントを書き込む
仕様はここ から ダウンロード可能。