主コンテンツへ
Oracle® Big Data Connectorsユーザーズ・ガイド
リリース5 (5.0)
F21918-01
目次へ移動
目次
索引へ移動
索引

前
次

B OracleのKafka用Hiveストレージ・ハンドラを使用したKafkaトピック用Hive外部表の作成

Kafka用Hiveストレージ・ハンドラを使用すると、Hive (およびOracle Big Data SQL)でKafkaトピックを問い合せることができます。

Kafkaデータにアクセスできるようにするには、Kafkaトピックに対してHive外部表を作成します。HiveでKafkaデータ形式を読み取れるようにするOracle Big Data SQLストレージ・ハンドラは、oracle.hadoop.kafka.hive.KafkaStorageHandlerです。

このストレージ・ハンドラを使用して、Kafkaにあるデータが裏に存在するHive外部表を作成できます。Hive外部表を介して、Big Data SQLでKafkaデータを問い合せることができるようになります。 

Hive DDLを次の例に示します。ここで、topic1およびtopic2は、KafkaのStringシリアライザによってキーがシリアライズされ、KafkaのLongシリアライザによって値がシリアライズされるKafkaブローカの2つのトピックです。

CREATE EXTERNAL TABLE test_table
row format serde ‘oracle.hadoop.kafka.hive.KafkaSerDe’
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties('oracle.kafka.table.key.type'='string',
                     'oracle.kafka.table.value.type'='long',
                     'oracle.kafka.bootstrap.servers'='nshgc0602:9092',
                     'oracle.kafka.table.topics'='topic1,topic2');

次の例は、結果のHive表を示します。Kafkaのキー、値、オフセット、トピック名およびpartitionidがHive列にマップされます。  Hive問合せのWHERE句によって、トピック/パーティションの各ペアにオフセットを明示的に指定できます。  

hive> describe test_table;
OK
topic            string              	from deserializer   
partitionid      int                 	from deserializer   
key              string              	from deserializer   
value            bigInt              	from deserializer   
offset           bigint              	from deserializer
timestamptype    smallInt            from deserializer
timestamp        timestamp           from deserializer
Time taken: 0.084 seconds, Fetched: 7 row(s) 
表の内容は、Hive問合せの実行時のKafkaトピックのスナップショットです。新しいデータをKafkaトピックに挿入するとき、オフセット列またはタイムスタンプ列を使用してトピックへの変更を追跡できます。オフセットはトピック/パーティションごとです。たとえば、次の問合せは、各トピック/パーティションのWHERE句に指定されたオフセットの後に新しいメッセージを返します。
hive> select * from test_table where (topic="topic1" and partitoinid=0 and offset > 199) or (topic="topic1" and partitionid=1 and offset > 198) or (topic="topic2" and partitionid=0 and offset > 177) or (topic="topic2" and partitionid=1 and offset > 176);
すべてのトピック/パーティションのオフセットを追跡する必要があります。たとえば、Oracle表を使用してこれらのオフセットを格納できます。新しいデータを追跡するさらに便利な方法は、タイムスタンプ列を使用することです。次の問合せを使用して、特定の時点の後にデータを問い合せることができます。
hive> select * from test_table where timestamp > '2017-07-12 11:30:00'; 

すべての表プロパティの詳細は、次の「プロパティ・リファレンス」の項を参照してください

プロパティ・リファレンス

表A-1 Kafka用Hiveストレージ・ハンドラの表プロパティ

プロパティ名 必要性 説明

oracle.kafka.table.topics

必須

Kafkaトピックのカンマ区切りリスト。各Kafkaトピック名は、文字(大文字と小文字)、数字、.(ドット)、_(アンダースコア)および-(マイナス)のみで構成される必要があります。各トピック名の最大長は249です。これらのトピックは、同じシリアライズ・メカニズムを持つ必要があります。結果のHive表は、ここにリストされるすべてのトピックのレコードで構成されます。Hive列"topic"が追加され、各レコードのトピック名に設定されます。

oracle.kafka.bootstrap.servers

必須

このプロパティは、基礎となるKafkaコンシューマに対して"bootstrap.servers"プロパティに変換されます。コンシューマは、ここでどのサーバーがブートストラップに指定されているかに関係なく、すべてのサーバーを利用します。このリストは、サーバーの完全なセットの検出に使用される初期ホストにのみ影響します。このリストはhost1:port1,host2:port2,....という形式にする必要があります。これらのサーバーは、完全なクラスタ・メンバーシップ(動的に変更される場合があります)を検出するための初期接続にのみ使用されるため、このリストにサーバーの完全なセットを含める必要はありません。可用性の理由のため、複数のサーバーをリストしてください。

oracle.kafka.table.key.type

オプション

レコードのキー・タイプ。設定していない場合、Kafkaレコードのキーの部分はHive行で無視されます。"string"、"integer"、"long"、"double"、"avro"、"avro_confluent"の値のみがサポートされます。"string"、"integer"、"double"および"long"は、Kafkaでサポートされる組込みのプリミティブ・シリアライズ・タイプに対応します。このプロパティがこれらのプリミティブ・タイプのいずれかである場合、各レコードのKafkaキーが単一のHive列にマップされます。このプロパティが"avro"または"avro_confluent"に設定されている場合、oracle.kafka.table.key.schemaが必須です。各レコードのKafkaキーはAvroオブジェクトにデシリアライズされます。Avroスキーマがレコード・タイプのものである場合、レコードの最初のレベルの各フィールドが単一のHive列にマップされます。Avroスキーマがレコード・タイプのものでない場合は、"key"という名前の単一のHive列にマップされます。

"avro"と"avro_confluent"の違いは、シリアライズのワイヤー形式がわずかに異なることです。"avro"の場合、キーのバイト配列全体がavroシリアライズのバイトで構成されます。"avro_confluent"の場合、バイト配列はマジック・バイト、バージョン番号、次にキーのavroシリアライズのバイトで構成されます。

oracle.kafka.table.value.type

オプション

レコードの値タイプ。設定していない場合、Kafkaレコードの値の部分はHive行で無視されます。このプロパティの使用方法は、oracle.kafka.table.key.typeの使用方法と似ています。それらの違いは、Kafka値のAvroスキーマがレコード・タイプのものではない場合です。Avroオブジェクト全体が、"key"ではなく"value"という名前の単一のHive列にマップされます。

oracle.kafka.table.key.writer.schema

オプション

KafkaキーのAvroシリアライズに対するオプションのライター・スキーマ。キーのリーダー・スキーマが、キーがKafkaブローカに書き込まれたスキーマと異なる場合に必要です。Kafkaキーがシリアライズされる正確なスキーマである必要があります。

oracle.kafka.table.key.schema

"oracle.kafka.table.key.type"が"avro"または"avro_confluent"の場合に必須

KafkaキーのAvroリーダー・スキーマのJSON文字列。KafkaキーのAvroライター・スキーマと正確に同じである必要はありません。リーダー・スキーマがKafkaキーまたはコンバータから変換されたオブジェクトと互換性があるかぎり、有効です。これを使用すると、Hive列の名前の変更や、Hive行でKafkaキーから保持するフィールドの選択を行うことができます。このプロパティのスキーマが、Kafkaキーをシリアライズしたスキーマと異なる場合、oracle.kafka.table.key.writer.schemaが必須です。

oracle.kafka.table.value.writer.schema

オプション

Kafka値のAvroシリアライズに対するオプションのライター・スキーマ。この使用方法はoracle.kafka.table.key.writer.schemaと似ています。

oracle.kafka.table.value.schema

"oracle.kafka.table.value.type"が"avro"または"avro_confluent"の場合に必須

Kafka値のAvroリーダー・スキーマのJSON文字列。この使用方法はoracle.kafka.table.key.schemaと似ています。

oracle.kafka.table.extra.columns

オプション、デフォルトで"true"

追加のKafka列: paritionidoffsettimestamptypeを含めるかどうかを制御するブール・フラグ。

oracle.kafka.chop.partition

オプション、デフォルトでfalse

Kafkaパーティションを小さいチャンクに分割するかどうかを制御するブール・フラグ。これは、Kafkaパーティションの数が少なく、各Kafkaパーティションのサイズが大きい場合に役立ちます。

oracle.kafka.partition.chunk.size

オプション

oracle.kafka.chop.partitionがtrueの場合、このプロパティは各パーティション・チャンクのKafkaレコードの数を制御します。(分割の最適なサイズ)/(Kafkaレコードの平均サイズ)によって見積もった値を設定する必要があります。たとえば、分割の最適なサイズが256 MBで、Kafkaレコードの平均サイズが256バイトの場合、このプロパティは1000000に設定する必要があります。