public class KafkaProducer<K,V> extends java.lang.Object implements Producer<K,V>
The producer internally stores these records in batches in a buffer pool. And IO thread running in the background sends these batches synchronously one at a time.
The producer is thread safe i.e multiple threads can use same producer instance to publish records.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
Properties props = new Properties();
props.put("oracle.instance.name", "instancename");
props.put("oracle.service.name", "serviceid.regress.rdbms.dev.us.oracle.com");
props.put("oracle.user.name", "username");
props.put("oracle.password", "pwd");
props.put("bootstrap.servers", "localhost:9092");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("retries", 0);
props.put("key.serializer", "org.oracle.okafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.oracle.okafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
The send()
method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
The acks
config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. OKafka supports only default setting "all"
The retries
config resends the request if request fails with retriable excpetion.
batch.size
config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).
By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms
to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0
so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.
The buffer.memory
controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms
after which it throws a TimeoutException.
The key.serializer
and value.serializer
instruct how to turn the key and value objects the user provides with their ProducerRecord
into bytes. You can use the included ByteArraySerializer
or StringSerializer
for simple string or byte types.
The producer doesn't support idempotency and transactional behaviour yet.
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
NETWORK_THREAD_PREFIX |
Constructor and Description |
---|
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)
A producer is instantiated by providing a set of key-value pairs as configuration.
|
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value
Serializer . |
KafkaProducer(java.util.Properties properties)
A producer is instantiated by providing a set of key-value pairs as configuration.
|
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value
Serializer . |
Modifier and Type | Method and Description |
---|---|
void |
abortTransaction()
This api is not yet supported
|
void |
beginTransaction()
This api is not yet supported
|
void |
close()
Close this producer.
|
void |
close(long timeout, java.util.concurrent.TimeUnit timeUnit)
This method waits up to
timeout for the producer to complete the sending of all incomplete requests. |
void |
commitTransaction()
This api is not yet supported
|
void |
flush()
This api is not yet supported
|
void |
initTransactions()
This api is not yet supported
|
java.util.Map<MetricName,? extends Metric> |
metrics()
This api is not yet supported
|
java.util.List<PartitionInfo> |
partitionsFor(java.lang.String topic)
Get the partition metadata for the given topic.
|
java.util.concurrent.Future<RecordMetadata> |
send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic.
|
java.util.concurrent.Future<RecordMetadata> |
send(ProducerRecord<K,V> record, Callback callback)
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
|
void |
sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
This api is not yet supported
|
public static final java.lang.String NETWORK_THREAD_PREFIX
public KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)
Note: after creating a KafkaProducer
you must always close()
it to avoid resource leaks.
configs
- The producer configspublic KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)
Serializer
. Valid configuration strings are documented here. Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).
Note: after creating a KafkaProducer
you must always close()
it to avoid resource leaks.
configs
- The producer configskeySerializer
- The serializer for key that implements Serializer
. The configure() method won't be called in the producer when the serializer is passed in directly.valueSerializer
- The serializer for value that implements Serializer
. The configure() method won't be called in the producer when the serializer is passed in directly.public KafkaProducer(java.util.Properties properties)
Note: after creating a KafkaProducer
you must always close()
it to avoid resource leaks.
properties
- The producer configspublic KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
Serializer
. Valid configuration strings are documented here.
Note: after creating a KafkaProducer
you must always close()
it to avoid resource leaks.
properties
- The producer configskeySerializer
- The serializer for key that implements Serializer
. The configure() method won't be called in the producer when the serializer is passed in directly.valueSerializer
- The serializer for value that implements Serializer
. The configure() method won't be called in the producer when the serializer is passed in directly.public void initTransactions()
initTransactions
in interface Producer<K,V>
public void beginTransaction() throws ProducerFencedException
beginTransaction
in interface Producer<K,V>
ProducerFencedException
public void sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) throws ProducerFencedException
sendOffsetsToTransaction
in interface Producer<K,V>
ProducerFencedException
public void commitTransaction() throws ProducerFencedException
commitTransaction
in interface Producer<K,V>
ProducerFencedException
public void abortTransaction() throws ProducerFencedException
abortTransaction
in interface Producer<K,V>
ProducerFencedException
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
send(record, null)
. See send(ProducerRecord, Callback)
for details.public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. If buffer memory is full then send call blocks for a maximum of time max.block.ms
.This allows sending many records in parallel without blocking to wait for the response after each one.
The result of the send is a RecordMetadata
specifying the partition the record was sent to, the offset it was assigned and the timestamp of the record. If CreateTime
is used by the topic, the timestamp will be the user provided timestamp or the record send time if the user did not specify a timestamp for the record. If LogAppendTime
is used for the topic, the timestamp will be the TEQ local time when the message is appended. OKafka currently supports only LogAppendTime.
Send call returns a Future
for the RecordMetadata
that will be assigned to this record. Invoking get()
on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
If you want to simulate a simple blocking call you can call the get()
method immediately:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
Fully non-blocking usage can make use of the Callback
parameter to provide a callback that will be invoked when the request is complete.
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the following example callback1
is guaranteed to execute before callback2
:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor
in the callback body to parallelize processing.send
in interface Producer<K,V>
record
- The record to sendcallback
- A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback)InterruptException
- If the thread is interrupted while blockedSerializationException
- If the key or value are not valid objects given the configured serializersTimeoutException
- If the time taken for fetching metadata or allocating memory for the record has surpassed max.block.ms
.KafkaException
- If a Kafka related error occurs that does not belong to the public API exceptions.public void flush()
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
partitionsFor
in interface Producer<K,V>
InterruptException
- if the thread is interrupted while blockedTimeoutException
- if metadata could not be refreshed within max.block.ms
KafkaException
- for all Kafka-related exceptions, including the case where this method is called after producer closepublic java.util.Map<MetricName,? extends Metric> metrics()
public void close()
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.
If close() is called from Callback
, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) will be called instead. We do this because the sender thread would otherwise try to join itself and block forever.
close
in interface java.io.Closeable
close
in interface java.lang.AutoCloseable
close
in interface Producer<K,V>
InterruptException
- If the thread is interrupted while blockedpublic void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
timeout
for the producer to complete the sending of all incomplete requests.
If the producer is unable to complete all requests before the timeout expires, this method will fail any unsent and unacknowledged records immediately.
If invoked from within a Callback
this method will not block and will be equivalent to close(0, TimeUnit.MILLISECONDS)
. This is done since no further sending will happen while blocking the I/O thread of the producer.
close
in interface Producer<K,V>
timeout
- The maximum time to wait for producer to complete any pending requests. The value should be non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.timeUnit
- The time unit for the timeout
InterruptException
- If the thread is interrupted while blockedjava.lang.IllegalArgumentException
- If the timeout
is negative.