Kafka Streams

  • Kafka Streams is a client library (API) to build applications that analyze and process data stored in Kafka in real-time.
  • A Streams application takes its input from a Kafka topic and stores its output in a Kafka topic as well. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • It is capable of doing stateful and/or stateless processing on real-time data.
  • Operations that look at only one individual event at a time are stateless, while operations that require information from past events are stateful.
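  • For example, a filter is stateless because it decides on each record in isolation, whereas a per-key count is stateful because it needs a state store that remembers past events. A minimal sketch using the DSL (the topic names "clicks" and "clicks-per-user" are hypothetical):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class StatelessVsStateful {

    // Stateless predicate: decides on each event in isolation
    static boolean isValid(String value) {
        return value != null && !value.isEmpty();
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // "clicks" and "clicks-per-user" are hypothetical topic names
        KStream<String, String> clicks = builder.stream("clicks");

        // Stateless: filter inspects one record at a time, keeping no memory of past events
        KStream<String, String> validClicks = clicks.filter((key, value) -> isValid(value));

        // Stateful: counting per key requires a state store that remembers past events
        KTable<String, Long> clicksPerUser = validClicks.groupByKey().count();

        clicksPerUser.toStream()
                     .to("clicks-per-user", Produced.with(Serdes.String(), Serdes.Long()));

        System.out.println(builder.build().describe());
    }
}
```
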
  • A stream processing application doesn’t run inside a Kafka broker. Instead, it runs in a separate JVM instance, or in a separate cluster entirely.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step that transforms data in streams by receiving one input record at a time from its upstream processors, applying its operation to it, and possibly producing one or more output records for its downstream processors.
  • A topology is a directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application. It defines the computational logic of the data processing that needs to be performed by the application.
  • There are two special processors in the topology:
    • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
    • Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
  • Developers can define topologies either via the low-level Processor API or via the functional DSL.
    • The declarative, functional DSL is the recommended API for most users – and notably for starters – because most data processing use cases can be expressed in just a few lines of DSL code. 
    • The imperative, lower-level Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual coding work.
  • The lower level API allows you to create your own transformations, but this is rarely required.
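  • As a rough sketch of the Processor API (assuming Kafka Streams 2.7+ where the org.apache.kafka.streams.processor.api package is available; the topic names and the upper-casing logic are hypothetical), a topology is wired manually from a source processor, a custom stream processor, and a sink processor:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseTopology {

    // A custom processor that upper-cases each value before forwarding it downstream
    static class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    static Topology build() {
        Topology topology = new Topology();
        // Source processor: consumes records from "text-input" (no upstream processors)
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "text-input");
        // Custom stream processor wired between the source and the sink
        topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source");
        // Sink processor: writes whatever it receives to "uppercased-output" (no downstream processors)
        topology.addSink("Sink", "uppercased-output",
                Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```
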
  • An application that uses the DSL API does the following:
    • Use StreamsBuilder to create a processing topology, a directed acyclic graph (DAG) of transformations that are applied to the events in the streams.
    • Create a KafkaStreams execution object from the topology.
    • Start the KafkaStreams object, which will launch multiple threads, each applying the processing topology to events in the stream.
    • Processing concludes when you close the KafkaStreams object.
  • We will now build a Streams application for the following use case:
    • Consume the sentences stored in a Kafka topic and split them into words
    • Store the words in another Kafka topic
  • The topology for the above use case consists of a source processor that reads the sentences, a processor that splits them into words, and a sink processor that writes the words out.
  • We will start by creating a java.util.Properties map to specify different Streams execution configuration values as defined in StreamsConfig.
  • A couple of important configuration values you need to set are: StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, which specifies a list of host/port pairs used to establish the initial connection to the Kafka cluster; StreamsConfig.APPLICATION_ID_CONFIG, a unique identifier that distinguishes your Streams application from other applications talking to the same Kafka cluster; and the default Serdes for keys and values. In this example both keys and values are of String type.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  • The next step is to define the processing logic of the Streams application. This is defined as a topology of connected processor nodes, and we can use a topology builder to construct it. In the topology below, a source processor takes input from the “text-input” topic; the flatMapValues function then performs a stateless transformation that splits each sentence into words, and the output is stored in the “words-output” topic.
StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("text-input")
       .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
       .to("words-output");
  • We can inspect what kind of topology is created from this builder by doing the following:
Topology topology = builder.build();
System.out.println(topology.describe());
  • By calling start() on a KafkaStreams object, built from the topology and the configuration, we can trigger the execution of this client.
KafkaStreams streams = new KafkaStreams(topology, props);
  • The execution won’t stop until close() is called on this client. We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);
  • To execute the program, create the input and output topics
kafka-topics --create --topic text-input --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics --create --topic words-output --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
  • Run the Java program
  • Produce messages using the command below
kafka-console-producer --broker-list localhost:9092 --topic text-input
  • In another window, if you open a consumer, you will see each sentence transformed into individual words
kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic words-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • The Java application also prints the topology, and you will see the output below
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [text-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: words-output)
      <-- KSTREAM-FLATMAPVALUES-0000000001

Global Stores:
  • This output shows that the topology has three processor nodes.
    1. source node: KSTREAM-SOURCE-0000000000
    2. processor node: KSTREAM-FLATMAPVALUES-0000000001
    3. sink node: KSTREAM-SINK-0000000002
  • KSTREAM-SOURCE-0000000000 continuously reads records from the Kafka topic text-input
  • These records are sent to the KSTREAM-FLATMAPVALUES-0000000001 processor node, which generates one or more words as a result of its processing logic
  • The data then flows down to the sink node KSTREAM-SINK-0000000002, which writes it back to the Kafka topic words-output
  • The flatMapValues processor node is “stateless”, as it is not associated with any state stores (i.e. (stores: []))
  • This was a simple example of stream processing.
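  • A handy way to verify such a topology without a running broker is the TopologyTestDriver from the kafka-streams-test-utils artifact. The class below is a sketch of such a test, not part of the original program; it rebuilds the same line-split topology, pipes a sentence through it in-process, and reads back the emitted words:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class LineSplitTest {

    // Pipes one sentence through the line-split topology and returns the emitted words
    static List<String> splitThroughTopology(String sentence) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("text-input")
               .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
               .to("words-output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input =
                    driver.createInputTopic("text-input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output =
                    driver.createOutputTopic("words-output", new StringDeserializer(), new StringDeserializer());
            input.pipeInput(sentence);
            return output.readValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(splitThroughTopology("hello kafka streams")); // prints [hello, kafka, streams]
    }
}
```
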

Kafka’s stream-processing library allows developers to consume, process, and produce events in their own apps, without relying on an external processing framework.

Thanks. Please share your comments / feedback.
