- The Kafka Producer API allows applications to send messages to the Kafka cluster.
- Producer APIs are simple to use, but this post looks at what goes on under the hood of the producer when we send data.
- Refer to this code for a working example of sending messages to a Kafka topic named “time-series”. The example code generates random integers and sends each one as the message value, with the time as the key.
- To send messages to Kafka, we need to create a ProducerRecord object. The two mandatory fields of this object are the topic name and the message value. The optional fields are the message key, the partition, and the message timestamp. In the code snippet below we have a ProducerRecord object with time-series as the topic name, the system time as the key, and a random number as the message value.
ProducerRecord<Long, Integer> record = new ProducerRecord<Long, Integer>("time-series", System.nanoTime(), randomNumber);
- The message key serves many purposes, such as partitioning, grouping, and joining of messages, but when sending a message to the Kafka cluster it is used to determine which partition of the topic the message is sent to.
- The producer record takes an optional timestamp field, but every message in Kafka is automatically timestamped even if we don't set it. Kafka lets us choose one of two timestamp mechanisms:
- Create Time : Time when the message was produced
- Log append Time: Time when the message was received at the Kafka Broker
- The default message timestamp is the create time.
- This can be configured by setting the following key for the topic:
- message.timestamp.type=CreateTime for create time
- message.timestamp.type=LogAppendTime for log append time
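The difference between the two timestamp mechanisms is easiest to see as a tiny model. The sketch below is not Kafka code: `TimestampModel`, `TimestampType`, and `storedTimestamp` are hypothetical names used only to show which clock ends up on the stored message in each mode.

```java
// Simplified model of which timestamp ends up on the stored message.
// TimestampModel is a hypothetical name, not a Kafka class.
final class TimestampModel {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // producerTimestampMs: timestamp carried in the record, set on the producer side.
    // brokerClockMs: broker wall-clock time when the record is appended to the log.
    static long storedTimestamp(TimestampType type, long producerTimestampMs, long brokerClockMs) {
        switch (type) {
            case LOG_APPEND_TIME:
                return brokerClockMs;        // broker replaces the producer's value
            case CREATE_TIME:
            default:
                return producerTimestampMs;  // producer's value is kept as-is
        }
    }
}
```

For example, `TimestampModel.storedTimestamp(TimestampModel.TimestampType.LOG_APPEND_TIME, 100L, 250L)` returns the broker's value, while the same call with `CREATE_TIME` returns the producer's value.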
- Once the producer record is created, the Kafka producer sends it over the network to the Kafka cluster.
- The messages are not sent to the Kafka cluster immediately. The producer internally serializes the record, assigns it a partition, and buffers it in batches, which a separate I/O thread picks up and sends to the appropriate Kafka brokers.
- The first step is serialization, which is necessary to send the data over the network. We specify how the key and the value of the message are serialized by providing key and value serializer classes as part of the configuration. In the example code the key is a Long and the value is an Integer; in real-life scenarios these will often be Java objects.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
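To see what serialization means for the Long key and Integer value used here, the sketch below turns them into the fixed-width, big-endian byte arrays that go over the network. It uses only the JDK; `SerializationSketch` and its methods are hypothetical helpers for illustration and stand in for the configured serializer classes rather than reproducing them.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing the byte layout of a Long key and an Integer value.
// ByteBuffer writes big-endian (most significant byte first) by default.
final class SerializationSketch {
    // A long key becomes 8 bytes.
    static byte[] serializeLong(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // An int value becomes 4 bytes.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // The reverse step, as a consumer-side deserializer would perform it.
    static int deserializeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

For Java objects the idea is the same, only the serializer has to decide on a format (JSON, Avro, and so on) for turning the object into bytes.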
- Once the message is serialized, the next step for the producer is to determine which partition it should be sent to.
- If we specified a partition in the ProducerRecord, the partitioner doesn’t do anything and simply uses the partition we specified. If we didn’t, the partitioner will choose a partition for us, usually based on the ProducerRecord key.
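The decision described above can be sketched in a few lines. This is a simplified model, not the real partitioner: `PartitionSketch` and `choosePartition` are hypothetical names, `Arrays.hashCode` is a stand-in for the hash function Kafka actually uses, and keyless records are simply given a random partition here, whereas the real client has its own strategy for spreading them.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of partition selection for one record.
final class PartitionSketch {
    // explicitPartition: partition set on the record, or null if none was given.
    // serializedKey: key bytes after serialization, or null for a keyless record.
    static int choosePartition(Integer explicitPartition, byte[] serializedKey, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the caller's choice is used as-is
        }
        if (serializedKey == null) {
            // Assumption for this sketch: pick any partition for keyless records.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        int hash = Arrays.hashCode(serializedKey);  // stand-in hash, not Kafka's
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then take the modulus
    }
}
```

The property that matters is that the same key always maps to the same partition as long as the number of partitions does not change, which is what keeps messages with the same key in order.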
- Once serialized and assigned a target partition, the message is not sent immediately but sits in the buffer. Within the buffer, messages are grouped into batches per partition.
- The producer runs a background thread to transfer messages from the buffer to the Kafka cluster. The default size of the buffer is 32 MB, but this can be changed with the buffer.memory configuration.
- This sets the amount of memory the producer will use to buffer messages waiting to be sent to brokers. If messages are sent by the application faster than they can be delivered to the server, the producer may run out of space, and additional send() calls will block for up to max.block.ms and then throw an exception. (Older clients chose between blocking and throwing with the block.on.buffer.full parameter, which has since been removed.)
- A separate I/O thread sends the serialized messages in the buffer to the Kafka broker.
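To make the buffering step concrete, here is a minimal sketch of a bounded buffer that groups serialized records by partition and lets a sender drain them. `BufferSketch` is a hypothetical class, not the producer's internal implementation. As a simplification, it reports a full buffer by returning `false` instead of blocking, and it does not cap the size of an individual batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the producer-side buffer.
final class BufferSketch {
    private final long capacityBytes; // plays the role of the total buffer size
    private long usedBytes = 0;
    private final Map<Integer, Queue<byte[]>> batches = new HashMap<>();

    BufferSketch(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Called by the application thread. Returns false when the record does not fit;
    // a real producer would block or fail at this point instead.
    synchronized boolean append(int partition, byte[] serializedRecord) {
        if (usedBytes + serializedRecord.length > capacityBytes) {
            return false;
        }
        batches.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(serializedRecord);
        usedBytes += serializedRecord.length;
        return true;
    }

    // Called by the background sender: removes and returns everything queued
    // for one partition, which frees that memory for new records.
    synchronized List<byte[]> drain(int partition) {
        List<byte[]> batch = new ArrayList<>();
        Queue<byte[]> queue = batches.remove(partition);
        if (queue != null) {
            for (byte[] record : queue) {
                usedBytes -= record.length;
                batch.add(record);
            }
        }
        return batch;
    }

    synchronized long usedBytes() {
        return usedBytes;
    }
}
```

With a capacity of 10 bytes, two 4-byte records fit and a third is rejected until the sender drains one of the partitions. That is the situation in which a real send() call would have to wait.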
- When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition.
- If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
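The retry behaviour can be pictured as a simple loop. The sketch below is a generic JDK-only illustration: `RetrySketch` and `sendWithRetries` are hypothetical names, and it retries on any runtime exception with no pause in between, whereas the real producer only retries errors it considers retriable and waits between attempts.

```java
import java.util.function.Supplier;

// Hypothetical retry loop: one initial attempt plus up to maxRetries further attempts.
final class RetrySketch {
    // Returns the first successful result; rethrows the last error if every attempt fails.
    static <T> T sendWithRetries(Supplier<T> attempt, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.get(); // success: stop retrying
            } catch (RuntimeException e) {
                last = e;             // remember the failure and try again
            }
        }
        throw last; // all attempts failed, so give up and surface the error
    }
}
```

Here `attempt` stands for one try at sending a batch; if it fails twice and then succeeds, a call with `maxRetries` of 3 returns the successful result after three attempts in total.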
Thanks. Share your comments and feedback.