1 カスタム・ステージおよびカスタム関数の開発

カスタム・ステージ・タイプまたはカスタム関数を使用すると、一般的なステージおよび関数では使用できない機能を開発できます。たとえば、一般的でない計算、変換、アルゴリズムなどです。

たとえば、MD5アルゴリズムを使用してメッセージ・ダイジェストを計算するとします。このアルゴリズムは、組込み関数ライブラリに含まれておらず、またこれを式として実装するのは現実的ではありません。

カスタム・ステージ・タイプおよびカスタム関数は、osa.spark-cql.extensibility.api.jarライブラリ内に提供されているインタフェース、クラスおよび注釈を使用して、Javaプログラミング言語で実装します。このjarファイルは、インストール・フォルダosa-base/extensibility-api/osa.spark-cql.extensibility.api.jarからダウンロードできます。詳細は、Oracle Stream AnalyticsにおけるSparkのCQLに対する拡張機能を参照してください。

カスタム・ステージ・タイプの場合、EventProcessorインタフェースを実装し、クラス宣言に@OsaStage注釈を適用する必要があります。入力イベントを取得して出力イベントを返すprocessEvent()メソッドを実装する必要があります。両方とも、それぞれ入力仕様と出力仕様を使用して定義する必要があります。

カスタムjarの作成

カスタムjarは、パイプライン内で使用されるカスタム・ステージ・タイプまたはカスタム関数のJavaクラスが含まれる、ユーザー提供のJarアーカイブです。

カスタムjarを作成する手順:
  1. 「新規アイテムの作成」メニューで、「カスタムjar」を選択します。
    カスタム・ステージおよびカスタム関数用のjarのインポート・ウィザードが表示されます。
  2. 「タイプ・プロパティ」ページで、適切な値を入力するか選択し、「次へ」をクリックします。
    1. 「名前」フィールドに、アプリケーションにインポートするカスタムjarのわかりやすい名前を入力します。
    2. 「説明」フィールドに、適切な説明を入力します。
    3. 「タグ」フィールドで、既存のタグを1つ以上選択するか、独自のタグを入力します。
    4. カスタムjarタイプ・ドロップダウン・リストで、「カスタムjar」を選択します。
  3. カスタムjarの詳細ページで、「ファイルのアップロード」をクリックし、アプリケーションにインポートするjarファイルを選択して、「保存」をクリックします。
    アップロードに選択したjarファイルが有効なjarファイルであり、必要な依存関係がすべて含まれていることを確認してください。

カスタム・ステージ・タイプ

カスタム・ステージは、パイプライン内のストリーム・データにカスタム・ステージ・タイプを適用できるステージ・タイプです。その動作は、データが入って出ていく他のステージ・タイプと同様です。ストリームにロジックが適用される前にいくつかのパラメータを構成するように要求される点で、パターン・ステージと似ています。

カスタム・ステージの追加

パイプライン内にフィルタを追加すると、より正確なストリーム・データを取得できます。

カスタム・ステージを追加する手順:
  1. パイプライン・エディタで、必要なパイプラインを開きます。
  2. カスタム・ステージの追加先の前にあるステージを右クリックします。ステージの追加「カスタム」の順にクリックし、カスタムjarからのカスタム・ステージを選択します。
  3. スコアリング・ステージにわかりやすい名前および適切な説明を入力し、「保存」をクリックします。
  4. ステージ・エディタで、次の項目に適切な値を選択します。
    1. カスタム・ステージ・タイプ — カスタムjarを介して以前にインストールされたカスタム・ステージ
    2. 入力マッピング — 各入力パラメータの前のステージの対応する列
ユースケースに基づいて、複数のカスタム・ステージを追加できます。

カスタム・ステージの実装

カスタム・ステージ・タイプの場合、EventProcessorインタフェースを実装し、クラス宣言に@OsaStage注釈を適用する必要があります。入力イベントを取得して出力イベントを返すprocessEvent()メソッドを実装する必要があります。両方とも、それぞれ入力仕様と出力仕様を使用して定義する必要があります。

カスタム関数

カスタムjarを追加するとインストールされる関数は、カスタム関数と呼ばれます。

カスタム関数は、インストールすると式ビルダーで使用可能になります。カスタム関数は、「カスタム」カテゴリの下にリストされます。これらの関数には、Oracle Stream Analytics内の即時利用可能な関数と同じようにアクセスできます。

カスタム関数の実装

カスタム関数の場合、任意のクラス(カスタム・ステージ・タイプを実装するクラスを含む)内のメソッドに@OsaFunction注釈を適用します。詳細は、Javadocおよびサンプルを参照してください。

注意:

同じまたは異なるjarに含まれる同じパッケージ/クラス/メソッド内で、複数の関数に同じ名前を付けることはサポートされていません。

制限

この項では、カスタム・ステージおよびカスタム関数の制限と制約を示します。

カスタム・ステージ・タイプおよびカスタム関数の条件は、次のとおりです。

  • ステートレス変換にのみ使用されること。ステージ・タイプまたは関数に対する以前の呼び出しからの状態へのアクセスは保証できず、最適化に基づいて変わる可能性があります。

  • ブロッキング呼出しを使用しないこと。

  • 新しいスレッドを開始しないこと。

  • スレッド同期プリミティブ(wait()メソッドを含む)を使用しないこと。このメソッドを使用すると、デッドロックが発生する可能性があります。

  • 完全修飾クラス名を持つか、その一部であること。

カスタム・ステージまたはカスタム関数を使用する際には、ヒープ領域の使用量に注意してください。

注意:

結果として生成されるjarには必要なすべての依存関係およびサード・パーティ・クラスが含まれる必要があり、また、jarファイルのサイズは160 MB未満である必要があります。

データ型のマッピング

次の表に、カスタム・ステージ・タイプおよびカスタム関数で使用可能なデータ型を示します。

Oracle Stream Analyticsのデータ型 Javaのデータ型 コメント

BOOLEAN

boolean

INT

int

BIGINT

long

FLOAT

float

DOUBLE

double

STRING

String

BIGDECIMAL

BigDecimal

TIMESTAMP

long (ナノ秒)

カスタム・ステージ・タイプでのみ使用できます

INTERVAL

long

カスタム・ステージ・タイプでのみ使用できます