A Using Oracle's Hive Storage Handler for Kafka to Create a Hive External Table for Kafka Topics

The Hive storage handler for Kafka enables Hive (as well as Oracle Big Data SQL) to query Kafka topics.

To provide access to Kafka data, you create a Hive external table over the Kafka topics. The Oracle Big Data SQL storage handler that enables Hive to read the Kafka data format is oracle.hadoop.kafka.hive.KafkaStorageHandler.

You can use this storage handler to create external Hive tables backed by data residing in Kafka. Big Data SQL can then query the Kafka data through the external Hive tables.  

The Hive DDL is demonstrated by the following example, where topic1 and topic2 are two topics in the Kafka broker whose keys are serialized by Kafka's String serializer and whose values are serialized by Kafka's Long serializer.

CREATE EXTERNAL TABLE test_table
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties('oracle.kafka.table.key.type'='string',
              'oracle.kafka.table.value.type'='long',
              'oracle.kafka.bootstrap.servers'='nshgc0602:9092',
              'oracle.kafka.table.topics'='topic1,topic2');

The example below shows the resulting Hive table. The Kafka key, value, offset, topic name, and partitionid are mapped to Hive columns. You can explicitly designate the offset for each topic/partition pair through a WHERE clause in your Hive query.

hive> describe test_table;
OK
topic            string          from deserializer
partitionid      int             from deserializer
key              string          from deserializer
value            bigint          from deserializer
offset           bigint          from deserializer
timestamptype    smallint        from deserializer
timestamp        timestamp       from deserializer
Time taken: 0.084 seconds, Fetched: 7 row(s)
The content of the table is a snapshot of the Kafka topics at the time the Hive query is executed. When new data is inserted into the Kafka topics, you can use the offset column or the timestamp column to track the changes to the topics. The offsets are per topic/partition. For example, the following query returns the messages after the offsets specified in the WHERE clause for each topic/partition:
hive> select * from test_table where (topic="topic1" and partitionid=0 and offset > 199) or (topic="topic1" and partitionid=1 and offset > 198) or (topic="topic2" and partitionid=0 and offset > 177) or (topic="topic2" and partitionid=1 and offset > 176);
You need to keep track of the offsets for all topic/partition pairs. For example, you can use an Oracle table to store these offsets. A more convenient way to track new data is to use the timestamp column. You can query data after a specific point in time with the following query:
hive> select * from test_table where timestamp > '2017-07-12 11:30:00'; 
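If you do track offsets yourself, one way to capture the latest offset for every topic/partition pair is a simple aggregation over the same external table; you could then persist the result, for example, in an Oracle table. This is a minimal sketch against the test_table defined above; note that it reads through the topics in order to compute the offsets:
hive> select topic, partitionid, max(offset) as last_offset from test_table group by topic, partitionid;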

See the Property Reference section below for descriptions of all table properties.

Property Reference

Table A-1 Table Properties of Hive Storage Handler for Kafka

Property Name Requirement Description

oracle.kafka.table.topics

Required

A comma-separated list of Kafka topics. Each Kafka topic name must consist of only letters (uppercase and lowercase), numbers, . (dot), _ (underscore), and - (minus). The maximum length of each topic name is 249 characters. These topics must have the same serialization mechanisms. The resulting Hive table consists of records from all the topics listed here. A Hive column “topic” will be added and set to the topic name for each record.

oracle.kafka.bootstrap.servers

Required

This property will be translated to the “bootstrap.servers” property for the underlying Kafka consumer. The consumer makes use of all servers, irrespective of which servers are specified here for bootstrapping. This list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers. For availability reasons, you may want to list more than one server.

oracle.kafka.table.key.type

Optional

The key type for your record. If unset, then the key part of the Kafka record will be ignored in the Hive row. Only the values “string”, “integer”, “long”, “double”, “avro”, and “avro_confluent” are supported. “string”, “integer”, “double” and “long” correspond to the built-in primitive serialization types supported by Kafka. If this property is one of these primitive types, then the Kafka key for each record will be mapped to a single Hive column. If this property is set to “avro” or “avro_confluent”, then oracle.kafka.table.key.schema is required. The Kafka key for each record will be deserialized into an Avro object. If the Avro schema is of record type, then each first-level field of the record will be mapped to a single Hive column. If the Avro schema is not of record type, then it will be mapped to a single Hive column named “key”. An example DDL that uses Avro types is shown after this table.

The difference between “avro” and “avro_confluent” is that the wire format for the serialization is slightly different. For “avro”, the entire byte array of the key consists of the bytes of the Avro serialization. For “avro_confluent”, the byte array consists of a magic byte, a version number, and then the bytes of the Avro serialization of the key.

oracle.kafka.table.value.type

Optional

The value type of your record. If unset, then the value part of the Kafka record will be ignored in the Hive row. Use of this property is similar to use of oracle.kafka.table.key.type. The difference is that when the Avro schema for the Kafka value is not of record type, the whole Avro object will be mapped to a single Hive column named “value” instead of “key”.

oracle.kafka.table.key.writer.schema

Optional

An optional writer schema for the Kafka key’s Avro serialization. It’s required when the reader schema for the key is different from the schema in which the keys are written to Kafka brokers. It must be the exact schema in which Kafka keys are serialized.

oracle.kafka.table.key.schema

Required when “oracle.kafka.table.key.type” is “avro” or “avro_confluent”

The JSON string for the Kafka key's Avro reader schema. It doesn't need to be exactly the same as the Kafka key's writer Avro schema. As long as the reader schema is compatible with the Kafka key or the converted object from the converter, it is valid. This enables you to rename Hive columns and choose what fields to keep from the Kafka key in the Hive row. If the schema in this property is different from the schema in which the Kafka keys are serialized, then oracle.kafka.table.key.writer.schema is required.

oracle.kafka.table.value.writer.schema

Optional

An optional writer schema for the Kafka value’s Avro serialization. Its use is similar to oracle.kafka.table.key.writer.schema.

oracle.kafka.table.value.schema

Required when “oracle.kafka.table.value.type” is “avro” or “avro_confluent”

The JSON string for the Kafka value's Avro reader schema. Its use is similar to oracle.kafka.table.key.schema.

oracle.kafka.table.extra.columns

Optional; defaults to “true”

A boolean flag to control whether to include the extra Kafka columns: partitionid, offset, and timestamptype.

oracle.kafka.chop.partition

Optional; defaults to false

A boolean flag to control whether to chop Kafka partitions into smaller chunks. This is useful when the number of Kafka partitions is small and the size of each Kafka partition is large.

oracle.kafka.partition.chunk.size

Optional

When oracle.kafka.chop.partition is true, this property controls the number of Kafka records in each partition chunk. It should be set to a value estimated by (ideal size of a split) / (average size of a Kafka record). For example, if the ideal size of a split is 256 MB and the average size of a Kafka record is 256 bytes, then this property should be set to 1000000.
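
To illustrate how several of these properties fit together, the following is a minimal sketch of a DDL for a topic whose values are Avro-serialized records, with large partitions chopped into chunks. The broker addresses, topic name, and Avro schema here are hypothetical placeholders; substitute your own.

CREATE EXTERNAL TABLE avro_topic_table
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties('oracle.kafka.table.key.type'='string',
              'oracle.kafka.table.value.type'='avro',
              'oracle.kafka.table.value.schema'='{"type":"record","name":"SensorReading","fields":[{"name":"sensor_id","type":"string"},{"name":"reading","type":"double"}]}',
              'oracle.kafka.bootstrap.servers'='broker1:9092,broker2:9092',
              'oracle.kafka.table.topics'='sensor_topic',
              'oracle.kafka.chop.partition'='true',
              'oracle.kafka.partition.chunk.size'='1000000');

Because the value schema in this sketch is of record type, each first-level field (sensor_id and reading) would map to its own Hive column, alongside the key column and the extra Kafka columns described above.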