パッケージjava.util.stream
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
ここでは、Collection<Widget>
であるwidgets
をストリームのソースとして使用した後、そのストリームに対してフィルタ-マップ-リデュースを実行し、赤いウィジェットの重量の合計を取得しています。 (合計はリダクション操作の一例です。)
このパッケージで導入された主な抽象は、ストリームです。 クラスStream
、IntStream
、LongStream
およびDoubleStream
は、オブジェクトおよびプリミティブ型int
、long
,およびdouble
に対するストリームです。 ストリームはいくつかの点でコレクションと異なります。
- ストレージなし。 ストリームは要素を格納するデータ構造ではありません。代わりに、なんらかのソース(データ構造、配列、ジェネレータ関数、入出力チャネルなど)に含まれる要素を計算操作のパイプラインを介して運搬します。
- 本質的に関数型。 ストリームに対して操作を行うと結果が生成されますが、そのソースは変更されません。 たとえば、コレクションから得られた
Stream
をフィルタリングすると、ソースであるコレクションの要素が削除されるのではなく、フィルタリングされた要素を含まない新しいStream
が生成されます。 - 遅延指向。 フィルタリング、マッピング、重複削除など、多くのストリーム操作は遅延的に実装可能なため、最適化の余地があります。 たとえば、連続する3つの母音を含んだ最初の
String
を探す場合、すべての入力文字列を検査する必要はありません。 ストリーム操作は中間操作(Stream
を生成する操作)と終端操作(値または副作用を生成する操作)に分けられます。 中間操作は常に遅延的です。 - 無限である可能性。 コレクションのサイズは有限ですが、ストリームのサイズは有限である必要はありません。
limit(n)
やfindFirst()
などの短絡操作を使えば、無限ストリームに対する計算を有限の時間で終わらせることができます。 - 消費可能。 ストリームの存続期間中、ストリームの要素が使用されるのは一度だけです。
Iterator
と同様、ソースの同じ要素を再度使用するには、新しいストリームを生成する必要があります。
Collection
から(stream()
およびparallelStream()
メソッド経由)。- 配列から(
Arrays.stream(Object[])
経由)。 - ストリーム・クラスのstaticファクトリ・メソッド(
Stream.of(Object[])
、IntStream.range(int, int)
、Stream.iterate(Object, UnaryOperator)
など)から。 BufferedReader.lines()
からファイルの行を取得できます。Files
のメソッドからファイル・パスのストリームを取得できます。Random.ints()
から乱数のストリームを取得できます。- JDKに含まれる他のさまざまなストリーム生成メソッド(
BitSet.stream()
、Pattern.splitAsStream(java.lang.CharSequence)
、JarFile.stream()
など)。
サードパーティー・ライブラリは、これらのテクニックを使って追加のストリーム・ソースを提供できます。
ストリーム操作とパイプライン
ストリーム操作は中間操作と終端操作に分かれますが、これらを組み合わせてストリーム・パイプラインが形成されます。 ストリーム・パイプラインは、1つのソース(Collection
、配列、ジェネレータ関数、入出力チャネルなど)、それに続く0個以上の中間操作(Stream.filter
、Stream.map
など)および1つの終端操作(Stream.forEach
、Stream.reduce
など)から構成されます。
中間操作は新しいストリームを返します。 これらの操作は常に遅延されます。filter()
などの中間操作を実行しても、実際のフィルタリングは一切実行されず、代わりに、トラバース時に最初のストリームの要素のうち指定された述語に一致するものが格納される新しいストリームが作成されます。 パイプラインの終端操作が実行されるまで、パイプライン・ソースのトラバーサルは開始されません。
Stream.forEach
やIntStream.sum
などの終端操作は、ストリームをトラバースして結果や副作用を生成できます。 終端操作の実行が完了するとそのストリーム・パイプラインは消費済とみなされ、以降使用できなくなります。同じデータ・ソースを再度トラバースする必要が生じた場合は、データ・ソースに戻って新しいストリームを取得する必要があります。 終端操作はほとんどすべての場合に積極的であり、データ・ソースのトラバーサルやパイプラインの処理を完了させた後でリターンします。 そうでない終端操作はiterator()
とspliterator()
だけです。これらは、既存の操作では不十分でタスクを実行できない場合に任意のクライアント制御パイプライン・トラバーサルを行えるようにするための「エスケープ・ハッチ」として提供されています。
ストリームの遅延処理は効率性を大幅に向上させます。上記のフィルタ-マップ-合計の例のようなパイプラインでは、フィルタリング、マッピングおよび合計をデータに対する単一パスに融合することができ、中間状態も最小限に抑えられます。 遅延処理では、全データの検査が不要であればそれを回避することも可能となります。「1000文字を超える長さの最初の文字列を見つける」といった操作では、必要な特性を備えた文字列を見つけるのに十分な数の文字列を検査するだけで済み、ソースから取得可能なすべての文字列を検査する必要もなくなります。 (この動作は、入力ストリームがただ大きいだけでなく無限である場合にさらに重要となります。)
中間操作はさらにステートレス操作とステートフル操作に分けられます。 ステートレス操作(filter
やmap
など)は、新しい要素を処理する際に、以前に参照した要素の状態を保持しません。各要素の処理は、他の要素の操作とは無関係に実行できます。 ステートフル操作(distinct
やsorted
など)は、新しい要素を処理する際に、以前に参照した要素の状態を組み込む可能性があります。
ステートフル操作は、入力の全体を処理しないと結果を生成できない可能性があります。 たとえばストリームをソートする場合、ストリームのすべての要素が処理されるまで、結果を生成することができません。 このため、並列計算下では、ステートフルな中間操作を含む一部のパイプラインで、データに対するパスが複数必要になったり、大量のデータをバッファーに格納する必要が生じたりする可能性があります。 ステートレスな中間操作のみを含むパイプラインは、順次、並列のいずれであっても、最小限のデータ・バッファリングで単一パスで処理できます。
さらに、一部の操作は短絡操作とみなされます。 中間操作が無限の入力が与えられたときに有限のストリームを結果として生成する可能性がある場合、その操作は短絡操作になります。 終端操作が無限の入力が与えられたときに有限の時間で終了する可能性がある場合、その操作は短絡操作になります。 短絡操作をパイプラインに含めることは、無限ストリームの処理が有限時間で正常終了するための必要条件ではありますが、十分条件ではありません。
並列性
明示的なfor-
ループで要素を処理する操作は、本質的に順次的です。 ストリームでは並列実行をしやすいように、計算が、個々の要素ごとの命令型操作としてではなく、集約操作のパイプラインとして再構成されます。 すべてのストリーム操作は順次、並列のどちらでも実行できます。 JDKのストリーム実装は、並列性が明示的に要求されない限り、順次ストリームを作成します。 たとえば、Collection
に含まれるメソッドCollection.stream()
、Collection.parallelStream()
はそれぞれ順次ストリーム、並列ストリームを生成します。IntStream.range(int, int)
などのその他のストリーム生成メソッドでは順次ストリームが生成されますが、それらのストリームは、BaseStream.parallel()
メソッドを呼び出すことで効率的に並列化することができます。 前述の「ウィジェットの重さの合計」クエリーを並列で実行するには、次のようにします。
int sumOfWeights = widgets.parallelStream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
この例の順次版と並列版の違いは、初期ストリームの作成時に「stream()
」ではなく「parallelStream()
」を使用する点だけです。 ストリーム・パイプラインは、端末操作が呼び出されるストリームのモードに応じて、順次または並列に実行されます。 ストリームのシーケンシャルまたはパラレル・モードはBaseStream.isParallel()
メソッドで判断でき、ストリーム・モードはBaseStream.sequential()
およびBaseStream.parallel()
操作で変更できます。 最新のシーケンシャル・モードまたはパラレル・モードの設定は、ストリーム・パイプライン全体の実行に適用されます。
findAny()
のような明示的に非決定論的として識別されている操作を除き、ストリームが順次、並列のどちらで実行されるかによって計算結果が変わるべきではありません。
ほとんどのストリーム操作はユーザー指定の動作を記述するパラメータを受け取りますが、このパラメータは通常、ラムダ式になります。 正しい動作を維持するには、これらの動作パラメータが非干渉的であり、かつほとんどの場合ステートレスでなければいけません。 そのようなパラメータは常に関数型インタフェース(Function
など)のインスタンスであり、通常はラムダ式やメソッド参照になります。
非干渉
ストリームでは、ArrayList
などのスレッドセーフでないコレクションも含め、さまざまなデータ・ソースに対して並列の可能性がある集約操作を実行できます。 これが可能になるのは、ストリーム・パイプラインの実行時にデータ・ソースへの干渉を防げる場合だけです。 エスケープハッチ操作のiterator()
とspliterator()
を除き、実行は終端操作が呼び出された時点で始まり、終端操作が完了した時点で終わります。 ほとんどのデータ・ソースの場合、干渉を防ぐとは、ストリーム・パイプラインの実行中にデータ・ソースが一切変更されないことを意味します。 これに対する顕著な例外は、同時変更を扱えるように特別に設計された並行コレクションをソースに持つストリームです。 並行ストリーム・ソースは、Spliterator
からCONCURRENT
特性が報告されるソースです。
したがって、ソースが並行的でない可能性のあるストリーム・パイプラインに含まれる動作パラメータは、ストリームのデータ・ソースを決して変更すべきではありません。 動作パラメータが非並行データ・ソースに干渉すると言われるのは、動作パラメータがストリームのデータ・ソースを変更するか、そのような変更の引き金となる場合です。 非干渉の必要性は、並行パイプラインだけでなく、すべてのパイプラインに当てはまります。 ストリーム・ソースが並行的でない限り、ストリーム・パイプラインの実行中にストリームのデータ・ソースを変更すると、例外、不正な回答、または不適切な動作が発生する可能性があります。 適切に動作するストリーム・ソースの場合、終端操作が開始される前であればソースを変更でき、そうした変更は対象となる要素に反映されます。 たとえば、次のコードを考えてみましょう。
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
最初に、2つの文字列で構成されるリストを作成: "one"および"two"。 次に、そのリストからストリームが作成されます。 次に、リストを変更するために、3つ目の文字列threeが追加されます。 最後に、ストリームの要素が集められて連結されます。 リストが変更されたのは終端操作collect
が開始される前だったので、結果は文字列one two threeになります。 JDKコレクションや他の大部分のJDKクラスから返されるストリームはすべて、このように適切に動作します。その他のライブラリで生成されるストリームについては、「低レベルのストリーム構築」を参照し、適切に動作するストリームを構築するための要件を確認してください。
ステートレス動作
ストリーム・パイプラインの結果が非決定論的または不正となる可能性があるのは、ストリーム操作の動作パラメータがステートフルの場合です。 ステートフルなラムダ(または対応する関数型インタフェースを実装したその他のオブジェクト)とは、ストリーム・パイプラインの実行中に変化する可能性のある状態に結果が依存するようなラムダのことです。 次のmap()
に対するパラメータが、ステートフル・ラムダの一例です。
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
ここで、マッピング操作が並列実行されると、スレッドのスケジューリングの違いにより、同じ入力に対する結果が実行のたびに変わる可能性があります。一方、ステートレスなラムダ式では、結果は常に同じになります。
また、動作パラメータから可変状態へのアクセスを試みることは、安全性やパフォーマンスの点から良くない選択肢と言えます。その状態へのアクセスの同期を取らなかった場合、データ競合が発生するためにコードが中断してしまいますが、その状態へのアクセスの同期を取った場合、得ようとしている並列性のメリットが競合によって薄れてしまう危険性があります。 最良のアプローチは、ストリーム操作のステートフル動作パラメータを一切使用しないことです。通常は、ストリーム・パイプラインを再構成してステートフルになるのを避ける方法があります。
副作用
ストリーム操作の動作パラメータでの副作用は一般にお薦めできません。そのような副作用はしばしば、ステートレス要件への無意識の違反や、スレッドの安全性を脅かすその他の危険につながる可能性があるからです。行動パラメータに副作用がある場合は、明示的に記載されていない限り、以下についての保証はありません:
- それらの副作用の他のスレッドへの可視性。
- 同じストリーム・パイプライン内の"同じ"要素の異なる操作は同じスレッドで実行されること。そして
- ストリームの実装はストリーム・パイプラインから演算の結果に影響を与えないことを証明できた演算(もしくは全ステージ)を自由に削除できるので、その動作パラメータが常に呼び出されること。
副作用の順序に驚くかもしれません。 ストリーム・ソースの検出順序と矛盾しない結果を生成するようにパイプラインが制約されている場合でも(たとえば、IntStream.range(0,5).parallel().map(x -> x*2).toArray()
は[0, 2, 4, 6, 8]
を生成する必要がある)、マッパー関数が個々の要素に適用される順序や、任意の動作パラメータがどのスレッド内で特定の要素に対して実行されるか、に関する保証は一切ありません。
副作用の逃避も意外なことかもしれません。 ターミナル操作forEach
とforEachOrdered
を除いて、ストリームの実装が計算結果に影響を与えずにパラメータの実行を最適化できるとき、パラメータの副作用は必ずしも実行されないかもしれません。 (特定の例については、count
操作で説明されているAPIノートを参照してください。)
副作用を使用するようにテンプレート設定されている多くの計算は、可変累計のかわりにreductionを使用するなど、副作用なしでより安全かつ効率的に表現できます。 ただし、println()
をデバッグ目的で使用するなどの副作用は、通常無害です。 副作用を介さないと動作できないストリーム操作は、forEach()
やpeek()
など、ごく少数です。これらは注意して使用すべきです。
副作用を不適切に使用しているストリーム・パイプラインを、副作用を使用しないパイプラインに変換する方法の一例として、指定された正規表現に一致する文字列を文字列ストリーム内で検索し、一致した文字列をリスト内に格納するコードを、次に示します。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
このコードでは副作用が不必要に使用されています。 並列実行時には、ArrayList
がスレッドセーフでないために不正な結果が生成されますし、必要な同期を追加すれば競合が発生し、並列性のメリットが薄れます。 さらに言えば、ここで副作用を使用する必要はまったくありません。forEach()
は単純に、より効率的で安全な、並列化により適したリダクション操作で置き換えることができます。
List<String> results =
stream.filter(s -> pattern.matcher(s).matches())
.toList(); // No side-effects!
順序付け
ストリームは、定義された検出順序を持つことも持たないこともあります。 ストリームが検索順序を持つかどうかは、そのソースと中間操作によって決まります。 ストリーム・ソースの中には、本質的に順序付けされたもの(List
や配列など)もあれば、そうでないもの(HashSet
など)もあります。 中間操作の中には、sorted()
のように、もともと順序付けされていないストリームに検出順序を課すものもあれば、BaseStream.unordered()
のように、順序付けされたストリームを順序付けなしに変更するものもあります。 さらに終端操作の中には、forEach()
のように検出順序を無視するものもあります。
順序付けされたストリームでは、ほとんどの操作は要素を検出順に処理するよう制約されます。ストリームのソースが[1, 2, 3]
を含むList
である場合、map(x -> x*2)
の実行結果は[2, 4, 6]
でなければいけません。 一方、ソースに検出順序が定義されていない場合は、値[2, 4, 6]
の要素を任意に入れ替えたものも、有効な結果になります。
順次ストリームの場合、検出順序の有無はパフォーマンスには影響せず、決定論にのみ影響します。 ストリームが順序付けされている場合は、同一ソースで同一のストリーム・パイプラインを繰り返し実行しても同一の結果が生成されます。順序付けされていない場合は、繰り返し実行すると異なる結果が生成される可能性があります。
並列ストリームの場合、順序付けの制約を緩和すると、実行の効率性を改善できることがあります。 重複フィルタリング(distinct()
)やグループ化リダクション(Collectors.groupingBy()
)といった特定の集約操作は、要素の順序付けを無視できれば、より効率的に実装できます。 同様に、limit()
のような、検出順序と本質的な関連性を持つ操作では、適切な順序付けのためにバッファリングの必要性が生じ、並列性のメリットが薄れてしまう可能性があります。 ストリームに検出順序が含まれているが、ユーザーはその検出順序のことは特に気にしていない、という場合、unordered()
でストリームの順序付けを明示的に解除することで、一部のステートフル操作や終端操作で並列時のパフォーマンスが改善される可能性があります。 ただし、前述の「ブロックの重さの合計」例など、大部分のストリーム・パイプラインは、順序付け制約の下でも効率的に並列化されます。
リダクション操作
リダクション操作(畳み込みとも呼ばれる)は、一連の入力要素を受け取り、結合操作を繰り返し適用することでそれらを結合し、単一のサマリー結果を出力します(一連の数値の合計または最大値の検索や、リストへの要素の蓄積など) ストリーム・クラスには、reduce()
、collect()
と呼ばれる複数の形式の汎用リダクション操作と、sum()
、max()
、count()
など、複数の特殊化されたリダクション形式が含まれています。
もちろん、そのような操作は、次のように単純な順次処理ループとして簡単に実装できます。
int sum = 0;
for (int x : numbers) {
sum += x;
}
ただし、上のような推移的蓄積よりもリデュース操作を好む理由があります。 リダクションは抽象度がより高い(個々の要素ではなくストリーム全体に作用する)だけでなく、適切に構築されたリデュース操作は本質的に並列化可能となります(ただし、要素の処理に使用される関数が結合的かつステートレスである必要がある)。 たとえば、与えられた数値ストリームについて、合計を計算する場合には、次のように記述できます。
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
または
int sum = numbers.stream().reduce(0, Integer::sum);
これらのリダクション操作は、ほとんど何も変更しなくても、安全に並列実行できます。
int sum = numbers.parallelStream().reduce(0, Integer::sum);
リダクションの並列化が良好に行えるのは、実装が、データのサブセットを並列処理した後、中間結果を結合して最終的な正しい答えを得ることができるからです。 (言語に"パラレルfor-each"構成がある場合でも、可変累積アプローチでは、開発者が共有累積変数sum
にスレッド・セーフな更新を提供し、必要な同期によって、パラレル化によるパフォーマンスの向上が排除されます。) 代わりにreduce()
を使用すれば、リダクション操作の並列化に関するあらゆる重荷が消失し、ライブラリは同期を追加しなくても効率的な並列実装を提供できます。
前述の"ウィジェット"の例は、削減を他の操作と組み合せてfor-loopsを一括操作に置き換える方法を示しています。 widgets
が、getWeight
メソッドを含むWidget
オブジェクトのコレクションである場合、もっとも重いウィジェットを見つけるには次のようにします。
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
より汎用的な形式の、<T>
型の要素に作用して<U>
型の結果を生成するreduce
操作では、次の3つのパラメータが必要になります。
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
ここで、identity要素は、リダクションの初期シード値であるとともに、入力要素が存在しない場合のデフォルト結果でもあります。 accumulator関数は、部分的な結果と次の要素を受け取り、新しい部分的な結果を生成します。 combiner関数は、2つの部分的な結果を結合し、新しい部分的な結果を生成します。 (コンバイナは並列リダクションで必要になります。並列リダクションでは入力が分割され、パーティションごとに部分的な蓄積値が計算された後、部分的な結果が結合されて最終結果が生成されます。)
より正式には、identity
値はコンバイナ関数の単位元でなければいけません。 つまり、すべてのu
について、combiner.apply(identity, u)
がu
に等しくなります。 さらに、combiner
関数は結合的である必要があり、かつaccumulator
関数と互換性がある必要があります。すべてのu
とt
について、combiner.apply(u, accumulator.apply(identity, t))
とaccumulator.apply(u, t)
がequals()
で等しくなる必要があります。
3引数形式は2引数形式を一般化したものであり、マッピング・ステップが蓄積ステップに組み込まれています。 単純な重量合計の例を、より汎用的な形式を使って次のように書き直すことができます。
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight(),
Integer::sum);
ただし、明示的なマップ-リデュース形式のほうが可読性が高いので、通常はそちらを使用することをお薦めします。 一般化された形式は、マッピングとリダクションを単一の関数にまとめることで、かなりの作業量を省いて最適化できる場合のために用意されています。
可変リダクション
可変リダクション操作は、ストリーム内の要素を処理する際に、可変結果コンテナ(Collection
やStringBuilder
など)に入力要素を蓄積します。
文字列のストリームを受け取り、文字列を連結して単一の長い文字列にする必要がある場合、通常のリダクションを使ってこれを実現できます。
String concatenated = strings.reduce("", String::concat)
これで必要な結果が得られますし、並列でも動作します。 ただし、パフォーマンスに満足できない可能性があります。 そのような実装は大量の文字列コピーを行うため、実行時間はO(n^2)(nは文字数)になります。 より高パフォーマンスなアプローチは、文字列蓄積用の可変コンテナであるStringBuilder
内に結果を蓄積することです。 可変リダクションを並列化する際には、通常のリダクションの場合と同じテクニックを使用できます。
可変リダクション操作にはcollect()
という名前が付いていますが、これは、Collection
などの結果コンテナ内に必要な結果を集めるからです。 collect
は3つの関数を要求します。結果コンテナの新しいインスタンスを構築するサプライヤ関数、結果コンテナに入力要素を組み込むアキュムレータ関数、ある結果コンテナの内容を別のコンテナにマージする結合関数です。 この形式は、通常のリダクションの汎用形式に非常に似ています。
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
collect
をこのような抽象的な方法で表現するメリットは、reduce()
の場合と同じく、直接的な並列化が可能になる点にあります。いくつかの部分的な結果を並列して蓄積した後、それらを結合することができます(ただし、蓄積関数と結合関数が該当する要件を満たす必要がある)。 たとえば、ストリーム内の要素のString表現をArrayList
内に集めるには、次のような明らかな順次for-each形式を記述できます。
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
あるいは、並列化可能なcollect形式を使用することもできます。
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
あるいは、アキュムレータ関数からマッピング操作を抜き出し、次のようにより簡潔に表現することもできます。
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
ここで、サプライヤは単なるArrayListのコンストラクタ
であり、アキュムレータは文字列化された要素をArrayList
に追加し、コンバイナは単純にaddAll
を使って一方のコンテナの文字列を他方にコピーします。
collect
の3つの側面であるサプライヤ、アキュムレータ、コンバイナは緊密に結合しています。 Collector
という抽象を使えば、3つすべての側面を捉えることができます。 List
内に文字列を集める上の例は、標準のCollector
を使って次のように書き換えることができます。
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
可変リダクションをCollector内にパッケージ化することには、合成可能性というもう1つのメリットがあります。 クラスCollectors
に含まれるさまざまな定義済のコレクタ・ファクトリの中には、あるコレクタを別のコレクタに変換するコンビネータも含まれています。 たとえば、従業員ストリームの給料の合計を計算する次のようなコレクタがあるとします。
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(2番目の型パラメータの?
は、このコレクタで使用される中間表現には興味がないことを示しているだけです。) 給料の部門別合計の表を作成するコレクタを作成する場合、groupingBy
を使えばsummingSalaries
を再利用できます。
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
通常のリダクション操作の場合と同じく、collect()
操作を並列化できるのは、該当の条件が満たされた場合だけです。 部分的に蓄積された任意の結果について、その結果を空の結果コンテナと結合したときに元と同じ結果が生成される必要があります。 つまり、一連のアキュムレータ呼出しやコンバイナ呼出しの結果として得られた、部分的に蓄積された結果p
について、p
はcombiner.apply(p, supplier.get())
と等しくなる必要があります。
さらに、計算を分割しても同じ結果が得られる必要があります。 任意の入力要素t1
とt2
について、以下の計算の結果r1
とr2
が等しくなる必要があります。
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
ここで、等価性の判定は一般にObject.equals(Object)
によりますが、順序の違いを考慮するために等価性の条件が緩和されることもあります。
リダクション、並行性、および順序付け
ある複雑なリダクション操作、たとえば次のようなMap
を生成するcollect()
を考えます。
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
この操作を並列実行すると、実際には逆効果となる可能性があります。 それは、結合ステップ(あるMap
をキーに基づいて別のマップにマージするステップ)のコストが、一部のMap
実装で高くなる可能性があるからです。
ところが、このリダクションで使用される結果コンテナが、ConcurrentHashMap
のような、同時変更可能なコレクションであったとします。 その場合、アキュムレータの並列呼出しでは、実際には同じ共有結果コンテナに結果を同時に蓄積できるため、コンバイナで複数の結果コンテナをマージする必要がなくなります。 これにより、並列実行のパフォーマンスが向上する可能性があります。 これを並行リダクションと呼びます。
並行リダクションをサポートするCollector
には、Collector.Characteristics.CONCURRENT
特性のマークが付けられています。 ただし、並行コレクションにはデメリットもあります。 複数のスレッドから同時に共有コンテナに結果が蓄積されるので、結果が蓄積される順番が非決定論的になります。 したがって、並行リダクションが可能なのは、処理対象のストリームで順序付けが重要でない場合のみになります。 Stream.collect(Collector)
実装が並行リダクションを実行するのは、次の場合だけです。
- ストリームが並列的であり、
- コレクタが
Collector.Characteristics.CONCURRENT
特性を持っていて、 - ストリームが順序付けされていないか、コレクタが
Collector.Characteristics.UNORDERED
特性を持つ。
BaseStream.unordered()
メソッドを使用します。 たとえば、
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(ここで、Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)
はgroupingBy
の並行版です)。
ある特定のキーの各要素がソースと同じ順番で現れることが重要である場合には、並行リダクションは使用できません。順序付けは、並行挿入で犠牲になるものの1つだからです。 その場合は、順次リダクションかマージベースの並列リダクションを実装するように制約されます。
結合性
演算子または関数op
が結合的となるのは、次が成り立つ場合です。
(a op b) op c == a op (b op c)
ここで項を4つに増やせば、並列評価でのこれの重要性が明確になります。
a op b op c op d == (a op b) op (c op d)
したがって、(a op b)
と(c op d)
を並列に評価した後、それらの結果に対してop
を呼び出すことができます。
結合的な演算の例として、数値の加算、最小、最大、および文字列の連結が挙げられます。
低レベルのストリーム構築
ここまで、ストリームのすべての例で、Collection.stream()
やArrays.stream(Object[])
のようなメソッドを使ってストリームが取得されていました。 それらのストリーム生成メソッドはどのように実装されているのでしょうか。
クラスStreamSupport
には、ストリーム作成用の低レベルのメソッドがいくつか含まれていますが、それらはすべて、なんらかの形式のSpliterator
を使用しています。 スプリッテレータはIterator
の並列版であり、(無限個の可能性もある)要素のコレクションを記述し、順次前進、一括トラバーサル、および入力内で並列処理可能な部分を抜き取って別のスプリッテレータ内に格納する操作をサポートします。 最低レベルでは、すべてのストリームがスプリッテレータによって駆動されます。
スプリッテレータを実装するうえでの実装上の選択肢は多数存在していますが、そのほとんどすべてが、実装の単純さと、そのスプリッテレータを使用するストリームの実行時パフォーマンスとのトレードオフの上に成り立っています。 もっとも単純だがもっとも低パフォーマンスなスプリッテレータの作成方法は、Spliterators.spliteratorUnknownSize(java.util.Iterator, int)
を使ってイテレータから作成することです。 そのようなスプリッテレータは動作はするものの、並列パフォーマンスが低いことが予想されます。サイズ設定情報(基礎となるデータ・セットの大きさ)が失われているほか、単純すぎる分割アルゴリズムに制約されているからです。
より高品質のスプリッテレータは、バランスの取れた既知サイズのスプリット、正確なサイズ設定情報、およびスプリッテレータやデータのその他のいくつかのcharacteristics
を提供します(これらの情報は実装内で、実行を最適化するために使用できる)。
可変データ・ソースのスプリッテレータには、データへのバインドのタイミングという、もう1つの課題があります。スプリッテレータが作成されてからストリーム・パイプラインが実行されるまでの間に、データが変更される可能性があるからです。 ストリームのスプリッテレータは、IMMUTABLE
、CONCURRENT
のいずれかの特性を報告するのが理想ですが、そうでない場合は遅延バインディングであるべきです。 ソースが推奨のスプリッテレータを直接提供できない場合は、Supplier
を使ってスプリッテレータを間接的に提供し、Supplier
を受け取るバージョンのstream()
経由でストリームを構築することができます。 ストリーム・パイプラインの終端操作が開始された後で初めて、サプライヤからスプリッテレータが取得されます。
これらの要件に従えば、ストリーム・ソースの変更とストリーム・パイプラインの実行の間の潜在的な干渉のスコープが大幅に減少します。 必要な特性を備えたスプリッテレータに基づくストリーム、またはSupplierベースのファクトリ形式を使用するストリームは、終端操作が開始される前にデータ・ソースが変更されても対応できます(ただし、ストリーム操作の動作パラメータが、非干渉とステートレスに関する必要条件を満たしている必要があります)。 詳細については、「非干渉」を参照してください。
- 導入されたバージョン:
- 1.8
-
クラス説明BaseStream<T,
S extends BaseStream<T, S>> 順次および並列の集約操作をサポートする要素シーケンスであるストリームの基底インタフェース。Collector<T,A, R> 可変結果コンテナに入力要素を蓄積し、オプションですべての入力要素が処理された後で蓄積された結果を最終的な表現に変換する可変リダクション操作。リダクション実装の最適化に使用可能な、Collector
のプロパティーを示す特性。要素をコレクションに蓄積したり、さまざまな条件に従って要素を要約するなど、有用な各種リダクション操作を実装したCollector
実装。順次および並列の集約操作をサポートするプリミティブdouble値要素のシーケンスです。DoubleStream
の可変ビルダーです。double
値の引数およびDoubleConsumerを受け入れ、結果を返さない演算を表します。順次および並列の集約操作をサポートするプリミティブint値要素のシーケンスです。IntStream
の可変ビルダーです。int
値の引数およびIntConsumerを受け入れ、結果を返さない演算を表します。順次および並列の集約操作をサポートするプリミティブlong値要素のシーケンスです。LongStream
の可変ビルダーです。long
値の引数およびLongConsumerを受け入れ、結果を返さない演算を表します。Stream<T>順次および並列の集約操作をサポートする要素のシーケンスです。Stream
の可変ビルダーです。ストリームを作成および操作するための低レベルのユーティリティ・メソッドです。