How to send custom objects to Kafka

This blog shares an example of how to send a custom Java object to Kafka using a Kafka producer.

  • Use-case
    • Read a data file with customer details
    • Map the data to a customer java object
    • Send the data to a Kafka Topic
  • To build this use case, we will follow these steps:
    • Create a Kafka topic
    • Build a json schema for the customer data
    • Generate java classes from the schema
    • Create Kafka producer to send data to Kafka Topic
  • The data file that we will use has records in the format below; each record primarily consists of customer contact details and an address.
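  • For illustration, a record shaped to match the Address and Customer schemas defined below might look like this (the actual data file is not shown in the post, so all names and values here are made up):

```json
[
  {
    "id": 1,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "address": {
      "streetAddress": "123 Canal St",
      "city": "New Orleans",
      "state": "LA",
      "country": "USA",
      "zipCode": 70112,
      "contactNumber": 5045550143
    }
  }
]
```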
  • To start, let us create a Java Maven project.
  • Inside the resources folder, create a directory named schema to store the schemas for the customer data.
  • The schema for the data consists of two files:
    • Address
    • Customer
  • Address Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Address",
  "properties": {
    "streetAddress": {"type": "string"},
    "city": {"type": "string"},
    "state": {"type": "string"},
    "country": {"type": "string"},
    "zipCode": {"type": "integer"},
    "contactNumber": {"type": "integer", "existingJavaType": "java.lang.Long"}
  }
}
  • Customer Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Customer",
  "properties": {
    "id": {
      "type": "integer"
    },
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "address": {
      "type": "object",
      "$ref": "Address.json"
    }
  }
}
  • We will use the jsonschema2pojo-maven-plugin to generate Java classes from the schema files.
  • The javaType field value determines the fully qualified name of the generated Java class.
  • The address field in the customer schema refers to Address.json, so the generated Customer class will have a field of type Address.
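  • As a rough sketch, the generated classes will look like this (simplified; the real output also carries Jackson annotations such as @JsonProperty and, because generateBuilders is enabled below, fluent withXxx() methods, and Address is trimmed to a single field here):

```java
// Simplified sketch of what jsonschema2pojo generates from the two schemas.
class Address {
    private String city;
    // ...plus streetAddress, state, country, zipCode, contactNumber

    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
}

class Customer {
    private Integer id;
    private String name;
    private String email;
    private Address address; // resolved from the $ref to Address.json

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public Address getAddress() { return address; }
    public void setAddress(Address address) { this.address = address; }
}
```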
  • To generate the POJOs, add the following plugin configuration to pom.xml:
<build>
    <plugins>
        <plugin>
            <groupId>org.jsonschema2pojo</groupId>
            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
            <version>0.5.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        <includeAdditionalProperties>false</includeAdditionalProperties>
                        <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                        <generateBuilders>true</generateBuilders>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • The other dependencies required for the project are:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.4</version>
    </dependency>

</dependencies>
  • At this stage, running mvn compile will generate the POJO classes.
  • In order to send the object to Kafka, we have to serialize the record.
  • Serialization is the process of converting an object into a stream of bytes for transmission.
  • The serialization code is in JsonSerializer.java, in the com.raj.nola.util package; it uses the Jackson library to serialize the object to JSON.
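  • The JsonSerializer source is not reproduced in this post; a minimal sketch of such a class (in the com.raj.nola.util package), implementing Kafka's Serializer interface with Jackson, could look like this:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// A minimal Jackson-based serializer for Kafka values. This is a sketch of
// what JsonSerializer.java might contain; the actual class may differ.
public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            // Convert the object (e.g. a Customer) into a JSON byte array
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}
```

Because Serializer's configure() and close() have default implementations in recent kafka-clients versions, only serialize() needs to be overridden.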
  • Now create a program that reads the data file, maps each record to the generated POJO classes, and stores the data in an array.
  • The program for this example is CustomerFileReader, in the com.raj.nola.reader package:
private static final String DATAFILE = "src/main/resources/data/data.json";

private Customer[] customers;

private CustomerFileReader() {
    final ObjectMapper mapper = new ObjectMapper();
    try {
        // Map the JSON array in the data file to an array of Customer POJOs
        customers = mapper.readValue(new File(DATAFILE), Customer[].class);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
  • The next step is to create a Kafka producer program that sends the data read from the file to a topic named customer-demo-topic.
  • The required configurations are defined in SystemConfig.java, under the com.raj.nola.config package:
public final static String producerApplicationID = "CustomerProducerApp";
public final static String bootstrapServers = "localhost:9092,localhost:9093";
public final static String topicName = "customer-demo-topic";
  • Now we will create the producer program to send data to the configured topic.
  • The configuration for the producer is as follows:
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, SystemConfig.producerApplicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfig.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
  • The key for each record is an integer, so the key serializer is Kafka's built-in IntegerSerializer.
  • The value is the Customer Java object; to send it to the Kafka topic we have to provide a value serializer.
  • The value serializer is our custom JsonSerializer class in the util package.
  • In the producer class, we loop through the array of customer objects and send each one to the customer-demo-topic:
KafkaProducer<Integer, Customer> producer = new KafkaProducer<>(props);

for (int i = 0; i < customers.length; i++) {
    ProducerRecord<Integer, Customer> record =
            new ProducerRecord<>(SystemConfig.topicName, i, customers[i]);
    try {
        // send() is asynchronous; calling get() blocks until the broker acknowledges
        RecordMetadata recordMetadata = producer.send(record).get();
        String message = String.format("sent message to topic:%s partition:%s offset:%s",
                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
        System.out.println(message);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

producer.close();
  • Upon executing the program, the topic will be created if it does not already exist (provided auto topic creation is enabled on the broker), and for each record sent successfully the partition number and offset are printed.
  • To verify, you can consume the messages with the kafka-console-consumer command-line tool.
  • To summarize: the program reads customer records from a data file, maps them to the generated POJOs, serializes them to JSON, and sends them to the customer-demo-topic Kafka topic.

Share your comments / feedback. Thanks !
