Class KafkaConsumer<K,V>

java.lang.Object
org.oracle.okafka.clients.consumer.KafkaConsumer<K,V>
All Implemented Interfaces:
Closeable, AutoCloseable, org.apache.kafka.clients.consumer.Consumer<K,V>

public class KafkaConsumer<K,V> extends Object implements org.apache.kafka.clients.consumer.Consumer<K,V>
A Java client that consumes records from a Transactional Event Queue (TxEventQ). The consumer maintains a single JDBC connection to any one of the available instances of the Oracle database to fetch consumer records. Failure to close the consumer after use will leak this connection. The client transparently handles the failure of Oracle database instances, and transparently adapts as the topic partitions it fetches migrate within the Oracle database cluster. This client also allows groups of consumers to load balance consumption using consumer groups.

Consumer Groups and Topic Subscriptions

TxEventQ supports Apache Kafka's concept of consumer groups, which allow a pool of processes to divide the work of consuming and processing records, thus facilitating load balancing. These processes can either run on the same machine or be distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances sharing the same group.id will be part of the same consumer group.

A consumer can subscribe to a single topic using subscribe(Collection) or subscribe(Collection, ConsumerRebalanceListener). The consumer throws FeatureNotSupportedException if the topic subscription collection contains more than one topic. Consumers also cannot subscribe using subscribe(Pattern) or subscribe(Pattern, ConsumerRebalanceListener), as pattern-based subscription is not supported in this release. Note: the topic name must be passed in upper case.

Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group. In addition, when group reassignment happens automatically, consumers can be notified through a ConsumerRebalanceListener, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, etc. For this release of OKafka, manual assignment of partitions is not supported. The application will get FeatureNotSupportedException if the assign(Collection) method is invoked.
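The single-topic and upper-case rules above can be checked in application code before subscribing. The following is a minimal sketch; `TopicNames` and `singleUpperCaseTopic` are hypothetical names that are not part of OKafka, and the sketch only mirrors the constraints stated in this section rather than the client's own validation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical application-side helper; not part of the OKafka API.
final class TopicNames {
    private TopicNames() {}

    // Returns a one-element list holding the upper-cased topic name,
    // or throws if the collection does not contain exactly one usable name.
    static List<String> singleUpperCaseTopic(Collection<String> topics) {
        if (topics == null || topics.size() != 1) {
            throw new IllegalArgumentException("exactly one topic is required, got: " + topics);
        }
        String topic = topics.iterator().next();
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic name must not be null or empty");
        }
        // Locale.ROOT avoids locale-specific case mappings.
        return Collections.singletonList(topic.trim().toUpperCase(Locale.ROOT));
    }
}
```

The result could then be passed on, for example as consumer.subscribe(TopicNames.singleUpperCaseTopic(Arrays.asList("txeq"))).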

Detecting Consumer Failures

After subscribing to a topic, the consumer will automatically join the group when poll(Duration) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Under the covers, the OKafka consumer maintains a JDBC connection to the Oracle database. If the consumer crashes, its connection to the Oracle database is severed, the consumer is considered dead, and its partitions are reassigned. The OKafka consumer does not send heartbeats to the Oracle database. Also, for this release of OKafka, max.poll.interval.ms is not supported.

    Offsets and Consumer Position

    TxEventQ maintains an offset for each record of a partition in an internal format. This is equivalent to Apache Kafka's consumer record offset. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. The position of the consumer depends on the committed position, which is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically, or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync). The examples below show how to use periodic or manual offset commits.

    Automatic Offset Committing

    This example demonstrates a simple usage of OKafka's consumer API that relies on automatic offset committing.

     
         import java.time.Duration;
         import java.util.Arrays;
         import java.util.Properties;
         import org.apache.kafka.clients.consumer.ConsumerRecord;
         import org.apache.kafka.clients.consumer.ConsumerRecords;
         import org.oracle.okafka.clients.consumer.KafkaConsumer;

         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:1521");   // one Oracle cluster node to contact
         props.put("oracle.service.name", "freepdb1");
         props.put("oracle.net.tns_admin", ".");
         props.put("group.id", "CG1");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "10000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("max.poll.records", 100);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         consumer.subscribe(Arrays.asList("TXEQ"));           // topic name in upper case
         try {
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
           for (ConsumerRecord<String, String> record : records) {
             System.out.printf("topic = %s, partition = %d, key = %s, value = %s%n",
               record.topic(), record.partition(), record.key(), record.value());
           }
           // Commit the records consumed above before the consumer is closed.
           consumer.commitSync();
         } catch (Exception ex) {
           ex.printStackTrace();
         } finally {
           // Always close the consumer to release its database connection.
           consumer.close();
         }
    
     
    The connection to the Oracle Database cluster is bootstrapped by specifying one Oracle cluster node to contact using the bootstrap.servers configuration.

    Setting enable.auto.commit to true means that offsets are committed automatically at a frequency controlled by the auto.commit.interval.ms configuration.

    In this example the consumer is subscribing to the topic TXEQ as part of a group of consumers called CG1 as configured with group.id.

    The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings.

    Manual Offset Control

    Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records should be considered consumed and hence commit their offsets. This is useful when the consumption of messages is coupled with some processing logic, so that a message should not be considered consumed until its processing has completed.

     
     import java.time.Duration;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.oracle.okafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:1521");
     props.put("oracle.service.name", "freepdb1");
     props.put("oracle.net.tns_admin", ".");
     props.put("group.id", "CG1");
     props.put("enable.auto.commit", "false");   // offsets are committed manually below
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("max.poll.records", 100);
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
     consumer.subscribe(Arrays.asList("TXEQ"));
     try {
         final int minBatchSize = 200;
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
                 buffer.add(record);
             }
             // Process and commit only once enough records have been batched.
             if (buffer.size() >= minBatchSize) {
                 insertIntoDb(buffer);   // application-defined helper, not part of OKafka
                 consumer.commitSync();
                 buffer.clear();
             }
         }
     } catch (Exception ex) {
         ex.printStackTrace();
     } finally {
         consumer.close();
     }
     
     
    In this example we consume records and batch them up in memory. When enough records are batched, we insert them into a database. If the process fails before commitSync(), all messages consumed after the previous commit are rolled back and considered not consumed. When the process restarts, it resumes consuming from the offset following the last committed one. In this way, OKafka provides "at-least-once" delivery guarantees: each record will likely be delivered one time, but in failure cases it could be duplicated. With OKafka, "exactly-once" delivery is possible through the getDBConnection() method. Using getDBConnection(), the application can retrieve the Oracle database connection that the OKafka consumer used to consume the records, and use it to store the processed records in the database. When commitSync() is then invoked, the consumption and the storage of the records are committed atomically. The example below illustrates this.
     
     import java.sql.Connection;
     import java.time.Duration;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.oracle.okafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:1521");
     props.put("oracle.service.name", "freepdb1");
     props.put("oracle.net.tns_admin", ".");
     props.put("group.id", "CG1");
     props.put("enable.auto.commit", "false");   // offsets are committed manually below
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("max.poll.records", 100);
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
     consumer.subscribe(Arrays.asList("TXEQ"));
     try {
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             // The same database connection the consumer used to fetch the records.
             Connection conn = consumer.getDBConnection();
             for (ConsumerRecord<String, String> record : records) {
                 buffer.add(record);
             }
             // Application-defined helper, not part of OKafka. It should write through
             // conn without committing, so that commitSync() commits both together.
             insertIntoDb(buffer, conn);
             consumer.commitSync();
             buffer.clear();
         }
     } catch (Exception ex) {
         ex.printStackTrace();
     } finally {
         consumer.close();
     }
      
     
    For this release of OKafka, the commitSync(offsets) methods for manually committing specific offsets are not supported.

    Multi-threaded Processing

    The OKafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Unsynchronized access will result in ConcurrentModificationException. For this release of OKafka, wakeup() is not supported. Invoking this API does not throw FeatureNotSupportedException.
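One way to respect the single-thread rule without relying on wakeup() is to confine every consumer call to one loop and to stop that loop through a flag checked between polls. The sketch below illustrates the pattern with JDK types only: `PollLoop`, `pollOnce`, `handler` and `closeConsumer` are hypothetical names, and `pollOnce` stands in for a call such as consumer.poll(Duration). It is an illustration under those assumptions, not OKafka's own mechanism.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: every call that touches the consumer happens inside run(),
// so the consumer is only ever used by the thread executing run().
final class PollLoop<T> implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Supplier<List<T>> pollOnce;   // stand-in for consumer.poll(...)
    private final Consumer<T> handler;          // per-record processing
    private final Runnable closeConsumer;       // stand-in for consumer.close()

    PollLoop(Supplier<List<T>> pollOnce, Consumer<T> handler, Runnable closeConsumer) {
        this.pollOnce = pollOnce;
        this.handler = handler;
        this.closeConsumer = closeConsumer;
    }

    @Override
    public void run() {
        try {
            // The flag is only checked between polls, so shutdown takes effect
            // after the current poll returns (at most one poll timeout later).
            while (running.get()) {
                for (T record : pollOnce.get()) {
                    handler.accept(record);
                }
            }
        } finally {
            closeConsumer.run();
        }
    }

    // Safe to call from any thread; it never touches the consumer itself.
    void shutdown() {
        running.set(false);
    }
}
```

Because shutdown() only flips a flag, a short poll timeout keeps the loop responsive to stop requests.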

    • Nested Class Summary

      Nested Classes
      Modifier and Type
      Class
      Description
      static class 
       
    • Constructor Summary

      Constructors
      Constructor
      Description
      KafkaConsumer(Map<String,Object> configs)
      A consumer is instantiated by providing a set of key-value pairs as configuration.
      KafkaConsumer(Map<String,Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
      A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer.
      KafkaConsumer(Properties properties)
      A consumer is instantiated by providing a Properties object as configuration.
      KafkaConsumer(Properties properties, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
      A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer.
    • Method Summary

      Modifier and Type
      Method
      Description
      void
      assign(Collection<org.apache.kafka.common.TopicPartition> partitions)
      This method is not yet supported.
      Set<org.apache.kafka.common.TopicPartition>
      assignment()
      Get the set of partitions currently assigned to this consumer using topic subscription.
      Map<org.apache.kafka.common.TopicPartition,Long>
      beginningOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions)
      This method is not yet supported.
      Map<org.apache.kafka.common.TopicPartition,Long>
      beginningOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
      This method is not yet supported.
      org.apache.kafka.common.Uuid
       
      void
      Tries to close the consumer cleanly.
      void
      close(long timeout, TimeUnit timeUnit)
      Deprecated.
      void
      close(Duration timeout)
      Tries to close the consumer cleanly.
      void
      commitAsync()
      Commit messages consumed using poll(Duration) for the subscribed topic and assigned partition in this session since last commit.
      void
      commitAsync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.OffsetCommitCallback callback)
      This method is not yet supported.
      void
      commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback callback)
      Commits messages (offsets) consumed using poll() since last commit in this session.
      void
      commitSync()
      Commits messages (offsets) consumed using poll() since last commit in this session.
      void
      commitSync(Duration timeout)
      Commits messages (offsets) consumed using poll() since last commit in this session.
      void
      commitSync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
      This method is not yet supported.
      void
      commitSync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, Duration timeout)
      This method is not yet supported.
      Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>
      committed(Set<org.apache.kafka.common.TopicPartition> partitions)
       
      Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>
      committed(Set<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
       
      org.apache.kafka.clients.consumer.OffsetAndMetadata
      committed(org.apache.kafka.common.TopicPartition partition)
      This method is not yet supported.
      org.apache.kafka.clients.consumer.OffsetAndMetadata
      committed(org.apache.kafka.common.TopicPartition partition, Duration timeout)
      This method is not yet supported.
      currentLag(org.apache.kafka.common.TopicPartition topicPartition)
       
      Map<org.apache.kafka.common.TopicPartition,Long>
      endOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions)
      This method is not yet supported.
      Map<org.apache.kafka.common.TopicPartition,Long>
      endOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
      This method is not yet supported.
      void
       
      void
       
      Connection
      getDBConnection()
      Get the Oracle database connection used to consume records from Oracle Transactional Event Queue.
      org.apache.kafka.clients.consumer.ConsumerGroupMetadata
       
      Map<String,List<org.apache.kafka.common.PartitionInfo>>
      This method is not yet supported.
      Map<String,List<org.apache.kafka.common.PartitionInfo>>
      This method is not yet supported.
      Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>
      This method is not yet supported.
      Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>
      offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch)
      This method is not yet supported.
      Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>
      offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch, Duration timeout)
      This method is not yet supported.
      List<org.apache.kafka.common.PartitionInfo>
      This method is not yet supported.
      List<org.apache.kafka.common.PartitionInfo>
      partitionsFor(String topic, Duration timeout)
      This method is not yet supported.
      void
      pause(Collection<org.apache.kafka.common.TopicPartition> partitions)
      This method is not yet supported.
      Set<org.apache.kafka.common.TopicPartition>
      This method is not yet supported.
      org.apache.kafka.clients.consumer.ConsumerRecords<K,V>
      poll(long timeout)
      Deprecated.
      org.apache.kafka.clients.consumer.ConsumerRecords<K,V>
      poll(Duration timeout)
      Fetch data for the topic specified using subscribe(Collection) APIs.
      long
      position(org.apache.kafka.common.TopicPartition partition)
      This method is not yet supported.
      long
      position(org.apache.kafka.common.TopicPartition partition, Duration timeout)
      This method is not yet supported.
      void
      resume(Collection<org.apache.kafka.common.TopicPartition> partitions)
      This method is not yet supported.
      void
      seek(org.apache.kafka.common.TopicPartition partition, long offset)
      Overrides the fetch offset that the consumer will use on the next poll(timeout).
      void
      seek(org.apache.kafka.common.TopicPartition partition, org.apache.kafka.clients.consumer.OffsetAndMetadata offsetAndMetadata)
       
      void
      seekToBeginning(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Seek to the first available offset for each of the given partitions.
      void
      seekToEnd(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Seek to the last offset for each of the given partitions.
      void
      subscribe(Collection<String> topics)
      Subscribe to the given list of topics to get partitions assigned dynamically.
      void
      subscribe(Collection<String> topics, org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)
      Subscribe to the given list of topics to get partitions assigned dynamically.
      void
      subscribe(Pattern pattern)
      This method is not yet supported.
      void
      subscribe(Pattern pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)
      This method is not yet supported.
      Set<String>
      subscription()
      Get the current subscription.
      void
      unsubscribe()
      Unsubscribe from topic currently subscribed with subscribe(Collection).
      void
      This method is not yet supported.

      Methods inherited from class java.lang.Object

      clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Details

      • KafkaConsumer

        public KafkaConsumer(Map<String,Object> configs)
        A consumer is instantiated by providing a set of key-value pairs as configuration. Values can be either strings or objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).

        Valid configuration strings are documented at ConsumerConfig.

        Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.

        Parameters:
        configs - The consumer configs
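As a brief illustration of the note that values may be strings or typed objects, the following sketch builds a configuration map using the property names from the examples at the top of this page. `ConsumerConfigSketch` and `baseConfig` are hypothetical names, and the connection values are the placeholder values from those examples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property names come from the examples on this page.
final class ConsumerConfigSketch {
    private ConsumerConfigSketch() {}

    static Map<String, Object> baseConfig(String groupId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:1521");
        configs.put("oracle.service.name", "freepdb1");
        configs.put("oracle.net.tns_admin", ".");
        configs.put("group.id", groupId);
        // A string value and a typed value are both accepted for a configuration.
        configs.put("enable.auto.commit", "false");
        configs.put("max.poll.records", 100);
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }
}
```

The resulting map could then be passed to the constructor, for example new KafkaConsumer<String, String>(ConsumerConfigSketch.baseConfig("CG1")), followed by close() when the consumer is no longer needed.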
      • KafkaConsumer

        public KafkaConsumer(Map<String,Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
        A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer.

        Valid configuration strings are documented at ConsumerConfig.

        Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.

        Parameters:
        configs - The consumer configs
        keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
        valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
      • KafkaConsumer

        public KafkaConsumer(Properties properties)
        A consumer is instantiated by providing a Properties object as configuration.

        Valid configuration strings are documented at ConsumerConfig.

        Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.

        Parameters:
        properties - The consumer configuration properties
      • KafkaConsumer

        public KafkaConsumer(Properties properties, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
        A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer.

        Valid configuration strings are documented at ConsumerConfig.

        Note: after creating a KafkaConsumer you must always close() it to avoid resource leaks.

        Parameters:
        properties - The consumer configuration properties
        keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
        valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
    • Method Details

      • getDBConnection

        public Connection getDBConnection() throws org.apache.kafka.common.KafkaException
        Get the Oracle database connection used to consume records from Oracle Transactional Event Queue.
        Throws:
        org.apache.kafka.common.KafkaException
      • assignment

        public Set<org.apache.kafka.common.TopicPartition> assignment()
        Get the set of partitions currently assigned to this consumer using topic subscription (which may be none if the assignment hasn't happened yet, or the partitions are in the process of getting reassigned).
        Specified by:
        assignment in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • subscription

        public Set<String> subscription()
        Get the current subscription. Will return the same topics used in the most recent call to subscribe(Collection, ConsumerRebalanceListener), or an empty set if no such call has been made.
        Specified by:
        subscription in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • subscribe

        public void subscribe(Collection<String> topics, org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)
        Subscribe to the given list of topics to get partitions assigned dynamically. However, OKafka 23.4.0.0 supports subscription to only a single topic. Partitions are assigned dynamically to the consumer based on partition.assignment.strategy.

        This method takes an object of ConsumerRebalanceListener. Its onPartitionsAssigned method will be invoked when partitions are assigned to this consumer. Similarly onPartitionsRevoked will be invoked when partitions are revoked.

        Topic subscriptions are not incremental. This list will replace the current assignment (if there is one). If the given list of topics is empty, it is treated the same as unsubscribe(). This call takes effect only when poll is invoked.

        Specified by:
        subscribe in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        topics - The list of topics to subscribe to
        listener - null; if not null, it is ignored
        Throws:
        IllegalArgumentException - If topics is null or contains null or empty elements or size of topics is greater than one.
      • subscribe

        public void subscribe(Collection<String> topics)
        Subscribe to the given list of topics to get partitions assigned dynamically. However, OKafka 23.4.0.0 supports subscription to only a single topic. Partitions are assigned dynamically to the consumer based on partition.assignment.strategy. Topic subscriptions are not incremental. This list will replace the current assignment (if there is one). If the given list of topics is empty, it is treated the same as unsubscribe(). This call takes effect only when poll is invoked.

        This is a short-hand for subscribe(Collection, ConsumerRebalanceListener), which uses a no-op listener by default.

        Specified by:
        subscribe in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        topics - The list of topics to subscribe to
        Throws:
        IllegalArgumentException - If topics is null or contains null or empty elements or size of topics is greater than one.
      • subscribe

        public void subscribe(Pattern pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)
        This method is not yet supported.
        Specified by:
        subscribe in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • subscribe

        public void subscribe(Pattern pattern)
        This method is not yet supported.
        Specified by:
        subscribe in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • unsubscribe

        public void unsubscribe()
        Unsubscribe from topic currently subscribed with subscribe(Collection).
        Specified by:
        unsubscribe in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • assign

        public void assign(Collection<org.apache.kafka.common.TopicPartition> partitions)
        This method is not yet supported.
        Specified by:
        assign in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • poll

        @Deprecated public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> poll(long timeout)
        Deprecated.
        Fetch data for the topic specified using subscribe(Collection) APIs. It is an error to not have subscribed to any topic before polling for data. The consumer maintains a single connection/session to any one of the Oracle database instances. Each consumer in a consumer group is dynamically assigned partition(s) of the subscribed topic.

        On each poll, the consumer consumes messages from the last fetch position (offset). If the consumer is closed or the application crashes without committing the consumed records, all uncommitted consumed messages are made available again. The next consumer that consumes from these partitions will start consuming records from the last committed offset. This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned. Note that this method may block beyond the timeout in order to complete partition rebalancing.

        Specified by:
        poll in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        timeout - The time, in milliseconds, spent waiting in poll.
        Returns:
        map of topic to records since the last fetch for the subscribed list of topics.
        Throws:
        org.oracle.okafka.common.KafkaException - for any other unrecoverable errors (e.g. errors deserializing key/value pairs, or any new error cases in future versions)
        IllegalArgumentException - if the timeout value is negative
        IllegalStateException - if the consumer is not subscribed to any topic.
        See Also:
        • "Section 'Consumer Groups and Topic Subscriptions' in org.oracle.okafka.clients.consumer.KafkaConsumer documentation to understand 'rebalancing'"
      • poll

        public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> poll(Duration timeout)
        Fetch data for the topic specified using subscribe(Collection) APIs. It is an error to not have subscribed to any topic before polling for data. Each consumer (session) in a group is assigned a single unique partition of the subscribed topic. Hence, poll fetches data from its assigned partition for as long as the connection/session exists. If the existing connection is lost and the consumer reconnects to any instance of the database, the consumer (session) might be assigned a new partition of the subscribed topic.

        On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially.

        Specified by:
        poll in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        timeout - The maximum time spent waiting in poll (must not be greater than Long.MAX_VALUE milliseconds).
        Returns:
        map of topic to records since the last fetch for the subscribed list of topics.
        Throws:
        org.oracle.okafka.common.KafkaException - for any other unrecoverable errors (e.g. errors deserializing key/value pairs)
        IllegalArgumentException - if the timeout value is negative
        IllegalStateException - if the consumer is not subscribed to any topic.
        ArithmeticException - if the timeout is greater than Long.MAX_VALUE milliseconds.
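The two timeout-related exceptions listed above can be reproduced with the JDK alone, which may help when validating a timeout before calling poll(Duration). This is a minimal sketch: `PollTimeouts` and `toPollMillis` are hypothetical names, and whether the consumer performs its checks in exactly this way is an assumption. Duration.toMillis() itself is documented to throw ArithmeticException on numeric overflow.

```java
import java.time.Duration;

// Hypothetical helper mirroring the documented timeout rules; not part of OKafka.
final class PollTimeouts {
    private PollTimeouts() {}

    // Converts a poll timeout to milliseconds.
    // Throws IllegalArgumentException for a null or negative timeout, and
    // ArithmeticException (from Duration.toMillis) if the value does not fit in a long.
    static long toPollMillis(Duration timeout) {
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        return timeout.toMillis();
    }
}
```

A typical value such as Duration.ofMillis(1000) passes through unchanged, while Duration.ofSeconds(Long.MAX_VALUE) overflows during the conversion.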
      • commitSync

        public void commitSync()
        Commits messages (offsets) consumed using poll() since last commit in this session. A commit on the session either succeeds or is rolled back. A commit fails only in rare cases such as database shutdown, and a commit failure results in rollback. If rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after failure, it starts consuming from this position.

        This is a synchronous commit and will block until either the commit succeeds or rollback happens. Commit does not take any timeout into account for completion of call.

        Specified by:
        commitSync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        org.oracle.okafka.common.KafkaException - for any other unrecoverable errors (e.g. the topic doesn't exist, or the session was rolled back because of a database shutdown).
      • commitSync

        public void commitSync(Duration timeout)
        Commits messages (offsets) consumed using poll() since last commit in this session. A commit on the session either succeeds or is rolled back. A commit fails only in rare cases such as database shutdown, and a commit failure results in rollback. If rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after failure, it starts consuming from this position.

        This is a synchronous commit and will block until either the commit succeeds or rollback happens. Commit does not take any timeout into account for completion of call. This call is equivalent to commitSync().

        Specified by:
        commitSync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        org.oracle.okafka.common.KafkaException - for any other unrecoverable errors.
      • commitSync

        public void commitSync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
        This method is not yet supported.
        Specified by:
        commitSync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • commitSync

        public void commitSync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, Duration timeout)
        This method is not yet supported.
        Specified by:
        commitSync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • commitAsync

        public void commitAsync()
        Commit messages consumed using poll(Duration) for the subscribed topic and assigned partition in this session since last commit. This call is equivalent to commitAsync(OffsetCommitCallback) with a null callback.
        Specified by:
        commitAsync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • commitAsync

        public void commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback callback)
        Commits the messages (offsets) consumed using poll(Duration) since the last commit in this session. A commit on the session either succeeds or is rolled back. A commit fails only in rare cases, such as a shutdown, and a failed commit results in a rollback. If a rollback occurs, the messages consumed since the last commit are considered not consumed. If the process restarts after a failure, it resumes consuming from this rollback position.

        Internally this is a synchronous call that blocks until the commit either succeeds or is rolled back. Any errors encountered are either passed to the callback (if provided) or discarded.

        Specified by:
        commitAsync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        callback - Callback to invoke when the commit completes
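
        The error-handling contract described above (run the commit on the calling thread, then hand any failure to the callback or drop it) can be sketched as follows. The names AsyncCommitSketch, CommitCallback and CommitAction are hypothetical stand-ins introduced for illustration. The real OffsetCommitCallback also receives the committed offsets map, which is omitted here, and this is not the OKafka implementation.

```java
import java.util.ArrayList;
import java.util.List;

class AsyncCommitSketch {
    /** Simplified, hypothetical stand-in for OffsetCommitCallback (offsets map omitted). */
    interface CommitCallback {
        void onComplete(Exception error); // error is null when the commit succeeded
    }

    /** Hypothetical commit action that may fail, for example during a shutdown. */
    interface CommitAction {
        void run() throws Exception;
    }

    /** Runs the commit on the calling thread and reports the outcome to the callback, if any. */
    static void commitAsync(CommitAction commit, CommitCallback callback) {
        Exception failure = null;
        try {
            commit.run();
        } catch (Exception e) {
            failure = e;
        }
        if (callback != null) {
            callback.onComplete(failure);
        }
        // With a null callback, a failure is silently discarded.
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CommitCallback recorder = error ->
                log.add(error == null ? "committed" : "rolled back: " + error.getMessage());

        commitAsync(() -> { }, recorder);
        commitAsync(() -> { throw new IllegalStateException("shutdown"); }, recorder);
        commitAsync(() -> { throw new IllegalStateException("shutdown"); }, null);

        System.out.println(log); // [committed, rolled back: shutdown]
    }
}
```

        Because failures are discarded when no callback is supplied, passing a callback is the only way in this sketch for the application to learn that a rollback happened.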
      • commitAsync

        public void commitAsync(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.OffsetCommitCallback callback)
        This method is not yet supported.
        Specified by:
        commitAsync in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • seek

        public void seek(org.apache.kafka.common.TopicPartition partition, long offset)
        Overrides the fetch offset that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Seeking to an offset/message that was already consumed, in the current or a previous session, does not cause the message to be consumed again.
        Specified by:
        seek in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        IllegalArgumentException - if the provided offset is negative
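
        The "latest offset wins" rule and the negative-offset check can be illustrated with a minimal model. SeekSketch, its integer partition ids and takePendingSeek are hypothetical names used only for this sketch. They stand in for TopicPartition and for whatever the next poll does internally, which this page does not describe.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of deferred seek requests; hypothetical, not the OKafka implementation. */
class SeekSketch {
    // Pending seek target per partition id (an int stands in for TopicPartition).
    private final Map<Integer, Long> pendingSeeks = new HashMap<>();

    /** Records the offset to use on the next poll; a later call replaces an earlier one. */
    void seek(int partition, long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be negative");
        }
        pendingSeeks.put(partition, offset);
    }

    /** What the next poll would do in this model: take and clear the target, or null if none. */
    Long takePendingSeek(int partition) {
        return pendingSeeks.remove(partition);
    }

    public static void main(String[] args) {
        SeekSketch consumer = new SeekSketch();
        consumer.seek(0, 5L);
        consumer.seek(0, 9L);                            // replaces the earlier request
        System.out.println(consumer.takePendingSeek(0)); // 9
        System.out.println(consumer.takePendingSeek(0)); // null
    }
}
```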
      • seekToBeginning

        public void seekToBeginning(Collection<org.apache.kafka.common.TopicPartition> partitions)
        Seeks to the first available offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(Duration) is called. Seeking to an offset/message that was already consumed, in the current or a previous session, does not cause the message to be consumed again.
        Specified by:
        seekToBeginning in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        IllegalArgumentException - if partitions is null
      • seekToEnd

        public void seekToEnd(Collection<org.apache.kafka.common.TopicPartition> partitions)
        Seeks to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) is called. Seeking to an offset/message that was already consumed, in the current or a previous session, does not cause the message to be consumed again.
        Specified by:
        seekToEnd in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        IllegalArgumentException - if partitions is null
      • position

        public long position(org.apache.kafka.common.TopicPartition partition)
        This method is not yet supported.
        Specified by:
        position in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • position

        public long position(org.apache.kafka.common.TopicPartition partition, Duration timeout)
        This method is not yet supported.
        Specified by:
        position in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • committed

        public org.apache.kafka.clients.consumer.OffsetAndMetadata committed(org.apache.kafka.common.TopicPartition partition)
        This method is not yet supported.
        Specified by:
        committed in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • committed

        public org.apache.kafka.clients.consumer.OffsetAndMetadata committed(org.apache.kafka.common.TopicPartition partition, Duration timeout)
        This method is not yet supported.
        Specified by:
        committed in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • metrics

        public Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
        This method is not yet supported.
        Specified by:
        metrics in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • partitionsFor

        public List<org.apache.kafka.common.PartitionInfo> partitionsFor(String topic)
        This method is not yet supported.
        Specified by:
        partitionsFor in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • partitionsFor

        public List<org.apache.kafka.common.PartitionInfo> partitionsFor(String topic, Duration timeout)
        This method is not yet supported.
        Specified by:
        partitionsFor in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • listTopics

        public Map<String,List<org.apache.kafka.common.PartitionInfo>> listTopics()
        This method is not yet supported.
        Specified by:
        listTopics in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • listTopics

        public Map<String,List<org.apache.kafka.common.PartitionInfo>> listTopics(Duration timeout)
        This method is not yet supported.
        Specified by:
        listTopics in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • pause

        public void pause(Collection<org.apache.kafka.common.TopicPartition> partitions)
        This method is not yet supported.
        Specified by:
        pause in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • resume

        public void resume(Collection<org.apache.kafka.common.TopicPartition> partitions)
        This method is not yet supported.
        Specified by:
        resume in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • paused

        public Set<org.apache.kafka.common.TopicPartition> paused()
        This method is not yet supported.
        Specified by:
        paused in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • offsetsForTimes

        public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp> offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch)
        This method is not yet supported.
        Specified by:
        offsetsForTimes in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • offsetsForTimes

        public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp> offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch, Duration timeout)
        This method is not yet supported.
        Specified by:
        offsetsForTimes in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • beginningOffsets

        public Map<org.apache.kafka.common.TopicPartition,Long> beginningOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions)
        This method is not yet supported.
        Specified by:
        beginningOffsets in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • beginningOffsets

        public Map<org.apache.kafka.common.TopicPartition,Long> beginningOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
        This method is not yet supported.
        Specified by:
        beginningOffsets in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • endOffsets

        public Map<org.apache.kafka.common.TopicPartition,Long> endOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions)
        This method is not yet supported.
        Specified by:
        endOffsets in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • endOffsets

        public Map<org.apache.kafka.common.TopicPartition,Long> endOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
        This method is not yet supported.
        Specified by:
        endOffsets in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • close

        public void close()
        Tries to close the consumer cleanly. If auto-commit is enabled, this will commit the current offsets. Close doesn't take any timeout into consideration.
        Specified by:
        close in interface AutoCloseable
        Specified by:
        close in interface Closeable
        Specified by:
        close in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Throws:
        org.apache.kafka.common.KafkaException - for any other error during close
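
        Because the consumer implements AutoCloseable, a try-with-resources block is one way to make sure close() runs even when processing fails. The sketch below uses a hypothetical FakeConsumer in place of a real consumer so that it runs without a database. Only the close-on-exit pattern is the point, not the consumer's behaviour.

```java
class CloseSketch {
    /** Hypothetical stand-in for a consumer that holds a single connection. */
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;
        int polls = 0;

        void poll() {
            polls++;
        }

        @Override
        public void close() {
            closed = true; // a real consumer would release its connection here
        }
    }

    /** Uses the consumer inside try-with-resources and returns it for inspection. */
    static FakeConsumer useAndClose(boolean failDuringProcessing) {
        FakeConsumer handle = new FakeConsumer();
        try (FakeConsumer consumer = handle) {
            consumer.poll();
            if (failDuringProcessing) {
                throw new IllegalStateException("processing failed");
            }
        } catch (IllegalStateException e) {
            // close() has already run by the time this handler executes.
        }
        return handle;
    }

    public static void main(String[] args) {
        System.out.println(useAndClose(false).closed); // true
        System.out.println(useAndClose(true).closed);  // true
    }
}
```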
      • close

        @Deprecated public void close(long timeout, TimeUnit timeUnit)
        Deprecated.
        Tries to close the consumer cleanly. If auto-commit is enabled, this will commit the current offsets. Close doesn't take the timeout into consideration.
        Parameters:
        timeout - Not used
        timeUnit - Not used
        Throws:
        IllegalArgumentException - If the timeout is negative.
        org.oracle.okafka.common.KafkaException - for any other error during close
      • close

        public void close(Duration timeout)
        Tries to close the consumer cleanly. If auto-commit is enabled, this will commit the current offsets. Close doesn't take the timeout into consideration.
        Specified by:
        close in interface org.apache.kafka.clients.consumer.Consumer<K,V>
        Parameters:
        timeout - not used
        Throws:
        IllegalArgumentException - If the timeout is negative.
        org.oracle.okafka.common.KafkaException - for any other error during close
      • wakeup

        public void wakeup()
        This method is not yet supported.
        Specified by:
        wakeup in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • seek

        public void seek(org.apache.kafka.common.TopicPartition partition, org.apache.kafka.clients.consumer.OffsetAndMetadata offsetAndMetadata)
        Specified by:
        seek in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • committed

        public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> committed(Set<org.apache.kafka.common.TopicPartition> partitions)
        Specified by:
        committed in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • committed

        public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> committed(Set<org.apache.kafka.common.TopicPartition> partitions, Duration timeout)
        Specified by:
        committed in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • groupMetadata

        public org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata()
        Specified by:
        groupMetadata in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • enforceRebalance

        public void enforceRebalance()
        Specified by:
        enforceRebalance in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • enforceRebalance

        public void enforceRebalance(String reason)
        Specified by:
        enforceRebalance in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • clientInstanceId

        public org.apache.kafka.common.Uuid clientInstanceId(Duration timeout)
        Specified by:
        clientInstanceId in interface org.apache.kafka.clients.consumer.Consumer<K,V>
      • currentLag

        public OptionalLong currentLag(org.apache.kafka.common.TopicPartition topicPartition)
        Specified by:
        currentLag in interface org.apache.kafka.clients.consumer.Consumer<K,V>