public class KafkaConsumer<K,V> extends java.lang.Object implements Consumer<K,V>
All consumer instances sharing the same group.id will be part of the same consumer group.
A consumer group can contain multiple consumer instances. Each instance internally holds a single connection and session to TEQ. In the 20c release, or version 0.8 of the OKafka.jar file, each consumer instance/session is assigned a single partition (or a single stream) of the subscribed topic. The client application has to start as many consumer instances as there are partitions of the topic. If a consumer instance goes down, messages from its assigned partition are not dequeued until a new or the same consumer instance comes up. However, the other instances continue consuming from their assigned partitions whether any consumer instance goes up or down, i.e. there is no consumer rebalancing in any situation. If an Oracle DB instance goes down, the consumer instance tries connecting to the same Oracle DB instance and consumes from the newly assigned partition.
A consumer can subscribe to a single topic using subscribe. The consumer throws an exception if the topic subscription collection size is greater than one. Also, consumers can't subscribe using subscribe(Pattern), subscribe(Pattern, ConsumerRebalanceListener), subscribe(Collection, ConsumerRebalanceListener).
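The single-topic restriction can be checked on the application side before calling subscribe. The following is a minimal sketch using only the JDK; the class SingleTopicCheck and its method are hypothetical helpers, not part of okafka, and rejecting an empty collection is a choice made by this sketch rather than documented behavior.

```java
import java.util.Collection;

/** Hypothetical pre-check mirroring the documented single-topic restriction. */
final class SingleTopicCheck {

    /** Returns the only topic in the collection, or throws IllegalArgumentException. */
    static String requireSingleTopic(Collection<String> topics) {
        if (topics == null) {
            throw new IllegalArgumentException("topics must not be null");
        }
        if (topics.size() != 1) {
            // Assumption of this sketch: an empty collection is rejected as well.
            throw new IllegalArgumentException(
                    "exactly one topic is expected, got " + topics.size());
        }
        String topic = topics.iterator().next();
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be null or empty");
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(requireSingleTopic(java.util.Arrays.asList("foo")));
    }
}
```

An application could call requireSingleTopic(topics) and then pass the same collection to subscribe, so that a configuration mistake surfaces before any connection is made.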
The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically, or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync). The examples below show how to use periodic or manual offset commit.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    // okafka 0.8 client packages (assumed from the deserializer package used below)
    import org.oracle.okafka.clients.consumer.ConsumerRecord;
    import org.oracle.okafka.clients.consumer.ConsumerRecords;
    import org.oracle.okafka.clients.consumer.KafkaConsumer;

    String topic = "foo"; // example topic name
    Properties props = new Properties();
    props.put("oracle.service.name", "serviceid.regress.rdbms.dev.us.oracle.com");
    props.put("oracle.instance.name", "instancename");
    props.put("oracle.user.name", "username");
    props.put("oracle.password", "pwd");
    props.put("bootstrap.servers", "IP:PORT");
    props.put("group.id", "groupid");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "10000");
    props.put("key.deserializer", "org.oracle.okafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.oracle.okafka.common.serialization.StringDeserializer");
    props.put("max.poll.records", 100);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(topic));
    try {
        // Fetch up to max.poll.records messages, waiting at most one second.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic = " + record.topic() + ", partition = " + record.partition()
                    + ", key = " + record.key() + ", value = " + record.value());
            System.out.println(".......");
        }
        // Commit the messages consumed by this poll.
        consumer.commitSync();
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        // Always close the consumer to release its connection.
        consumer.close();
    }

The connection to the cluster is bootstrapped by specifying one broker to contact using the configuration bootstrap.servers.
Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms.
In this example the consumer is subscribing to the topic foo as part of a group of consumers called groupid as configured with group.id.
The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings.
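As a minimal sketch, the settings described above can be collected in one place. The class ConsumerProps and its build method are hypothetical; the placeholder values are the ones used in the example, and treating enable.auto.commit=false as "no periodic commit" is an assumption based on standard Kafka consumer semantics, not something this page states.

```java
import java.util.Properties;

/** Hypothetical helper that assembles the consumer settings discussed above. */
final class ConsumerProps {

    static Properties build(String groupId, boolean autoCommit, long autoCommitIntervalMs) {
        Properties props = new Properties();
        // Connection placeholders copied from the example; replace with real values.
        props.put("oracle.service.name", "serviceid.regress.rdbms.dev.us.oracle.com");
        props.put("oracle.instance.name", "instancename");
        props.put("oracle.user.name", "username");
        props.put("oracle.password", "pwd");
        props.put("bootstrap.servers", "IP:PORT");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // Only meaningful when periodic commits are enabled.
            props.put("auto.commit.interval.ms", Long.toString(autoCommitIntervalMs));
        }
        // String deserializers: record keys and values are plain strings.
        props.put("key.deserializer",
                "org.oracle.okafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.oracle.okafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        build("groupid", true, 10000L).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned Properties object is what would be passed to the KafkaConsumer(java.util.Properties) constructor.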
Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages is coupled with some processing logic, so a message should not be considered consumed until its processing is complete.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    // okafka 0.8 client packages (assumed from the deserializer package used below)
    import org.oracle.okafka.clients.consumer.ConsumerRecord;
    import org.oracle.okafka.clients.consumer.ConsumerRecords;
    import org.oracle.okafka.clients.consumer.KafkaConsumer;

    String topic = "foo"; // example topic name
    Properties props = new Properties();
    props.put("oracle.service.name", "serviceid.regress.rdbms.dev.us.oracle.com");
    props.put("oracle.instance.name", "instancename");
    props.put("oracle.user.name", "username");
    props.put("oracle.password", "pwd");
    props.put("bootstrap.servers", "IP:PORT");
    props.put("group.id", "groupid");
    // Offsets are committed manually below, so periodic auto-commit is switched off.
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.oracle.okafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.oracle.okafka.common.serialization.StringDeserializer");
    props.put("max.poll.records", 100);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(topic));
    try {
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            // Flush only once enough records have accumulated.
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer); // application-defined method, not part of okafka
                consumer.commitSync();
                buffer.clear();
            }
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        consumer.close();
    }

In this example we will consume a batch of records and batch them up in memory. When we have enough records batched, we will insert them into a database. If our process fails before commitSync(), then all messages consumed after the previous commit are rolled back and considered not consumed. If the process is restarted, it starts consuming from the message after the previously committed offset (msgid).
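The buffer-then-commit logic of this example can be isolated from the consumer and exercised on its own. The sketch below uses only the JDK; BatchBuffer is a hypothetical class, the sink stands in for the database insert, and the commit callback stands in for a call such as commitSync().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers records and flushes them once a minimum batch size is reached. */
final class BatchBuffer<T> {
    private final int minBatchSize;
    private final Consumer<List<T>> sink; // stand-in for the database insert
    private final Runnable commit;        // stand-in for the consumer commit call
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int minBatchSize, Consumer<List<T>> sink, Runnable commit) {
        this.minBatchSize = minBatchSize;
        this.sink = sink;
        this.commit = commit;
    }

    /** Adds one polled batch; returns true if the buffer was flushed and committed. */
    boolean addAll(Iterable<T> polled) {
        for (T record : polled) {
            buffer.add(record);
        }
        if (buffer.size() < minBatchSize) {
            return false; // not enough records yet, nothing is committed
        }
        sink.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
        commit.run();                         // commit only after the sink succeeded
        buffer.clear();
        return true;
    }

    /** Number of records buffered but not yet flushed. */
    int pending() {
        return buffer.size();
    }
}
```

Because the commit runs only after the sink returns normally, an exception thrown by the sink leaves the batch uncommitted, which matches the rollback behavior described above.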
The okafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Unsynchronized access will result in ConcurrentModificationException.
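One way to meet this requirement is to confine the consumer to a single owner thread and route every call through it. The sketch below is a generic JDK-only illustration; SingleThreadConfined is a hypothetical class, and the resource it wraps would be the consumer instance in a real application.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical wrapper: every call on the wrapped resource runs on one dedicated thread. */
final class SingleThreadConfined<R> implements AutoCloseable {
    private final R resource;
    private final ExecutorService owner = Executors.newSingleThreadExecutor();

    SingleThreadConfined(R resource) {
        this.resource = resource;
    }

    /** Runs the action on the owner thread and waits for its result. */
    <T> T call(Function<R, T> action) {
        Callable<T> task = () -> action.apply(resource);
        try {
            return owner.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the owner thread", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }

    /** Stops the owner thread; the wrapped resource must be closed by the caller. */
    @Override
    public void close() {
        owner.shutdown();
    }
}
```

Callers on any thread can then use call(...) without additional locking, since the wrapped object is only ever touched by the owner thread.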
| Constructor | Description |
|---|---|
| KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) | A consumer is instantiated by providing a set of key-value pairs as configuration. |
| KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer. |
| KafkaConsumer(java.util.Properties properties) | A consumer is instantiated by providing a Properties object as configuration. |
| KafkaConsumer(java.util.Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer. |
| Modifier and Type | Method | Description |
|---|---|---|
| void | assign(java.util.Collection<TopicPartition> partitions) | This method is not yet supported. |
| java.util.Set<TopicPartition> | assignment() | This method is not yet supported. |
| java.util.Map<TopicPartition,java.lang.Long> | beginningOffsets(java.util.Collection<TopicPartition> partitions) | This method is not yet supported. |
| java.util.Map<TopicPartition,java.lang.Long> | beginningOffsets(java.util.Collection<TopicPartition> partitions, java.time.Duration timeout) | This method is not yet supported. |
| void | close() | Tries to close the consumer cleanly. |
| void | close(java.time.Duration timeout) | Tries to close the consumer cleanly. |
| void | close(long timeout, java.util.concurrent.TimeUnit timeUnit) | Deprecated. |
| void | commitAsync() | Commits messages consumed using poll(Duration) for the subscribed topic and assigned partition in this session since last commit. |
| void | commitAsync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback) | This method is not yet supported. |
| void | commitAsync(OffsetCommitCallback callback) | Commits messages (offsets) consumed using poll() since last commit in this session. |
| void | commitSync() | Commits messages (offsets) consumed using poll() since last commit in this session. |
| void | commitSync(java.time.Duration timeout) | Commits messages (offsets) consumed using poll() since last commit in this session. |
| void | commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets) | This method is not yet supported. |
| void | commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.time.Duration timeout) | This method is not yet supported. |
| OffsetAndMetadata | committed(TopicPartition partition) | This method is not yet supported. |
| OffsetAndMetadata | committed(TopicPartition partition, java.time.Duration timeout) | This method is not yet supported. |
| java.util.Map<TopicPartition,java.lang.Long> | endOffsets(java.util.Collection<TopicPartition> partitions) | This method is not yet supported. |
| java.util.Map<TopicPartition,java.lang.Long> | endOffsets(java.util.Collection<TopicPartition> partitions, java.time.Duration timeout) | This method is not yet supported. |
| java.util.Map<java.lang.String,java.util.List<PartitionInfo>> | listTopics() | This method is not yet supported. |
| java.util.Map<java.lang.String,java.util.List<PartitionInfo>> | listTopics(java.time.Duration timeout) | This method is not yet supported. |
| java.util.Map<MetricName,? extends Metric> | metrics() | This method is not yet supported. |
| java.util.Map<TopicPartition,OffsetAndTimestamp> | offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch) | This method is not yet supported. |
| java.util.Map<TopicPartition,OffsetAndTimestamp> | offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch, java.time.Duration timeout) | This method is not yet supported. |
| java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic) | This method is not yet supported. |
| java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic, java.time.Duration timeout) | This method is not yet supported. |
| void | pause(java.util.Collection<TopicPartition> partitions) | This method is not yet supported. |
| java.util.Set<TopicPartition> | paused() | This method is not yet supported. |
| ConsumerRecords<K,V> | poll(java.time.Duration timeout) | Fetch data for the topic specified using subscribe(Collection) APIs. |
| ConsumerRecords<K,V> | poll(long timeout) | Deprecated. |
| long | position(TopicPartition partition) | This method is not yet supported. |
| long | position(TopicPartition partition, java.time.Duration timeout) | This method is not yet supported. |
| void | resume(java.util.Collection<TopicPartition> partitions) | This method is not yet supported. |
| void | seek(TopicPartition partition, long offset) | Overrides the fetch offset that the consumer will use on the next poll(timeout). |
| void | seekToBeginning(java.util.Collection<TopicPartition> partitions) | Seek to the first available offset for each of the given partitions. |
| void | seekToEnd(java.util.Collection<TopicPartition> partitions) | Seek to the last offset for each of the given partitions. |
| void | subscribe(java.util.Collection<java.lang.String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions. |
| void | subscribe(java.util.Collection<java.lang.String> topics, ConsumerRebalanceListener listener) | Subscribe to the given list of topics to get dynamically assigned partitions. |
| void | subscribe(java.util.regex.Pattern pattern) | This method is not yet supported. |
| void | subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) | This method is not yet supported. |
| java.util.Set<java.lang.String> | subscription() | This method is not yet supported. |
| void | unsubscribe() | Unsubscribe from topic currently subscribed with subscribe(Collection). |
| void | wakeup() | This method is not yet supported. |
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.
Parameters:
configs - The consumer configs

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.
Parameters:
configs - The consumer configs
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.

public KafkaConsumer(java.util.Properties properties)
A consumer is instantiated by providing a Properties object as configuration. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.
Parameters:
properties - The consumer configuration properties

public KafkaConsumer(java.util.Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.
Parameters:
properties - The consumer configuration properties
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.

public java.util.Set<TopicPartition> assignment()
This method is not yet supported.
Specified by: assignment in interface Consumer<K,V>
See Also: assignment()

public java.util.Set<java.lang.String> subscription()
This method is not yet supported.
Specified by: subscription in interface Consumer<K,V>
See Also: subscription()
public void subscribe(java.util.Collection<java.lang.String> topics, ConsumerRebalanceListener listener)
Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe(). This call has effect only when poll is invoked.
okafka 0.8 doesn't support the consumer group rebalance listener, i.e. ConsumerRebalanceListener.
Specified by: subscribe in interface Consumer<K,V>
Parameters:
topics - The list of topics to subscribe to
listener - null; if not null, it is ignored
Throws:
java.lang.IllegalArgumentException - If topics is null, contains null or empty elements, or the size of topics is greater than one.
See Also: subscribe(Collection, ConsumerRebalanceListener)

public void subscribe(java.util.Collection<java.lang.String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe(). This call has effect only when poll is invoked.
This is a short-hand for subscribe(Collection, ConsumerRebalanceListener), which uses a no-op listener. okafka 0.8 doesn't support the consumer group rebalance listener, i.e. ConsumerRebalanceListener.
Specified by: subscribe in interface Consumer<K,V>
Parameters:
topics - The list of topics to subscribe to
Throws:
java.lang.IllegalArgumentException - If topics is null, contains null or empty elements, or the size of topics is greater than one.
See Also: subscribe(Collection)
public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
This method is not yet supported.
Specified by: subscribe in interface Consumer<K,V>
See Also: subscribe(Pattern, ConsumerRebalanceListener)

public void subscribe(java.util.regex.Pattern pattern)
This method is not yet supported.
Specified by: subscribe in interface Consumer<K,V>
See Also: subscribe(Pattern)

public void unsubscribe()
Unsubscribe from topic currently subscribed with subscribe(Collection).
Specified by: unsubscribe in interface Consumer<K,V>
See Also: unsubscribe()

public void assign(java.util.Collection<TopicPartition> partitions)
This method is not yet supported.
Specified by: assign in interface Consumer<K,V>
See Also: assign(Collection)
@Deprecated public ConsumerRecords<K,V> poll(long timeout)
Deprecated.
Fetch data for the topic specified using subscribe(Collection) APIs. It is an error to not have subscribed to any topic before polling for data. The consumer maintains a single connection/session to any one of the Oracle database instances. Each consumer (session) in a group is assigned a single unique partition of the subscribed topic. Hence, poll fetches data from its assigned partition as long as the connection/session exists. If the existing connection is lost and the consumer connects to any instance of the database, the consumer (session) might be assigned a new partition of the subscribed topic.
On each poll, the consumer tries to fetch from the last consumed message id (offset). If the consumer goes down without committing, all consumed messages are rolled back, and the next consumer instance of the same group that gets this partition starts consuming from the last committed msgid or from the rolled-back point. As of okafka 0.8, there is no group rebalancing since each instance sticks with its partition.
Specified by: poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll.
Throws:
KafkaException - for any other unrecoverable errors (e.g. errors deserializing key/value pairs, or any new error cases in future versions)
java.lang.IllegalArgumentException - if the timeout value is negative
java.lang.IllegalStateException - if the consumer is not subscribed to any topic.
See Also: poll(long)

public ConsumerRecords<K,V> poll(java.time.Duration timeout)
Fetch data for the topic specified using subscribe(Collection) APIs. It is an error to not have subscribed to any topic before polling for data. Each consumer (session) in a group is assigned a single unique partition of the subscribed topic. Hence, poll fetches data from its assigned partition as long as the connection/session exists. If the existing connection is lost and the consumer connects to any instance of the database, the consumer (session) might be assigned a new partition of the subscribed topic.
On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially.
Specified by: poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll.
Throws:
KafkaException - for any other unrecoverable errors (e.g. errors deserializing key/value pairs)
java.lang.IllegalArgumentException - if the timeout value is negative
java.lang.IllegalStateException - if the consumer is not subscribed to any topic.
java.lang.ArithmeticException - if the timeout is greater than Long.MAX_VALUE milliseconds.
See Also: poll(Duration)
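The timeout-related exceptions listed for poll(Duration) can be illustrated with the JDK alone. The sketch below shows that java.time.Duration.toMillis() itself throws ArithmeticException when the value does not fit in a long number of milliseconds. PollTimeoutCheck is a hypothetical class; whether the consumer performs the conversion exactly this way is an assumption.

```java
import java.time.Duration;

/** Hypothetical pre-check for a poll timeout, mirroring the documented exceptions. */
final class PollTimeoutCheck {

    /** Returns the timeout in milliseconds, rejecting negative durations. */
    static long toPollMillis(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        // Duration.toMillis() throws ArithmeticException on numeric overflow.
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toPollMillis(Duration.ofSeconds(1)));
        try {
            toPollMillis(Duration.ofSeconds(Long.MAX_VALUE));
        } catch (ArithmeticException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```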
public void commitSync()
Commits messages (offsets) consumed using poll() since last commit in this session. A commit on the session either succeeds or is rolled back. Commit fails only in rare cases like shutdown. Commit failure results in rollback. If rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after a failure, it starts consuming from this position.
This is a synchronous commit and will block until either the commit succeeds or rollback happens. Commit does not take any timeout into account for completion of the call.
Specified by: commitSync in interface Consumer<K,V>
Throws:
KafkaException - for any other unrecoverable errors (e.g. topic doesn't exist, session rolled back as db shutdown).
See Also: commitSync()

public void commitSync(java.time.Duration timeout)
Commits messages (offsets) consumed using poll() since last commit in this session. A commit on the session either succeeds or is rolled back. Commit fails only in rare cases like shutdown. Commit failure results in rollback. If rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after a failure, it starts consuming from this position.
This is a synchronous commit and will block until either the commit succeeds or rollback happens. Commit does not take any timeout into account for completion of the call. This call is equivalent to commitSync().
Specified by: commitSync in interface Consumer<K,V>
Throws:
KafkaException - for any other unrecoverable errors.
See Also: commitSync(Duration)

public void commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)
This method is not yet supported.
Specified by: commitSync in interface Consumer<K,V>
See Also: commitSync(Map)

public void commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.time.Duration timeout)
This method is not yet supported.
Specified by: commitSync in interface Consumer<K,V>
See Also: commitSync(Map, Duration)
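The commit-or-rollback behavior described for commitSync can be pictured with a toy in-memory model. This is only an illustration under simplifying assumptions, not the okafka implementation: SessionModel is a hypothetical class, messages live in a list, and a rollback simply resets the read position to the last committed one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a session whose consumed messages are either committed or rolled back. */
final class SessionModel {
    private final List<String> messages;
    private int committed = 0; // index of the first message not yet committed
    private int position = 0;  // index of the next message handed out by poll

    SessionModel(List<String> messages) {
        this.messages = new ArrayList<>(messages);
    }

    /** Hands out up to max messages starting at the current position. */
    List<String> poll(int max) {
        int end = Math.min(messages.size(), position + max);
        List<String> out = new ArrayList<>(messages.subList(position, end));
        position = end;
        return out;
    }

    /** Marks everything consumed so far as committed. */
    void commit() {
        committed = position;
    }

    /** Treats everything consumed since the last commit as not consumed. */
    void rollback() {
        position = committed;
    }
}
```

In this model, polling two messages and then rolling back makes the next poll return the first message again, while polling after a commit continues with the message following the committed one.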
public void commitAsync()
Commits messages consumed using poll(Duration) for the subscribed topic and assigned partition in this session since last commit. This call is equivalent to commitAsync(OffsetCommitCallback) with a null callback.
Specified by: commitAsync in interface Consumer<K,V>
See Also: commitAsync()

public void commitAsync(OffsetCommitCallback callback)
Commits messages (offsets) consumed using poll() since last commit in this session. A commit on the session either succeeds or is rolled back. Commit fails only in rare cases like shutdown. Commit failure results in rollback. If rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after a failure, it starts consuming from this rollback position.
Internally this is a synchronous call and blocks until the commit either succeeds or is rolled back. Any errors encountered are either passed to the callback (if provided) or discarded.
Specified by: commitAsync in interface Consumer<K,V>
Parameters:
callback - Callback to invoke when the commit completes
See Also: commitAsync(OffsetCommitCallback)

public void commitAsync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
This method is not yet supported.
Specified by: commitAsync in interface Consumer<K,V>
See Also: commitAsync(Map, OffsetCommitCallback)
public void seek(TopicPartition partition, long offset)
Overrides the fetch offset that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Seeking to an already consumed offset/message, in the current or previous sessions, doesn't reconsume the message.
Specified by: seek in interface Consumer<K,V>
Throws:
java.lang.IllegalArgumentException - if the provided offset is negative
See Also: seek(TopicPartition, long)

public void seekToBeginning(java.util.Collection<TopicPartition> partitions)
Seek to the first available offset for each of the given partitions. This function evaluates lazily, seeking only when poll(Duration) is called. Seeking to an already consumed offset/message, in the current or previous sessions, doesn't reconsume the message.
Specified by: seekToBeginning in interface Consumer<K,V>
Throws:
java.lang.IllegalArgumentException - if partitions is null
See Also: seekToBeginning(Collection)

public void seekToEnd(java.util.Collection<TopicPartition> partitions)
Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking only when poll(Duration) is called. Seeking to an already consumed offset/message, in the current or previous sessions, doesn't reconsume the message.
Specified by: seekToEnd in interface Consumer<K,V>
Throws:
java.lang.IllegalArgumentException - if partitions is null
See Also: seekToEnd(Collection)
public long position(TopicPartition partition)
This method is not yet supported.
Specified by: position in interface Consumer<K,V>
See Also: position(TopicPartition)

public long position(TopicPartition partition, java.time.Duration timeout)
This method is not yet supported.
Specified by: position in interface Consumer<K,V>
See Also: position(TopicPartition, Duration)

public OffsetAndMetadata committed(TopicPartition partition)
This method is not yet supported.
Specified by: committed in interface Consumer<K,V>
See Also: committed(TopicPartition)

public OffsetAndMetadata committed(TopicPartition partition, java.time.Duration timeout)
This method is not yet supported.
Specified by: committed in interface Consumer<K,V>
See Also: committed(TopicPartition, Duration)

public java.util.Map<MetricName,? extends Metric> metrics()
This method is not yet supported.

public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
This method is not yet supported.
Specified by: partitionsFor in interface Consumer<K,V>
See Also: partitionsFor(String)

public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic, java.time.Duration timeout)
This method is not yet supported.
Specified by: partitionsFor in interface Consumer<K,V>
See Also: partitionsFor(String, Duration)

public java.util.Map<java.lang.String,java.util.List<PartitionInfo>> listTopics()
This method is not yet supported.
Specified by: listTopics in interface Consumer<K,V>
See Also: listTopics()

public java.util.Map<java.lang.String,java.util.List<PartitionInfo>> listTopics(java.time.Duration timeout)
This method is not yet supported.
Specified by: listTopics in interface Consumer<K,V>
See Also: listTopics(Duration)

public void pause(java.util.Collection<TopicPartition> partitions)
This method is not yet supported.
Specified by: pause in interface Consumer<K,V>
See Also: pause(Collection)

public void resume(java.util.Collection<TopicPartition> partitions)
This method is not yet supported.
Specified by: resume in interface Consumer<K,V>
See Also: resume(Collection)

public java.util.Set<TopicPartition> paused()
This method is not yet supported.

public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
This method is not yet supported.
Specified by: offsetsForTimes in interface Consumer<K,V>
See Also: offsetsForTimes(Map)

public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch, java.time.Duration timeout)
This method is not yet supported.
Specified by: offsetsForTimes in interface Consumer<K,V>
See Also: offsetsForTimes(Map, Duration)

public java.util.Map<TopicPartition,java.lang.Long> beginningOffsets(java.util.Collection<TopicPartition> partitions)
This method is not yet supported.
Specified by: beginningOffsets in interface Consumer<K,V>
See Also: beginningOffsets(Collection)

public java.util.Map<TopicPartition,java.lang.Long> beginningOffsets(java.util.Collection<TopicPartition> partitions, java.time.Duration timeout)
This method is not yet supported.
Specified by: beginningOffsets in interface Consumer<K,V>
See Also: beginningOffsets(Collection, Duration)

public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)
This method is not yet supported.
Specified by: endOffsets in interface Consumer<K,V>
See Also: endOffsets(Collection)

public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions, java.time.Duration timeout)
This method is not yet supported.
Specified by: endOffsets in interface Consumer<K,V>
See Also: endOffsets(Collection, Duration)
public void close()
Tries to close the consumer cleanly.

@Deprecated public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
Deprecated.
Specified by: close in interface Consumer<K,V>
Parameters:
timeout - Not used
timeUnit - Not used
Throws:
java.lang.IllegalArgumentException - If the timeout is negative.
KafkaException - for any other error during close
See Also: close(long, TimeUnit)

public void close(java.time.Duration timeout)
Tries to close the consumer cleanly.
Specified by: close in interface Consumer<K,V>
Parameters:
timeout - Not used
Throws:
java.lang.IllegalArgumentException - If the timeout is negative.
KafkaException - for any other error during close
See Also: close(Duration)