Kafka Connect Concepts & Example

  • Kafka Connect is used to connect Kafka with external systems such as databases, storage systems, and messaging systems for data import and export in a scalable and reliable way.
  • The framework executes so-called “connectors” that implement the actual logic to read/write data from other systems.
  • Example: an organization has a custom customer-management application that uses PostgreSQL as its database. The customer data is consumed by another application, which uses Elasticsearch to index it. In this scenario we can leverage Kafka Connect: a source connector reads the data from the PostgreSQL database and sends it to the Kafka cluster, and from the Kafka cluster a sink connector sends the data on to Elasticsearch. All of this can be done without writing any code, just by using Kafka Connect.
  • There are two types of Kafka connectors, and together they support a wide range of systems and offer data integration capability without writing any code.
    • Source Connector
    • Sink Connector
  • Kafka Connect supports a wide variety of systems because many companies and developers have leveraged the open-source Kafka Connect framework and built connectors that can be reused. By using Kafka Connect you can therefore use existing connector implementations for common data sources and sinks to move data into and out of Kafka.
  • Kafka Connect ships with Apache Kafka, so there is no need to install it separately.
  • To summarize, Kafka Connect helps copy data from external systems into Kafka and propagate messages from Kafka to external systems by leveraging already available connectors and configuring them for our use case.
  • If the use case is to copy data from a MySQL table, the steps we need to perform are: find the right MySQL JDBC connector, install it on Kafka Connect, and provide configuration details like DB connection parameters and the table name, and that's all. This will copy the data from the tables to a Kafka topic. If we then need to move the data from the Kafka topic to Snowflake, we find the Snowflake connector, provide the configurations, and that's all.
  • To understand how it works, it is important to understand a few concepts.
    • Connectors – the high-level abstraction that coordinates data streaming by managing tasks
    • Tasks – the implementation of how data is copied to or from Kafka
    • Workers – the running processes that execute connectors and tasks
    • Converters – the code used to translate data between Connect and the system sending or receiving data
  • A connector is the JAR file that contains the logic for integrating with a particular technology, for example how to read data from tables using JDBC drivers or how to connect to a file system.
  • A Kafka Connect cluster can have multiple connectors. Each connector is a reusable Java JAR.
  • A connector instance is a logical job that is responsible for managing the copying of data between Kafka and another system.
  • Tasks are the main actor in the data model for Connect. Each connector instance coordinates a set of tasks that actually copy the data. 
  • By allowing the connector to break a single job into many tasks, Kafka Connect provides built-in support for parallelism and scalable data copying with very little configuration. 
  • These tasks have no state stored within them. Task state is stored in Kafka in special topics config.storage.topic and status.storage.topic and managed by the associated connector. 
  • Connectors and tasks are logical units of work and must be scheduled to execute in a process. Kafka Connect calls these processes workers and has two types of workers: standalone and distributed.
  • Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks.
  • Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id and they automatically coordinate to schedule execution of connectors and tasks across all available workers (a quick way to see this in practice is the REST example after the figure below).
[Figure: Kafka Connect worker model basics. Image source: confluent.io]

A three-node Kafka Connect distributed mode cluster. Connectors (monitoring the source or sink system for changes that require reconfiguring tasks) and tasks (copying a subset of a connector’s data) are automatically balanced across the active workers. The division of work between tasks is shown by the partitions that each task is assigned.
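To see this model in practice once a distributed worker is running (as in the example later in this post), you can query the Connect REST API, which lists connectors and shows which worker each task is running on. A minimal sketch, assuming the worker listens on the default REST port 8083 and <connector-name> is a placeholder:

# List all connector instances running in the Connect cluster
curl -s http://localhost:8083/connectors

# Show the state of a connector and each of its tasks, including the worker each task is assigned to
curl -s http://localhost:8083/connectors/<connector-name>/status

# List the task configurations the connector has created
curl -s http://localhost:8083/connectors/<connector-name>/tasks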

Converters change the format of data from one format into another. Converters are decoupled from connectors so that converters can naturally be reused across connectors. The converter used at the source and the converter used at the sink can read from and write to different formats (an override example is shown after the figure below).

The following graphic shows how converters are used when reading from a database using a JDBC Source Connector, writing to Kafka, and finally, writing to HDFS with an HDFS Sink Connector.

[Figure: How converters are used for a source and sink data transfer. Image source: confluent.io]
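Because converters are decoupled from connectors, they can also be overridden per connector: adding converter properties to a connector's "config" block replaces the worker-level defaults for that connector only. A minimal sketch of such an override (the converter classes shown ship with Apache Kafka; which ones you pick depends on your data):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"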

Kafka Connect also supports Single Message Transforms (SMTs).

  •  SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka.
  • SMTs transform outbound messages before they are sent to a sink connector. 
  • Examples of common SMTs (see the example configuration below):
    • Filter or rename fields
    • Add a new field in record using metadata
    • Mask some fields with a null value
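  • SMTs are configured on the connector itself. Below is a minimal sketch of what could be added to a connector's "config" block; the transform classes ship with Apache Kafka, but the field names (contact_title, email) are only illustrative:
"transforms": "renameTitle,maskEmail",
"transforms.renameTitle.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameTitle.renames": "contact_title:title",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email"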
  • Now let's use Kafka Connect to demonstrate a use case: getting automatic updates from a PostgreSQL table with a frequency of 1 minute.
  • At the source, we have a customers table. I have only created the customers table. You can leverage the scripts from here to create these tables.
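  • A minimal sketch of such a table, assuming a Northwind-style customers schema with only the columns referenced later in this post (your actual script may create more columns):
CREATE TABLE customers (
    customer_id   VARCHAR(5) PRIMARY KEY,
    company_name  VARCHAR(40),
    contact_name  VARCHAR(30),
    contact_title VARCHAR(30)
);

INSERT INTO customers (customer_id, company_name, contact_name, contact_title)
VALUES ('ALFKI', 'Alfreds Futterkiste', 'Maria Anders', 'Sales Representative');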
  • Step 1: Start ZooKeeper and the Kafka broker cluster.
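  • For example, assuming the Apache Kafka scripts are on your PATH and you are using the default property files:
# Start ZooKeeper (in one terminal)
zookeeper-server-start.sh config/zookeeper.properties

# Start a Kafka broker (in another terminal)
kafka-server-start.sh config/server.properties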
  • Step 2: Get the required connector libraries. In this case, we will need kafka-connect-jdbc and the PostgreSQL JDBC driver. Download the Kafka Connect JDBC plugin from Confluent Hub and extract the zip file.
  • Create a kafka-connect-jdbc directory at any path and copy the JARs under it.
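  • For example (the directory and the extracted folder names below are placeholders; use whatever paths you downloaded to):
# Create a directory for the JDBC plugin and copy the extracted JARs into it
mkdir -p /path/to/plugins/kafka-connect-jdbc
cp confluentinc-kafka-connect-jdbc-*/lib/*.jar /path/to/plugins/kafka-connect-jdbc/

# The PostgreSQL JDBC driver JAR goes into the same directory
cp postgresql-*.jar /path/to/plugins/kafka-connect-jdbc/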
  • Step 3: Create the worker properties file and save it. I named it connect-postgres-demo.properties. The plugin.path property should contain the path of the location where the JARs were downloaded.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-pg-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. 
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, and compacted.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

#plugin.path=/usr/local/share/kafka/plugins/kafka-connect-jdbc
#plugin.path=/usr/share/java
plugin.path=/Users/rajeshp/Kafka-Confluent/confluent-6.0.0/share/java/kafka-connect-jdbc
  • Step 4: Run Kafka Connect using the command below, passing the above properties file name as the parameter. Make sure the plugin.path property value points to the folder where the JARs were downloaded.
connect-distributed.sh  connect-postgres-demo.properties
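  • Once the worker is up, a quick sanity check (assuming the default REST port 8083) is to confirm that the JDBC plugin was loaded:
# The JDBC source connector class should appear in this list
curl -s http://localhost:8083/connector-plugins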
  • Step 5: Configure the details of the JDBC source and save the file. I named it connect-demo-jdbc-source.json. In the configuration below I have configured the connector to poll the database every 1 minute.
{
    "name": "jdbc_source_connector_postgresql_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/connect-demoDB",
        "connection.user": "postgres",
        "connection.password": "admin",
        "topic.prefix": "postgres-01-",
        "poll.interval.ms" : 60000,
        "mode":"bulk"
    }
}
  • The connector creates one Kafka topic per table. Topics are named topic.prefix + <table_name>, so the customers table maps to postgres-01-customers. Because mode is set to bulk, the entire table is copied to the topic on every poll.
  • Step 6: Start the JDBC connector
curl -d @"connect-demo-jdbc-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
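  • To confirm the connector and its task are running, you can query its status; the connector name matches the "name" field in the JSON above:
curl -s http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status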
  • With this, you will now see topics created in Kafka with all the table records copied into them automatically.
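  • One way to verify this from the command line is the console consumer; a sketch assuming the customers table and the topic.prefix configured above:
# List the topics created by the connector
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Read the records the connector copied into the customers topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic postgres-01-customers --from-beginning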
  • Let us update a record and see if the change gets automatically published to postgres-01-customers. For example, update a record:
update CUSTOMERS set contact_title = 'Sr. Sales Representative' where customer_id = 'ALFKI';
  • Now let us check the topic and see whether the change has been reflected. Below is a snapshot of the topic where I searched for ALFKI using Kafka Tool; it shows that the change was automatically propagated to the topic.

Thanks! Please share your comments / feedback.

12 thoughts on "Kafka Connect Concepts & Example"

  1. Hi Rajesh,
    Nice blog. Can we think of Apache NiFi as a similar tool for connecting different systems, with built-in processors to transform data?

    1. Not really. NiFi is built on the idea of flow files and processors; it has more than 100 processors and you can add custom logic for data transformation, etc. Connect has built-in connectors for various source systems, and based on changes in the data it moves the data from the external system to a Kafka topic. It doesn't offer any great data transformation capability. Once we have the data in a Kafka topic, instead of writing custom code to move it to, for example, MySQL or Elasticsearch, it offers a fault-tolerant and reliable way to do this without writing code. Hope it helps 🙂

  2. Hello Rajesh,

    You have written a really very good article. However, I still have some questions. Is a source connector mandatory when I only want to read data from Kafka topics into a Postgres database automatically, like a pipeline? Can't we just use a sink connector?

    1. Hey, thanks for the kind words. No, it is not mandatory to use a source connector when the source is a Kafka topic. In this case we just need to use a sink connector for the target.
