B Using Oracle's Hive Storage Handler for Kafka to Create a Hive External Table for Kafka Topics
The Hive storage handler for Kafka enables Hive (as well as Oracle Big Data SQL) to query Kafka topics.
To provide access to Kafka data, you create a Hive external table over the Kafka topics. The Oracle Big Data SQL storage handler that enables Hive to read the Kafka data format is oracle.hadoop.kafka.hive.KafkaStorageHandler.
You can use this storage handler to create external Hive tables backed by data residing in Kafka. Big Data SQL can then query the Kafka data through the external Hive tables.
The following example demonstrates the Hive DDL, where topic1 and topic2 are two topics in the Kafka broker whose keys are serialized by Kafka's String serializer and whose values are serialized by Kafka's Long serializer.
CREATE EXTERNAL TABLE test_table
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties('oracle.kafka.table.key.type'='string',
'oracle.kafka.table.value.type'='long',
'oracle.kafka.bootstrap.servers'='nshgc0602:9092',
'oracle.kafka.table.topics'='topic1,topic2');
The example below shows the resulting Hive table. The Kafka key, value, offset, topic name, and partitionid are mapped to Hive columns. You can explicitly designate the offset for each topic/partition pair through a WHERE clause in your Hive query.
hive> describe test_table;
OK
topic string from deserializer
partitionid int from deserializer
key string from deserializer
value bigint from deserializer
offset bigint from deserializer
timestamptype smallint from deserializer
timestamp timestamp from deserializer
Time taken: 0.084 seconds, Fetched: 7 row(s)
hive> select * from test_table where (topic="topic1" and partitionid=0 and offset > 199) or (topic="topic1" and partitionid=1 and offset > 198) or (topic="topic2" and partitionid=0 and offset > 177) or (topic="topic2" and partitionid=1 and offset > 176);
You need to keep track of the offsets for all topic/partition pairs. For example, you can use an Oracle table to store these offsets. A more convenient way to keep track of new data is to use the timestamp column. You can query data after a specific point in time using the following query:
hive> select * from test_table where timestamp > '2017-07-12 11:30:00';
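As a sketch of the offset-tracking approach mentioned above, the last offset consumed for each topic/partition pair could be stored in an Oracle table; the table and column names here are illustrative, not part of the storage handler:

```sql
-- Hypothetical Oracle table recording the last offset read
-- per topic/partition pair (names are illustrative only).
CREATE TABLE kafka_offsets (
  topic        VARCHAR2(249),
  partitionid  NUMBER,
  last_offset  NUMBER,
  CONSTRAINT kafka_offsets_pk PRIMARY KEY (topic, partitionid)
);

-- After each load, record the highest offset consumed,
-- e.g. offset 199 for partition 0 of topic1:
MERGE INTO kafka_offsets t
USING (SELECT 'topic1' AS topic, 0 AS partitionid, 199 AS last_offset
       FROM dual) s
ON (t.topic = s.topic AND t.partitionid = s.partitionid)
WHEN MATCHED THEN
  UPDATE SET t.last_offset = s.last_offset
WHEN NOT MATCHED THEN
  INSERT (topic, partitionid, last_offset)
  VALUES (s.topic, s.partitionid, s.last_offset);
```

The stored offsets can then be used to build the WHERE clause of the next Hive query, so each run reads only records newer than those already processed.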
See the Property Reference section below for descriptions of all table properties.
Property Reference
Table A-1 Table Properties of Hive Storage Handler for Kafka
| Property Name | Requirement | Description |
|---|---|---|
| oracle.kafka.table.topics | Required | A comma-separated list of Kafka topics. Each Kafka topic name must consist of only letters (uppercase and lowercase), numbers, . (dot), _ (underscore), and - (minus). The maximum length of each topic name is 249 characters. These topics must use the same serialization mechanisms. The resulting Hive table consists of records from all of the topics listed here. A Hive column "topic" is added and set to the topic name for each record. |
| oracle.kafka.bootstrap.servers | Required | This property is translated to the "bootstrap.servers" property of the underlying Kafka consumer. The consumer makes use of all servers, irrespective of which servers are specified here for bootstrapping. This list only affects the initial hosts used to discover the full set of servers. It should be in the form host1:port1,host2:port2,... |
| oracle.kafka.table.key.type | Optional | The key type of your record. If unset, the key part of the Kafka record is ignored in the Hive row. Only the values "string", "integer", "long", "double", "avro", and "avro_confluent" are supported. "string", "integer", "double", and "long" correspond to the built-in primitive serialization types supported by Kafka. If this property is one of these primitive types, the Kafka key for each record is mapped to one single Hive column. If this property is set to "avro" or "avro_confluent", then oracle.kafka.table.key.schema is required and the key is deserialized using Avro. The difference between "avro" and "avro_confluent" is that the wire format of the serialization is slightly different: for "avro", the entire byte array of the key consists of the bytes of the Avro serialization; for "avro_confluent", the byte array consists of a magic byte, a version number, and then the bytes of the Avro serialization of the key. |
| oracle.kafka.table.value.type | Optional | The value type of your record. If unset, the value part of the Kafka record is ignored in the Hive row. Use of this property is similar to that of oracle.kafka.table.key.type. |
| oracle.kafka.table.key.writer.schema | Optional | An optional writer schema for the Kafka key's Avro serialization. It is required when the reader schema for the key is different from the schema in which the keys were written to the Kafka brokers. It must be the exact schema in which the Kafka keys were serialized. |
| oracle.kafka.table.key.schema | Required when "oracle.kafka.table.key.type" is "avro" or "avro_confluent" | The JSON string of the Kafka key's Avro reader schema. It does not need to be exactly the same as the Kafka key's writer Avro schema; as long as the reader schema is compatible with the Kafka key or the converted object from the converter, it is valid. This enables you to rename Hive columns and to choose which fields to keep from the Kafka key in the Hive row. If the schema in this property is different from the schema in which the Kafka keys were serialized, then oracle.kafka.table.key.writer.schema is required. |
| oracle.kafka.table.value.writer.schema | Optional | An optional writer schema for the Kafka value's Avro serialization. Its use is similar to that of oracle.kafka.table.key.writer.schema. |
| oracle.kafka.table.value.schema | Required when "oracle.kafka.table.value.type" is "avro" or "avro_confluent" | The JSON string of the Kafka value's Avro reader schema. Its use is similar to that of oracle.kafka.table.key.schema. |
| oracle.kafka.table.extra.columns | Optional, defaults to "true" | A Boolean flag that controls whether to include the extra Kafka columns (partitionid, offset, timestamptype) in the Hive row. |
| oracle.kafka.chop.partition | Optional, defaults to "false" | A Boolean flag that controls whether to chop Kafka partitions into smaller chunks. This is useful when the number of Kafka partitions is small and the size of each Kafka partition is large. |
| oracle.kafka.partition.chunk.size | Optional | When oracle.kafka.chop.partition is true, this property controls the number of Kafka records in each partition chunk. It should be set to a value estimated by (ideal size of a split)/(average size of a Kafka record). For example, if the ideal size of a split is 256 MB and the average size of a Kafka record is 256 bytes, then this property should be set to 1000000. |
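Putting the chunking properties together, a DDL using them might look like the following sketch; the broker address and topic names reuse the earlier example, and the chunk size follows the 256 MB / 256-byte calculation above:

```sql
-- Sketch: same table as the earlier example, with partition chopping
-- enabled so each large Kafka partition is split into smaller chunks.
CREATE EXTERNAL TABLE test_table_chopped
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties('oracle.kafka.table.key.type'='string',
              'oracle.kafka.table.value.type'='long',
              'oracle.kafka.bootstrap.servers'='nshgc0602:9092',
              'oracle.kafka.table.topics'='topic1,topic2',
              'oracle.kafka.chop.partition'='true',
              'oracle.kafka.partition.chunk.size'='1000000');
```

With these settings, each Kafka partition is read as chunks of roughly one million records, which lets more splits (and therefore more tasks) work on a small number of large partitions in parallel.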