How to consume custom objects from a Kafka topic?

In the last post I shared an example of how to send a custom Java object to Kafka. This post shows how to consume that data from a Kafka topic.

  • We will consume the Customer object that we sent to the Kafka topic in the last post.
  • Below is a depiction of the use case to be implemented
  • Sample Customer record type
  • The schema for the data consists of two files:
    • Address
    • Customer
  • Address Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Address",
  "properties": {
    "streetAddress": {"type": "string"},
    "city": {"type": "string"},
    "state": {"type": "string"},
    "country": {"type": "string"},
    "zipCode": {"type": "integer"},
    "contactNumber": {"type": "long"}
  }
}
  • Customer Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Customer",
  "properties": {
    "id": {
      "type": "integer"
    },
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "address": {
      "type": "object",
      "$ref": "Address.json"
    }
  }
}
  • We will use the jsonschema2pojo-maven-plugin to generate Java classes from the schemas
  • The value of the javaType field is the fully qualified name of the Java class to be generated
  • The address field in the Customer schema references Address.json, so the generated Customer class will have an address field of type Address
  • To generate the POJOs, add the following plugin to pom.xml
<build>
    <plugins>
        <plugin>
            <groupId>org.jsonschema2pojo</groupId>
            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
            <version>0.5.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        <includeAdditionalProperties>false</includeAdditionalProperties>
                        <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                        <generateBuilders>true</generateBuilders>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • The other dependencies required by the project are
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.4</version>
    </dependency>

</dependencies>
  • At this point, running mvn compile generates the POJO classes
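The post does not list the generated sources. The sketch below shows roughly what the two classes look like. It is abbreviated and is not the plugin's exact output. The real classes also carry Jackson annotations, setters, toString and the remaining Address fields, and details can vary by plugin version. It illustrates two things: the $ref to Address.json becomes a field of type Address, and generateBuilders=true adds fluent with... methods.

```java
// Abbreviated sketch of com.raj.nola.types.Address / com.raj.nola.types.Customer as
// generated by jsonschema2pojo (package lines, annotations, setters, toString omitted).
class Address {
    private String city;
    private Integer zipCode; // JSON Schema "integer" maps to java.lang.Integer

    public String getCity() { return city; }

    public Integer getZipCode() { return zipCode; }

    // generateBuilders=true adds fluent "with" methods that set a field and return this
    public Address withCity(String city) {
        this.city = city;
        return this;
    }

    public Address withZipCode(Integer zipCode) {
        this.zipCode = zipCode;
        return this;
    }
}

class Customer {
    private Integer id;
    private String name;
    private String email;
    private Address address; // "$ref": "Address.json" becomes a field of type Address

    public Integer getId() { return id; }

    public String getName() { return name; }

    public String getEmail() { return email; }

    public Address getAddress() { return address; }

    public Customer withId(Integer id) {
        this.id = id;
        return this;
    }

    public Customer withName(String name) {
        this.name = name;
        return this;
    }

    public Customer withEmail(String email) {
        this.email = email;
        return this;
    }

    public Customer withAddress(Address address) {
        this.address = address;
        return this;
    }
}
```

With the builder methods, a nested object can be assembled in one expression, for example new Customer().withId(7).withAddress(new Address().withCity("Springfield")).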
  • With the POJOs generated from the schemas, the next steps are to
    • Deserialize the records stored in the topic into Customer objects
    • Print the Customer objects to the console
  • The deserialization code is in JsonDeserializer.java, in the package com.raj.nola.util. It uses the Jackson library to deserialize the JSON into a Java object
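The post does not show JsonDeserializer.java. Below is a minimal sketch of such a class. It assumes the class implements Kafka's Deserializer interface and reads the target class (Customer.class in the consumer configuration) from the consumer properties in configure(). The key string "value.class.name" is a hypothetical placeholder, since the post does not show the real value of VALUE_CLASS_NAME_CONFIG.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

// Sketch of com.raj.nola.util.JsonDeserializer (package line omitted).
// Kafka instantiates it reflectively, so it must be public with a no-arg constructor.
public class JsonDeserializer<T> implements Deserializer<T> {

    // Hypothetical key: the post does not show the real value of this constant
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";

    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> targetClass;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Kafka passes all consumer properties here, including custom keys like this one
        Object configured = configs.get(VALUE_CLASS_NAME_CONFIG);
        if (!(configured instanceof Class)) {
            throw new IllegalArgumentException(VALUE_CLASS_NAME_CONFIG + " must be set to a Class");
        }
        targetClass = (Class<T>) configured;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // null record value, e.g. a tombstone
        }
        try {
            // Map the JSON bytes onto the configured class (Customer in this post)
            return objectMapper.readValue(data, targetClass);
        } catch (IOException e) {
            throw new SerializationException("Could not deserialize JSON from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Nothing to release
    }
}
```

Because the target class comes from configuration, the same deserializer can be reused for other record types by changing only the consumer properties.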
  • With the Customer and Address POJOs generated and the JSON deserializer in place, the next step is to create the consumer program
  • Settings such as the topic name and group ID are in SystemConfig.java, in the package com.raj.nola.config
package com.raj.nola.config;

public final class SystemConfig {
    public static final String consumerApplicationID = "CustomerConsumerApp";
    public static final String bootstrapServers = "localhost:9092,localhost:9093";
    public static final String topicName = "customer-demo-topic";
    public static final String groupID = "CustomerReaderGroup";
}
  • Now let us build the consumer program. The consumer configuration is
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, SystemConfig.consumerApplicationID);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfig.bootstrapServers);
// Record keys are Integers; record values are JSON mapped onto Customer
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, Customer.class);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SystemConfig.groupID);
// Start from the oldest record when the group has no committed offset
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  • The value deserializer is our custom JsonDeserializer class, set via ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
  • The JSON is mapped to a Customer object; the target class is set via JsonDeserializer.VALUE_CLASS_NAME_CONFIG
  • We want to read all existing records when the program runs, so ConsumerConfig.AUTO_OFFSET_RESET_CONFIG is set to "earliest"
  • However, "earliest" only applies when the consumer group has no committed offset. On the next run, the consumer resumes from the last offset committed for CustomerReaderGroup (offsets are committed automatically by default), so records that were already read are not read again
  • The code to subscribe to the topic and poll for records is below
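KafkaConsumer<Integer, Customer> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(SystemConfig.topicName));

// Poll in a loop; each record value has already been deserialized into a Customer
while (true) {
    ConsumerRecords<Integer, Customer> records = consumer.poll(Duration.ofMillis(200));
    records.forEach(record -> System.out.println(record.value()));
}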
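The loop above runs forever and never closes the consumer. A common pattern is to call consumer.wakeup() from another thread, which makes a blocked poll() throw WakeupException, and then close the consumer in a finally block. The sketch below assumes a clean stop (for example on Ctrl+C) is wanted. PollLoop and run are hypothetical names and are not part of the original project.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;

// Hypothetical helper: the post's subscribe/poll loop, but it exits when
// consumer.wakeup() is called and always closes the consumer.
public final class PollLoop {

    private PollLoop() {
    }

    // java.util.function.Consumer is fully qualified to avoid clashing with Kafka's Consumer
    public static <K, V> void run(Consumer<K, V> consumer, String topic,
                                  java.util.function.Consumer<V> handler) {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
                // Hand each deserialized value (e.g. a Customer) to the handler
                records.forEach(record -> handler.accept(record.value()));
            }
        } catch (WakeupException e) {
            // Expected: another thread called consumer.wakeup() to stop the loop
        } finally {
            // Leave the group cleanly; with auto-commit enabled, close() also commits offsets
            consumer.close();
        }
    }
}
```

In the consumer's main method you would pass the KafkaConsumer<Integer, Customer> built from consumerProps, SystemConfig.topicName and System.out::println. You would also register a shutdown hook that calls consumer.wakeup() (the one KafkaConsumer method that is safe to call from another thread) and then waits for the main thread to finish.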
  • Below is the result of executing the program

Share your comments and feedback. Thanks!
