This blog will share an example on how to send custom java object or data to Kafka Producer
- Use-case
- Read a data file with customer details
- Map the data to a customer java object
- Send the data to a Kafka Topic
- To build this use case , we will do following steps
- Create a Kafka topic
- Build a json schema for the customer data
- Generate java classes from the schema
- Create Kafka producer to send data to Kafka Topic

- The data file that we will use will have records in below format. It primarily consists of customer contact details and address

- To start , let us create a Java Maven project.
- Inside the resources folder , create a directory with the name schema to store the schema for customer data
- Schema for the data will contain 2 files for
- Address
- Customer
- Address Schema
{
"type": "object",
"javaType": "com.raj.nola.types.Address",
"properties": {
"streetAddress": {"type": "string"},
"city": {"type": "string"},
"state": {"type": "string"},
"country": {"type": "string"},
"zipCode": {"type": "integer"},
"contactNumber": {"type": "long"}
}
}
- Customer Schema
{
"type": "object",
"javaType": "com.raj.nola.types.Customer",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"email": {
"type": "string"
},
"address": {
"type": "object",
"$ref": "Address.json"
}
}
}
- We will use jsonschema2pojo-maven-plugin to generate java classes from the schema
- javaType field value will be the java file name to be generated
- address field in customer schema refers to Address, so the generated customer java object will have address data type for address
- To generate POJOs add following plugin and dependency in pom.xml
<build>
<plugins>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>0.5.1</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<includeAdditionalProperties>false</includeAdditionalProperties>
<includeHashcodeAndEquals>false</includeHashcodeAndEquals>
<generateBuilders>true</generateBuilders>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
- Other required dependencies for the project are
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.5.4</version>
</dependency>
</dependencies>
- At this stage when you execute – mvn compile, you will have the POJO classes generated

- Copy the sample data file to resources/data folder. Link to copy file – https://github.com/rajeshsgr/json-producer-example/tree/master/src/main/resources/data
- In order to send the Object to Kafka, we will have to serialize the record.
- Serialization is the process of converting an object into a stream of bytes that are used for transmission.
- The code to serialize the data is – JsonSerializer. java ,which is in the package – com.raj.nola.util and it uses jackson libraries to serialize the JSON Object
- Now create a program to read the data file and map the records in the file to the POJO classes generated and store the file data in an array.
- Program for this example is – CustomerFileReader which is in com.raj.nola.reader package.
String DATAFILE = "src/main/resources/data/data.json";
private Customer[] customers;
private CustomerFileReader() {
final ObjectMapper mapper;
mapper = new ObjectMapper();
;
try {
customers = mapper.readValue(new File(DATAFILE), Customer[].class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- Next step is to create a Kafka Producer program to send the data read from the file to a topic named –customer-demo-topic.
- Required configurations are mentioned in SystemConfig.java file under the package- com.raj.nola.config
public final static String producerApplicationID = "CustomerProducerApp"; public final static String bootstrapServers = "localhost:9092,localhost:9093"; public final static String topicName = "customer-demo-topic";
- Now we will create the Producer program to send data to the configured topic.
- Configurations for the producer will be as below
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, SystemConfig.producerApplicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfig.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
- The key for the record is an integer and therefore the serialization class is IntegerSerializer
- The value contains the customer Java object and to send that to Kafka topic we have to provide a value serializer.
- The value serializer is our custom JsonSerializer class which is in util package.
- In the producer class, we will loop through the array of customer objects created and will send it to customer-demo topic
for(int i = 0; i < customers.length; i++) {
ProducerRecord<Integer, Customer> record=
new ProducerRecord<Integer, Customer>(SystemConfig.topicName, i,customers[i]);
try {
RecordMetadata recordMetadata = producer.send(record).get();
String message = String.format("sent message to topic:%s partition:%s offset:%s",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
- Upon executing the program, a Topic will be created and if the records were sent successfully it will print partition number and offset.
- To verify, you can use the kafka-console-consumer command line.

- To summarize, this is what the program does

- Link to git hub repo – https://github.com/rajeshsgr/json-producer-example
Share your comments / feedback. Thanks !
Thanks for sharing,
LikeLiked by 1 person
Insightful, Thanks for sharing.
LikeLiked by 1 person
Very useful article , very neat and clear thank you Rajeshsgr
LikeLiked by 1 person