Kafka用Hiveストレージ・ハンドラを使用すると、Hive (およびOracle Big Data SQL)でKafkaトピックを問い合せることができます。
Kafkaデータにアクセスできるようにするには、Kafkaトピックに対してHive外部表を作成します。HiveでKafkaデータ形式を読み取れるようにするOracle Big Data SQLストレージ・ハンドラは、oracle.hadoop.kafka.hive.KafkaStorageHandler
です。
このストレージ・ハンドラを使用して、Kafkaにあるデータが裏に存在するHive外部表を作成できます。Hive外部表を介して、Big Data SQLでKafkaデータを問い合せることができるようになります。
Hive DDLを次の例に示します。ここで、topic1およびtopic2は、KafkaのStringシリアライザによってキーがシリアライズされ、KafkaのLongシリアライザによって値がシリアライズされるKafkaブローカの2つのトピックです。
CREATE EXTERNAL TABLE test_table row format serde ‘oracle.hadoop.kafka.hive.KafkaSerDe’ stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler' tblproperties('oracle.kafka.table.key.type'='string', 'oracle.kafka.table.value.type'='long', 'oracle.kafka.bootstrap.servers'='nshgc0602:9092', 'oracle.kafka.table.topics'='topic1,topic2');
次の例は、結果のHive表を示します。Kafkaのキー、値、オフセット、トピック名およびpartitionidがHive列にマップされます。 Hive問合せのWHERE句によって、トピック/パーティションの各ペアにオフセットを明示的に指定できます。
hive> describe test_table; OK topic string from deserializer partitionid int from deserializer key string from deserializer value bigInt from deserializer offset bigint from deserializer timestamptype smallInt from deserializer timestamp timestamp from deserializer Time taken: 0.084 seconds, Fetched: 7 row(s)
hive> select * from test_table where (topic="topic1" and partitoinid=0 and offset > 199) or (topic="topic1" and partitionid=1 and offset > 198) or (topic="topic2" and partitionid=0 and offset > 177) or (topic="topic2" and partitionid=1 and offset > 176);すべてのトピック/パーティションのオフセットを追跡する必要があります。たとえば、Oracle表を使用してこれらのオフセットを格納できます。新しいデータを追跡するさらに便利な方法は、タイムスタンプ列を使用することです。次の問合せを使用して、特定の時点の後にデータを問い合せることができます。
hive> select * from test_table where timestamp > '2017-07-12 11:30:00';
すべての表プロパティの詳細は、次の「プロパティ・リファレンス」の項を参照してください
プロパティ・リファレンス
表A-1 Kafka用Hiveストレージ・ハンドラの表プロパティ
プロパティ名 | 必要性 | 説明 |
---|---|---|
oracle.kafka.table.topics |
必須 |
Kafkaトピックのカンマ区切りリスト。各Kafkaトピック名は、文字(大文字と小文字)、数字、.(ドット)、_(アンダースコア)および-(マイナス)のみで構成される必要があります。各トピック名の最大長は249です。これらのトピックは、同じシリアライズ・メカニズムを持つ必要があります。結果のHive表は、ここにリストされるすべてのトピックのレコードで構成されます。Hive列"topic"が追加され、各レコードのトピック名に設定されます。 |
oracle.kafka.bootstrap.servers |
必須 |
このプロパティは、基礎となるKafkaコンシューマに対して"bootstrap.servers"プロパティに変換されます。コンシューマは、ここでどのサーバーがブートストラップに指定されているかに関係なく、すべてのサーバーを利用します。このリストは、サーバーの完全なセットの検出に使用される初期ホストにのみ影響します。このリストは |
oracle.kafka.table.key.type |
オプション |
レコードのキー・タイプ。設定していない場合、Kafkaレコードのキーの部分はHive行で無視されます。"string"、"integer"、"long"、"double"、"avro"、"avro_confluent"の値のみがサポートされます。"string"、"integer"、"double"および"long"は、Kafkaでサポートされる組込みのプリミティブ・シリアライズ・タイプに対応します。このプロパティがこれらのプリミティブ・タイプのいずれかである場合、各レコードのKafkaキーが単一のHive列にマップされます。このプロパティが"avro"または"avro_confluent"に設定されている場合、 "avro"と"avro_confluent"の違いは、シリアライズのワイヤー形式がわずかに異なることです。"avro"の場合、キーのバイト配列全体がavroシリアライズのバイトで構成されます。"avro_confluent"の場合、バイト配列はマジック・バイト、バージョン番号、次にキーのavroシリアライズのバイトで構成されます。 |
oracle.kafka.table.value.type |
オプション |
レコードの値タイプ。設定していない場合、Kafkaレコードの値の部分はHive行で無視されます。このプロパティの使用方法は、 |
oracle.kafka.table.key.writer.schema |
オプション |
KafkaキーのAvroシリアライズに対するオプションのライター・スキーマ。キーのリーダー・スキーマが、キーがKafkaブローカに書き込まれたスキーマと異なる場合に必要です。Kafkaキーがシリアライズされる正確なスキーマである必要があります。 |
oracle.kafka.table.key.schema |
"oracle.kafka.table.key.type"が"avro"または"avro_confluent"の場合に必須 |
KafkaキーのAvroリーダー・スキーマのJSON文字列。KafkaキーのAvroライター・スキーマと正確に同じである必要はありません。リーダー・スキーマがKafkaキーまたはコンバータから変換されたオブジェクトと互換性があるかぎり、有効です。これを使用すると、Hive列の名前の変更や、Hive行でKafkaキーから保持するフィールドの選択を行うことができます。このプロパティのスキーマが、Kafkaキーをシリアライズしたスキーマと異なる場合、 |
oracle.kafka.table.value.writer.schema |
オプション |
Kafka値のAvroシリアライズに対するオプションのライター・スキーマ。この使用方法は |
oracle.kafka.table.value.schema |
"oracle.kafka.table.value.type"が"avro"または"avro_confluent"の場合に必須 |
Kafka値のAvroリーダー・スキーマのJSON文字列。この使用方法は |
oracle.kafka.table.extra.columns |
オプション、デフォルトで"true" |
追加のKafka列: |
oracle.kafka.chop.partition |
オプション、デフォルトでfalse |
Kafkaパーティションを小さいチャンクに分割するかどうかを制御するブール・フラグ。これは、Kafkaパーティションの数が少なく、各Kafkaパーティションのサイズが大きい場合に役立ちます。 |
oracle.kafka.partition.chunk.size |
オプション |
oracle.kafka.chop.partitionがtrueの場合、このプロパティは各パーティション・チャンクのKafkaレコードの数を制御します。(分割の最適なサイズ)/(Kafkaレコードの平均サイズ)によって見積もった値を設定する必要があります。たとえば、分割の最適なサイズが256 MBで、Kafkaレコードの平均サイズが256バイトの場合、このプロパティは1000000に設定する必要があります。 |