How to send custom objects to Kafka

This blog shares an example of how to send a custom Java object to a Kafka topic using a Kafka producer.

  • Use case
    • Read a data file with customer details
    • Map the data to a Customer Java object
    • Send the data to a Kafka topic
  • To build this use case, we will do the following steps:
    • Create a Kafka topic (a sample command follows this list)
    • Build a JSON schema for the customer data
    • Generate Java classes from the schema
    • Create a Kafka producer to send data to the Kafka topic
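  • The topic can be created up front with the Kafka command line tools. A sample command (the partition and replication counts here are illustrative; the topic name matches the one configured later in this post):
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic customer-demo-topic \
  --partitions 2 \
  --replication-factor 1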
  • The data file that we will use has records in the format shown below. Each record primarily consists of customer contact details and an address.
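  • An illustrative record (the values here are made up; the field names match the Address and Customer schemas defined below):
[
  {
    "id": 1,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "address": {
      "streetAddress": "12 Main Street",
      "city": "New Orleans",
      "state": "LA",
      "country": "USA",
      "zipCode": 70112,
      "contactNumber": 5045550101
    }
  }
]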
  • To start, let us create a Java Maven project.
  • Inside the resources folder, create a directory named schema to store the schema files for the customer data.
  • The schema for the data consists of two files:
    • Address
    • Customer
  • Address Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Address",
  "properties": {
    "streetAddress": {"type": "string"},
    "city": {"type": "string"},
    "state": {"type": "string"},
    "country": {"type": "string"},
    "zipCode": {"type": "integer"},
    "contactNumber": {"type": "long"}
  }
}
  • Customer Schema
{
  "type": "object",
  "javaType": "com.raj.nola.types.Customer",
  "properties": {
    "id": {
      "type": "integer"
    },
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "address": {
      "type": "object",
      "$ref": "Address.json"
    }
  }
}
  • We will use the jsonschema2pojo-maven-plugin to generate Java classes from the schemas.
  • The javaType field value determines the fully qualified name of the generated Java class.
  • The address field in the customer schema refers to Address.json, so the generated Customer class will have a field of type Address.
  • To generate the POJOs, add the following plugin to pom.xml (the supporting dependencies are listed in the next step):
<build>
    <plugins>
        <plugin>
            <groupId>org.jsonschema2pojo</groupId>
            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
            <version>0.5.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        <includeAdditionalProperties>false</includeAdditionalProperties>
                        <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                        <generateBuilders>true</generateBuilders>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • The other dependencies required for the project are:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.4</version>
    </dependency>

</dependencies>
  • At this stage, when you execute mvn compile, the POJO classes will be generated under src/main/java.
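  • For reference, the generated Customer class will look roughly like the sketch below (abbreviated here; the exact output depends on the plugin configuration):
package com.raj.nola.types;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "id", "name", "email", "address" })
public class Customer {

    @JsonProperty("id")
    private Integer id;
    @JsonProperty("name")
    private String name;
    @JsonProperty("email")
    private String email;
    @JsonProperty("address")
    private Address address;

    @JsonProperty("id")
    public Integer getId() { return id; }

    @JsonProperty("id")
    public void setId(Integer id) { this.id = id; }

    // generateBuilders=true also produces fluent with* methods
    public Customer withId(Integer id) { this.id = id; return this; }

    // ... equivalent getters, setters, and with* methods are generated
    // for name, email, and address
}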
  • In order to send the object to Kafka, we have to serialize the record.
  • Serialization is the process of converting an object into a stream of bytes that can be transmitted over the network.
  • The code to serialize the data is JsonSerializer.java, in the package com.raj.nola.util; it uses the Jackson libraries to serialize the object to JSON.
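  • A minimal sketch of such a Jackson-based serializer, implementing Kafka's Serializer interface, could look like the code below (the actual class in the project may differ slightly):
package com.raj.nola.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            // Convert the POJO to its JSON representation as a byte array
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}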
  • Now create a program that reads the data file, maps the records to the generated POJO classes, and stores the data in an array.
  • The program for this example is CustomerFileReader, in the com.raj.nola.reader package.
private static final String DATAFILE = "src/main/resources/data/data.json";

private Customer[] customers;

private CustomerFileReader() {
    final ObjectMapper mapper = new ObjectMapper();
    try {
        // Map each JSON record in the data file to a Customer POJO
        customers = mapper.readValue(new File(DATAFILE), Customer[].class);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
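  • The producer needs access to this array; an accessor along the lines below (the exact method in the project may differ) completes the reader:
public static Customer[] getCustomers() {
    return new CustomerFileReader().customers;
}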
  • The next step is to create a Kafka producer program to send the data read from the file to a topic named customer-demo-topic.
  • The required configurations are defined in SystemConfig.java, under the package com.raj.nola.config:
public static final String producerApplicationID = "CustomerProducerApp";
public static final String bootstrapServers = "localhost:9092,localhost:9093";
public static final String topicName = "customer-demo-topic";
  • Now we will create the producer program to send data to the configured topic.
  • The configuration for the producer is as follows:
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, SystemConfig.producerApplicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfig.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
  • The key for each record is an integer, so the key serializer is Kafka's built-in IntegerSerializer.
  • The value is the Customer Java object, and to send it to a Kafka topic we have to provide a value serializer.
  • The value serializer is our custom JsonSerializer class from the util package; a producer built from these properties is shown below.
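  • With the properties in place, creating the producer is a one-liner (shown here so the send loop below reads in context):
KafkaProducer<Integer, Customer> producer = new KafkaProducer<>(props);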
  • In the producer class, we loop through the array of customer objects and send each one to customer-demo-topic:
for (int i = 0; i < customers.length; i++) {

    // Key: the record index; Value: the Customer object
    ProducerRecord<Integer, Customer> record =
            new ProducerRecord<>(SystemConfig.topicName, i, customers[i]);
    try {
        // send() is asynchronous; calling get() on the returned Future
        // blocks until the broker acknowledges the record
        RecordMetadata recordMetadata = producer.send(record).get();

        String message = String.format("sent message to topic:%s partition:%s offset:%s",
                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
        System.out.println(message);

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
  • Upon executing the program, the topic will be created if it does not already exist (assuming the broker allows auto topic creation), and for each record sent successfully the program prints the topic, partition, and offset.
  • To verify, you can consume the records with the kafka-console-consumer command line tool.
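  • A sample invocation, assuming a local broker:
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic customer-demo-topic \
  --from-beginning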
  • To summarize: the program reads customer records from a data file, maps them to the generated POJO classes, serializes each object to JSON with the custom serializer, and sends it to the customer-demo-topic Kafka topic.

Share your comments / feedback. Thanks!
