モジュール java.base
パッケージ java.util.concurrent

クラスThreadPoolExecutor

java.lang.Object
java.util.concurrent.AbstractExecutorService
java.util.concurrent.ThreadPoolExecutor
すべての実装されたインタフェース:
AutoCloseable, Executor, ExecutorService
直系の既知のサブクラス:
ScheduledThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService
プールされた複数のスレッドの1つを使用して送信された各タスクを実行するExecutorServiceです。通常はExecutorsファクトリ・メソッドを使用して構成されます。

スレッド・プールでは、2つの問題に対処します。まず、タスク当たりの呼出しオーバーヘッドが減少するため、通常は大量の非同期タスクの実行時にパフォーマンスが向上します。また、タスクのコレクションを実行するときに消費されるリソース(スレッドを含む)の境界設定および管理のための方法を提供します。 ThreadPoolExecutorも基本的な統計情報(完了したタスクの数など)を保持します。

幅広いコンテキストで有用であるために、このクラスでは多くの調整可能なパラメータや拡張性フックを提供します。 ただしプログラマは、より便利なExecutorsファクトリ・メソッドであるExecutors.newCachedThreadPool() (自動スレッド再生のあるアンバウンド形式のスレッド・プール)、Executors.newFixedThreadPool(int) (固定サイズのスレッド・プール)およびExecutors.newSingleThreadExecutor() (単一バックグラウンドのスレッド)を使用してください。これらのファクトリ・メソッドは、もっとも一般的な使用を想定した設定を事前構成します。 そうでない場合、このクラスを手動で設定し調整するときは次のガイドに従ってください。

コアおよび最大プール・サイズ
ThreadPoolExecutorは、corePoolSize (getCorePoolSize()を参照)とmaximumPoolSize (getMaximumPoolSize()を参照)によって設定された境界に従って、プール・サイズ(getPoolSize()を参照)を自動的に調整します。 メソッドexecute(Runnable)で新しいタスクが送信されると、実行中のスレッドがcorePoolSizeより少ない場合は、他のワーカー・スレッドがアイドル状態であっても、リクエストを処理する新しいスレッドが作成されます。 それ以外の場合、maximumPoolSizeより小さいスレッドが実行されている場合は、キューが満杯になった場合にのみリクエストを処理するための新しいスレッドが作成されます。 corePoolSizeとmaximumPoolSizeを同じ値に設定すると、固定サイズのスレッド・プールが作成されます。 maximumPoolSizeをInteger.MAX_VALUEなどの実質的にアンバウンド形式である値に設定すると、プールに任意の数の並行タスクを格納することができます。 コア・プール・サイズと最大プール・サイズは構築時にのみ設定されるのがもっとも一般的ですが、setCorePoolSize(int)およびsetMaximumPoolSize(int)を使用して動的に変更することもできます。
オン・デマンド構築
デフォルトでは、新しいタスクの到着時にのみ偶数のコア・スレッドがはじめて作成されて起動されますが、これはprestartCoreThread()またはprestartAllCoreThreads()メソッドを使用して動的にオーバーライドできます。 空でないキューでプールを構築する場合は、スレッドを事前に起動することもできます。
新しいスレッドの作成
新しいスレッドは、ThreadFactoryを使用して作成されます。 特に指定されていない場合は、Executors.defaultThreadFactory()が使用されます。この場合に作成されるスレッドは、すべて同じThreadGroup内にあり、同じNORM_PRIORITY優先順位と非デーモン・ステータスを持ちます。 異なるThreadFactoryを指定すると、スレッドの名前、スレッド・グループ、優先順位、デーモン・ステータスなどを変更できます。要求時にnewThreadからnullが返されてThreadFactoryがスレッドの作成に失敗すると、executorは続行しますが、タスクをまったく実行できない可能性があります。 スレッドは、「modifyThread」のRuntimePermissionを所有するようにしてください。 プールを使用しているワーカー・スレッドやその他のスレッドがこのアクセス権を所有していないと、サービスが低下する場合があります。構成の変更がタイムリに有効にならなかったり、プールのシャットダウンが、終了は可能だが完了していない状態のままになったりする場合があります。
キープアライブ時間
現在プールにcorePoolSizeの数よりも多くのスレッドがある場合、超過した分のスレッドは、アイドル状態になっている期間がkeepAliveTimeを超えると終了します(getKeepAliveTime(TimeUnit)を参照)。 これにより、プールがアクティブに使用されていないときのリソースの消費量を減らすことができます。 プールがあとでアクティブになると、新しいスレッドが構築されます。 また、このパラメータは、setKeepAliveTime(long, TimeUnit)メソッドを使用して動的に変更することもできます。 Long.MAX_VALUE TimeUnit.NANOSECONDSの値を使用すると、アイドル・スレッドがシャットダウン前に終了することも実質的に無効になります。 デフォルトでは、キープ・アライブ・ポリシーはcorePoolSizeを超えるスレッドがある場合にのみ適用されますが、メソッドallowCoreThreadTimeOut(boolean)を使用して、keepAliveTime値がゼロでないかぎり、このタイムアウト・ポリシーをコア・スレッドにも適用できます。
キューイング
送信されたタスクを転送したり保持したりするために、任意のBlockingQueueを使用できます。 このキューの使用は、プールのサイズ設定と相互に作用します。
  • corePoolSizeよりも少ない数のスレッドが実行されている場合、executorはキューイングよりも新しいスレッドの追加を常に優先します。
  • corePoolSize以上の数のスレッドが実行されている場合、executorは新しいスレッドの追加よりも要求のキューイングを常に優先します。
  • 要求をキューに入れることができない場合、新しいスレッドを作成することによりmaximumPoolSizeを超えない場合は新しいスレッドが作成され、超える場合はタスクが拒否されます。
キューイングの一般的な方式には次の3種類があります。
  1. 直接ハンドオフ。 ワーク・キューに適したデフォルトの選択肢は、タスクを保持しないでスレッドにハンドオフするSynchronousQueueです。これは、本来はタスクを保持します。 この場合、タスクを実行するためにすぐに利用できるスレッドがないと、タスクをキューに入れようとしたときに失敗し、新しいスレッドが構築されます。 このポリシーにより、内部的な依存関係を持つ可能性がある要求セットの処理時にロック・アップが回避されます。 一般に直接ハンドオフでは、送信された新しいタスクが拒否されるのを回避するために、アンバウンド形式のmaximumPoolSizesが必要です。 これにより、平均して処理能力を超える速さでコマンドが次々に到着すると、アンバウンド形式のスレッドが大きくなる可能性があります。
  2. アンバウンド形式のキュー。 アンバウンド形式のキュー(事前定義された容量を持たないLinkedBlockingQueueなど)を使用すると、corePoolSizeのすべてのスレッドがビジー状態である場合に、新しいタスクはキュー内で待機します。 これにより、corePoolSizeを超えるスレッドは作成されなくなります。 つまりmaximumPoolSizeの値は効果がなくなります。 各タスクが完全に独立しているため、タスクが相互の実行に影響しない場合はこの方式が適切であることがあります(Webページ・サーバーの場合など)。 この方式のキューイングは、一時的に急増した要求を処理する場合などは便利ですが、平均して処理能力を超える速さでコマンドが次々に到着すると、アンバウンド形式のワーク・キューが大きくなる可能性があります。
  3. バウンド形式のキュー。 バウンド形式のキュー(ArrayBlockingQueueなど)は、限定されたmaximumPoolSizesで使用するとリソース不足を回避できますが、調整と制御が難しくなる可能性があります。 キュー・サイズと最大プール・サイズは互いにトレードオフの関係になることがあります。大きなキューと小さなプールを使用すると、CPU使用率、OSリソース量、およびコンテキスト・スイッチングのオーバーヘッドは最小化されますが、スループットは大幅に低下する可能性があります。 タスクが頻繁にブロックする場合は(入出力が制限される場合など)、許可しているスレッド数よりも多くのスレッドに対して、システムが時間をスケジュールすることができます。 小さいキューを使用すると、一般に必要なプール・サイズは大きくなります。これにより、CPUがよりビジーになりますが、過大なスケジュール設定のオーバーヘッドが発生することがあり、スループットも低下します。
拒否されたタスク
Executorがシャットダウンしている場合、またはExecutorが最大スレッド数とワーク・キュー容量の両方で有限の境界を使用し、かつ飽和状態である場合、execute(Runnable)メソッドで送信された新しいタスクは拒否されます いずれの場合も、executeメソッドは、そのRejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)メソッドを呼び出します。 事前定義された4つのハンドラ・ポリシーが用意されています。
  1. デフォルトのThreadPoolExecutor.AbortPolicyでは、拒否されると、ハンドラは実行時のRejectedExecutionExceptionをスローします。
  2. ThreadPoolExecutor.CallerRunsPolicyでは、execute自体を呼び出すスレッドがタスクを実行します。 これにより、単純なフィード・バック制御メカニズムが提供され、結果として新しいタスクの送信レートが低下します。
  3. ThreadPoolExecutor.DiscardPolicyでは、実行できないタスクが単に削除されます。 このポリシーは、タスクの完了が決して関連しないまれなケースに対してのみ設計されています。
  4. ThreadPoolExecutor.DiscardOldestPolicyでは、executorがシャットダウンしていない場合は、ワーク・キューの先頭にあるタスクが削除されたあと、実行が再試行されます(もう一度失敗する可能性があり、その場合はこの処理が繰り返される)。 このポリシーは、ほとんど受け入れられません。 ほとんどの場合、ThreadPoolExecutor.DiscardOldestPolicyドキュメントに示すように、完了を待機しているコンポーネントで例外が発生するタスクを取り消すか、または失敗を記録する必要があります。
ほかの種類のRejectedExecutionHandlerクラスを定義して使用することができます。 その場合、特定の容量またはキューイング・ポリシーでのみ動作するようにポリシーが設計されているときは、特に注意が必要です。
フック・メソッド
このクラスは、protectedのオーバーライド可能なbeforeExecute(Thread, Runnable)メソッドとafterExecute(Runnable, Throwable)メソッドを提供します。これらのメソッドは、各タスクを実行する前後に呼び出されます。 これらのメソッドは、実行環境を操作するために使用できます(ThreadLocalの再初期化、統計情報の収集、ログ・エントリの追加など)。 さらに、Executorが完全に終了した後で実行する必要のある特殊な処理をすべて実行するように、terminated()メソッドをオーバーライドできます。

フック、コールバックまたはBlockingQueueメソッドが例外をスローすると、内部ワーカー・スレッドは失敗し、突然終了し、置換される可能性があります。

キューの保守
getQueue()メソッドを使用すると、監視やデバッグの目的でワーク・キューにアクセスできます。 その他の目的でこのメソッドを使用しないことが強く推奨されています。 提供される2つのメソッドremove(Runnable)purge()は、キューに入れられた多数のタスクが取り消されるときに、ストレージの再生を支援するために使用できます。
お薦め
プログラムで参照されなくなったプールで、残りのスレッドがない場合、明示的にシャットダウンしなくても、メモリーの再利用が可能です。 0のコア・スレッドの下限を設定し、またはallowCoreThreadTimeOut(boolean)を設定して、未使用のすべてのスレッドが最終的に停止できるようにプールを構成できます。

拡張機能の例 このクラスのほとんどの拡張機能は、1つ以上のprotectedフック・メソッドをオーバーライドします。 たとえば、次の例は、単純な一時停止および再開の機能を追加するサブクラスです。

 
 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }

   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }

   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }

   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }

導入されたバージョン:
1.5