In the last post I shared an example of how to send a custom Java object to Kafka. This post covers how to consume that data from a Kafka topic.
- We will consume the Customer objects that we sent to the Kafka topic in the last post.
- Below is a depiction of the use case to be implemented

- Sample Customer record type

- The schema for the data is split across 2 files, one for each type
- Address
- Customer
- Address Schema
{
"type": "object",
"javaType": "com.raj.nola.types.Address",
"properties": {
"streetAddress": {"type": "string"},
"city": {"type": "string"},
"state": {"type": "string"},
"country": {"type": "string"},
"zipCode": {"type": "integer"},
"contactNumber": {"type": "long"}
}
}
- Customer Schema
{
"type": "object",
"javaType": "com.raj.nola.types.Customer",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"email": {
"type": "string"
},
"address": {
"type": "object",
"$ref": "Address.json"
}
}
}
- We will use jsonschema2pojo-maven-plugin to generate the Java classes from the schemas
- The javaType field value is the fully qualified name of the Java class to be generated
- The address field in the Customer schema refers to Address.json, so the generated Customer class will have an address field of type Address
- To generate the POJOs, add the following plugin to pom.xml
<build>
<plugins>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>0.5.1</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<includeAdditionalProperties>false</includeAdditionalProperties>
<includeHashcodeAndEquals>false</includeHashcodeAndEquals>
<generateBuilders>true</generateBuilders>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
- Other required dependencies for the project are
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.5.4</version>
</dependency>
</dependencies>
- At this stage, running mvn compile generates the POJO classes
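As a rough picture of what gets generated, here is a hand-written approximation of the two classes. It is a sketch, not the plugin's exact output: the real classes are public, live in the com.raj.nola.types package, and also carry Jackson annotations. Only a few fields are shown; the withXxx methods are what generateBuilders set to true adds on top of the usual getters and setters.

```java
// Hand-written approximation of the generated POJOs, not the plugin's exact output.
class Address {
    private String city;
    private Integer zipCode;
    // streetAddress, state, country and contactNumber follow the same pattern

    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public Address withCity(String city) { this.city = city; return this; }

    public Integer getZipCode() { return zipCode; }
    public void setZipCode(Integer zipCode) { this.zipCode = zipCode; }
    public Address withZipCode(Integer zipCode) { this.zipCode = zipCode; return this; }
}

class Customer {
    private Integer id;
    private String name;
    private Address address; // the $ref to Address.json becomes a typed field
    // email follows the same pattern

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public Customer withId(Integer id) { this.id = id; return this; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Customer withName(String name) { this.name = name; return this; }

    public Address getAddress() { return address; }
    public void setAddress(Address address) { this.address = address; }
    public Customer withAddress(Address address) { this.address = address; return this; }
}
```

The builder-style methods allow an object to be assembled in one expression, for example new Customer().withId(1).withName("Sample").withAddress(new Address().withCity("Sample City")).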

- With the POJOs generated from the schema, the next steps are
- Deserialize the records stored in the topic into Customer objects
- Print the Customer objects to the console
- The code to deserialize the data is in JsonDeserializer.java, in the package com.raj.nola.util. It uses the Jackson library to map the JSON bytes to a Java object
- With the POJOs for Customer and Address and the JSON deserializer in place, the next step is to create the consumer program
- The configuration values such as topic name and group ID are in SystemConfig.java, under the package com.raj.nola.config
public final static String consumerApplicationID = "CustomerConsumerApp";
public final static String bootstrapServers = "localhost:9092,localhost:9093";
public final static String topicName = "customer-demo-topic";
public final static String groupID = "CustomerReaderGroup";
- Let us now start building the consumer program. The configuration for the consumer is
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, SystemConfig.consumerApplicationID);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfig.bootstrapServers);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, Customer.class);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SystemConfig.groupID);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
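The ConsumerConfig constants used in this configuration are plain string keys, and the deserializer entries can also be given as fully qualified class names. The sketch below builds the same settings with literal keys, which is the form they would take in a .properties file. The class name ConsumerPropsSketch is made up for illustration, and the JsonDeserializer.VALUE_CLASS_NAME_CONFIG entry is left out because the post does not show the string behind that constant.

```java
import java.util.Properties;

// Illustrative only: the same consumer settings written with literal keys.
class ConsumerPropsSketch {

    static Properties build() {
        Properties props = new Properties();
        props.put("client.id", "CustomerConsumerApp");
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // Deserializers given as class names instead of Class objects
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "com.raj.nola.util.JsonDeserializer");
        props.put("group.id", "CustomerReaderGroup");
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```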
- The value deserializer is our custom JsonDeserializer.class, set through the configuration ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
- The JSON will be mapped to a Customer object, and that target class is set through the configuration JsonDeserializer.VALUE_CLASS_NAME_CONFIG
- We want to read all the records the first time we run this, so we have set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
- This setting only applies while the consumer group has no committed offset. On later runs the consumer resumes from the group's last committed offset, so records that were already read are not read again.
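The difference between the first run and later runs can be pictured with a toy model of how the starting offset is chosen for one partition. This is a simplified sketch and not Kafka's own code: the class and method names are made up, and it ignores the case where a committed offset has fallen out of the log's range.

```java
import java.util.OptionalLong;

// Toy model (not Kafka code) of where a consumer starts reading a partition.
class StartOffsetSketch {

    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        // A committed offset for the group always wins
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // auto.offset.reset is consulted only when nothing has been committed
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                // With "none" the real consumer raises an error instead of guessing
                throw new IllegalStateException(
                        "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

In this model a brand-new group with "earliest" starts at the beginning of the log, while a group that has already committed an offset starts there regardless of the reset policy.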
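- The code to subscribe to the topic and poll for records is below
KafkaConsumer<Integer, Customer> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(SystemConfig.topicName));
while (true) {
    // Wait up to 200 ms for new records, then print each Customer value
    ConsumerRecords<Integer, Customer> records = consumer.poll(Duration.ofMillis(200));
    records.forEach(record -> System.out.println(record.value()));
}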
- Below is the result of executing the program

- Link to the GitHub repo: https://github.com/rajeshsgr/json-consumer-example
Share your comments / feedback. Thanks!