Kafka Streams is a library for developing applications that process records from topics in Apache Kafka. A Kafka Streams application processes record streams through a topology in real time, continuously, concurrently, and record by record. The Kafka Streams DSL provides a range of stream processing operations such as map, filter, join, and aggregate.

Non-Key Joins in Kafka Streams

Apache Kafka 2.4 introduced the foreign-key join feature (KIP-213) in order "to close the gap between the semantics of KTables in streams and tables in relational databases". Before KIP-213, in order to join messages from two Debezium change event topics, you’d have to manually re-key the topic(s) so as to make sure the same key is used on both sides of the join. Thanks to KIP-213 this isn’t needed any longer, which makes relational data liberated by connection mechanisms far easier to use and smooths the transition to natively built event-driven services. It also resolves the problem of out-of-order processing due to foreign key changes in tables. Here is an interesting post by Hans-Peter Grahsl and Gunnar Morling on creating aggregated events from Debezium’s CDC events.

Debezium’s CDC source connectors make it easy to capture data changes in databases and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between the tables in the source database and the corresponding search indexes in Elasticsearch. In case of a 1:many relationship, say two tables with different primary keys and a foreign key relationship, Debezium provides the message.key.columns option. By choosing the foreign key column as the message key for change events, the two table streams can be joined without the need to re-key the topic manually. The message.key.columns option can be used as follows:

message.key.columns=dbserver1.inventory.customerdetails:CustomerId

The join operations available in the Kafka Streams DSL differ based on which kinds of streams or tables are being joined, for example KStream-KStream joins, KTable-KTable joins, or KStream-KTable joins. Joining streams typically involves defining input streams that are read from Kafka topics, applying transformations on the joined streams to produce results, and finally writing the results back to Kafka topics.

Non-key joins, or rather foreign-key joins, are analogous to joins in SQL. For example:

SELECT * FROM CUSTOMER JOIN ADDRESS ON CUSTOMER.ID = ADDRESS.CUSTOMER_ID

The output of the join is a new KTable containing the join result.
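To make the SQL analogy concrete, here is a minimal plain-Java sketch of the same idea. It deliberately does not use the Kafka Streams API: both tables are modelled as in-memory maps, and the class names, field names and sample values are assumptions made purely for illustration. The foreign key is extracted from each address, looked up in the customers table, and the result keeps the key of the left-hand (address) side.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a foreign-key join. It does not use the Kafka Streams API;
// all class names, fields and sample values are hypothetical.
class ForeignKeyJoinSketch {

    static class Customer {
        final int id;
        final String name;
        Customer(int id, String name) { this.id = id; this.name = name; }
    }

    static class Address {
        final long id;
        final int customerId; // foreign key referencing Customer.id
        final String city;
        Address(long id, int customerId, String city) {
            this.id = id; this.customerId = customerId; this.city = city;
        }
    }

    // Joins every left-side row with the right-side row whose primary key equals
    // the foreign key extracted from the left value. The result is keyed by the
    // left-side key; rows without a match are dropped (inner join).
    static <K, V, FK, VO, R> Map<K, R> join(
            Map<K, V> left, Map<FK, VO> right,
            Function<V, FK> foreignKeyExtractor, BiFunction<V, VO, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        left.forEach((key, value) -> {
            VO other = right.get(foreignKeyExtractor.apply(value));
            if (other != null) {
                result.put(key, joiner.apply(value, other));
            }
        });
        return result;
    }

    static Map<Long, String> demo() {
        Map<Integer, Customer> customers = new LinkedHashMap<>();
        customers.put(1001, new Customer(1001, "Sally"));

        Map<Long, Address> addresses = new LinkedHashMap<>();
        addresses.put(10L, new Address(10L, 1001, "Hamburg"));
        addresses.put(11L, new Address(11L, 1001, "Berlin"));
        addresses.put(12L, new Address(12L, 9999, "Nowhere")); // no matching customer

        return join(addresses, customers,
                address -> address.customerId,
                (address, customer) -> customer.name + " @ " + address.city);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {10=Sally @ Hamburg, 11=Sally @ Berlin}
    }
}
```

The shape of join() loosely mirrors the arguments of a KTable foreign-key join (other table, foreign-key extractor, value joiner), but it is only a static snapshot; a real KTable join is updated continuously as records arrive.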

Database Overview

As an example let’s consider an application with the following data model:

Database Overview

The picture above shows the ER diagram of the database used in our application. We are going to focus on two entities:

  • customer

  • address

They share a foreign key relationship from address to customer i.e. a customer can have multiple addresses.

Debezium’s CDC connectors emit the change events for each table on a distinct topic, whereas consumers are often interested in an aggregate view that spans multiple tables as a single document. Using Kafka Streams, the change event topics for both tables are loaded into two KTables, which are joined on the customer id. The Kafka Streams application processes the data from these two Kafka topics, which receive CDC events for customers and addresses. Each of the two has a corresponding Jackson-annotated POJO (Customer and Address), enriched by a field holding the CDC event type (i.e. UPSERT/DELETE). Each insertion, update, or deletion of a record on either side re-triggers the join.
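The re-triggering behaviour can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions: it is not the Kafka Streams API, the class and method names are hypothetical, and it recomputes the whole join on every event, whereas Kafka Streams processes such updates incrementally.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, not the Kafka Streams API: two "tables" whose join result
// is refreshed whenever a change event arrives on either side.
// All names and sample values are hypothetical.
class ChangeDrivenJoinSketch {

    enum EventType { UPSERT, DELETE }

    private final Map<Integer, String> customers = new TreeMap<>(); // customer id -> name
    private final Map<Long, Integer> addresses = new TreeMap<>();   // address id -> customer id (foreign key)

    // Applies a change event for a customer and returns the refreshed join result.
    Map<Long, String> onCustomerEvent(EventType type, int customerId, String name) {
        if (type == EventType.DELETE) {
            customers.remove(customerId);
        } else {
            customers.put(customerId, name);
        }
        return join();
    }

    // Applies a change event for an address and returns the refreshed join result.
    Map<Long, String> onAddressEvent(EventType type, long addressId, int customerId) {
        if (type == EventType.DELETE) {
            addresses.remove(addressId);
        } else {
            addresses.put(addressId, customerId);
        }
        return join();
    }

    // Inner join: address id -> name of the customer the address belongs to.
    // Recomputed from scratch here purely to keep the sketch short.
    private Map<Long, String> join() {
        Map<Long, String> result = new TreeMap<>();
        addresses.forEach((addressId, customerId) -> {
            String name = customers.get(customerId);
            if (name != null) {
                result.put(addressId, name);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        ChangeDrivenJoinSketch tables = new ChangeDrivenJoinSketch();
        System.out.println(tables.onAddressEvent(EventType.UPSERT, 10L, 1001));         // {} (customer not known yet)
        System.out.println(tables.onCustomerEvent(EventType.UPSERT, 1001, "Sally"));    // {10=Sally}
        System.out.println(tables.onCustomerEvent(EventType.UPSERT, 1001, "Sally T.")); // {10=Sally T.}
        System.out.println(tables.onAddressEvent(EventType.DELETE, 10L, 1001));         // {}
    }
}
```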

As a runtime for the Kafka Streams application, we’re going to use Quarkus, a stack for building cloud-native microservices, which (amongst many others) also provides an extension for Kafka Streams. While it’s generally possible to run a Kafka Streams topology via a plain main() method, using Quarkus and this extension as a foundation has a number of advantages:

  • Topology management

  • Health Checks

  • Metrics

  • Dev Mode

Change Event Overview

This picture shows the change events overview of the application.

Creating an Application using the Quarkus Kafka Streams Extension

To create a new Quarkus project with this extension, run the following:

mvn io.quarkus:quarkus-maven-plugin:1.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-streams-quickstart-aggregator \
    -Dextensions="kafka-streams" \
     && mv kafka-streams-quickstart-aggregator aggregator

Understanding the Stream Processing Topology

We have an aggregator that will read from the two Kafka topics and process them in a streaming pipeline:

  • the two topics are joined on customer id

  • each customer is mapped to its addresses, which are added or removed depending on the change event type (upsert/delete)

  • this aggregated data is written out to a third topic (customersWithAddressesTopic)

When using the Quarkus extension for Kafka Streams, all we need to do is declare a CDI producer method that returns the Topology of our stream processing application. Such a CDI producer method must always be annotated with @Produces, and it must return a Topology instance. The Quarkus extension takes care of configuring, starting, and stopping the actual Kafka Streams engine. Now let’s take a look at the actual streaming query implementation itself.

import java.util.Collections;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.debezium.serde.DebeziumSerdes;
import io.quarkus.kafka.client.serialization.JsonbSerde;

// Address, Customer, AddressAndCustomer and CustomerWithAddresses are the application's own model classes.
@ApplicationScoped
public class TopologyProducer {

    @ConfigProperty(name = "customers.topic") // (1)
    String customersTopic;

    @ConfigProperty(name = "addresses.topic")
    String addressesTopic;

    @ConfigProperty(name = "customers.with.addresses.topic")
    String customersWithAddressesTopic;

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder(); // (2)

        Serde<Long> addressKeySerde = DebeziumSerdes.payloadJson(Long.class);
        addressKeySerde.configure(Collections.emptyMap(), true);
        Serde<Address> addressSerde = DebeziumSerdes.payloadJson(Address.class);
        addressSerde.configure(Collections.singletonMap("from.field", "after"), false);

        Serde<Integer> customersKeySerde = DebeziumSerdes.payloadJson(Integer.class);
        customersKeySerde.configure(Collections.emptyMap(), true);
        Serde<Customer> customersSerde = DebeziumSerdes.payloadJson(Customer.class);
        customersSerde.configure(Collections.singletonMap("from.field", "after"), false);

        JsonbSerde<AddressAndCustomer> addressAndCustomerSerde = new JsonbSerde<>(AddressAndCustomer.class); // (3)
        JsonbSerde<CustomerWithAddresses> customerWithAddressesSerde = new JsonbSerde<>(CustomerWithAddresses.class);

        KTable<Long, Address> addresses = builder.table( // (4)
                addressesTopic,
                Consumed.with(addressKeySerde, addressSerde)
        );

        KTable<Integer, Customer> customers = builder.table(
                customersTopic,
                Consumed.with(customersKeySerde, customersSerde)
        );

        KTable<Integer, CustomerWithAddresses> customersWithAddresses = addresses.join( // (5)
                customers,
                address -> address.customer_id,
                AddressAndCustomer::new,
                Materialized.with(Serdes.Long(), addressAndCustomerSerde)
            )
            .groupBy( // (6)
                (addressId, addressAndCustomer) -> KeyValue.pair(addressAndCustomer.customer.id, addressAndCustomer),
                Grouped.with(Serdes.Integer(), addressAndCustomerSerde)
            )
            .aggregate( // (7)
                CustomerWithAddresses::new,
                (customerId, addressAndCustomer, aggregate) -> aggregate.addAddress(addressAndCustomer),
                (customerId, addressAndCustomer, aggregate) -> aggregate.removeAddress(addressAndCustomer),
                Materialized.with(Serdes.Integer(), customerWithAddressesSerde)
            );

        customersWithAddresses.toStream() // (8)
        .to(
                customersWithAddressesTopic,
                Produced.with(Serdes.Integer(), customerWithAddressesSerde)
        );

        return builder.build();
    }
}
1 The topic names are injected using the MicroProfile Config API, with the values being provided in the Quarkus application.properties configuration file.
2 Create an instance of StreamsBuilder, which is the helper object that lets us build our topology.
3 For serializing and deserializing the Java types used in the streaming pipeline into/from JSON, Quarkus provides the class io.quarkus.kafka.client.serialization.JsonbSerde, a Serde implementation based on JSON-B.
4 StreamsBuilder#table() is used to read the Kafka topics into KTables, i.e. addresses and customers respectively.
5 The KTable-KTable foreign-key join extracts the customer id (customer_id) from each address and joins the address with the corresponding record from the customers table; the join result contains the address together with the data of its customer.
6 The groupBy() operation re-groups the joined records by customer#id.
7 Aggregations in Kafka Streams are always computed per key, which is why a KTable must be grouped via groupBy() prior to the actual aggregation step. The aggregation operation is then applied to all records with the same key, and its results can be stored in a local state store. Here, the aggregate() operation adds or removes addresses per customer#id depending on the upsert or delete, i.e. via addAddress() or removeAddress().
8 The results of the pipeline are written out to the customersWithAddressesTopic topic.

The CustomerWithAddresses class keeps track of the aggregated values while the events are processed in the streaming pipeline.

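import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class CustomerWithAddresses {

    public Customer customer;
    public List<Address> addresses = new ArrayList<>();

    // Adder: called for each address that is (newly) assigned to the customer
    public CustomerWithAddresses addAddress(AddressAndCustomer addressAndCustomer) {

        customer = addressAndCustomer.customer;
        addresses.add(addressAndCustomer.address);

        return this;
    }

    // Subtractor: removes the first address with a matching id, if present
    public CustomerWithAddresses removeAddress(AddressAndCustomer addressAndCustomer) {

        Iterator<Address> it = addresses.iterator();
        while (it.hasNext()) {
            Address a = it.next();
            // Objects.equals() compares by value for both primitive and boxed ids
            if (Objects.equals(a.id, addressAndCustomer.address.id)) {
                it.remove();
                break;
            }
        }

        return this;
    }
}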
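To see how such an aggregate evolves, the following self-contained sketch replays a few changes against a simplified stand-in for the class above. It assumes the documented behaviour of aggregations on a grouped KTable, where an update of a row is applied as a subtraction of the old value followed by an addition of the new one. The class, its reduced Address type and the sample data are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for the aggregate; it only tracks addresses.
class AggregateSketch {

    static class Address {
        final long id;
        final String city;
        Address(long id, String city) { this.id = id; this.city = city; }
    }

    private final List<Address> addresses = new ArrayList<>();

    // Plays the role of the adder passed to aggregate()
    AggregateSketch add(Address address) {
        addresses.add(address);
        return this;
    }

    // Plays the role of the subtractor passed to aggregate()
    AggregateSketch remove(Address address) {
        addresses.removeIf(existing -> existing.id == address.id);
        return this;
    }

    // An update is modelled as "subtract the old value, then add the new value"
    AggregateSketch update(Address oldValue, Address newValue) {
        return remove(oldValue).add(newValue);
    }

    List<String> cities() {
        List<String> result = new ArrayList<>();
        for (Address address : addresses) {
            result.add(address.city);
        }
        return result;
    }

    public static void main(String[] args) {
        AggregateSketch aggregate = new AggregateSketch();
        Address hamburg = new Address(10L, "Hamburg");
        Address berlin = new Address(11L, "Berlin");

        aggregate.add(hamburg).add(berlin);
        System.out.println(aggregate.cities()); // [Hamburg, Berlin]

        aggregate.update(hamburg, new Address(10L, "Munich"));
        System.out.println(aggregate.cities()); // [Berlin, Munich]

        aggregate.remove(berlin);
        System.out.println(aggregate.cities()); // [Munich]
    }
}
```

Note that the updated address moves to the end of the list, since it is removed and then re-added; consumers of the aggregate should not rely on the order of the addresses.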

The Kafka Streams extension is configured via the Quarkus configuration file application.properties. Along with the topic names, this file also contains the Kafka bootstrap server and the stream options, as follows:

customers.topic=dbserver1.inventory.customers
addresses.topic=dbserver1.inventory.addresses
customers.with.addresses.topic=customers-with-addresses

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-id=kstreams-fkjoin-aggregator
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=${customers.topic},${addresses.topic}

# streams options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
kafka-streams.consumer.session.timeout.ms=150
kafka-streams.consumer.heartbeat.interval.ms=100
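The ${...} expressions in this file refer to other configuration properties and are resolved by the configuration system at runtime. The following plain-Java sketch illustrates what that expansion amounts to for quarkus.kafka-streams.topics. It is not how Quarkus implements it; the class and method names are hypothetical, and placeholders without a known value (such as ${hostname}, which is not defined in the file itself) are simply left untouched here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a naive ${name} expansion, not the actual Quarkus implementation.
class PropertyExpansionSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with the value of the property "name";
    // unknown placeholders are kept as they are.
    static String expand(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer expanded = new StringBuffer();
        while (matcher.find()) {
            String replacement = properties.getOrDefault(matcher.group(1), matcher.group());
            matcher.appendReplacement(expanded, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(expanded);
        return expanded.toString();
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("customers.topic", "dbserver1.inventory.customers");
        properties.put("addresses.topic", "dbserver1.inventory.addresses");

        // prints dbserver1.inventory.customers,dbserver1.inventory.addresses
        System.out.println(expand("${customers.topic},${addresses.topic}", properties));
    }
}
```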

Building and Running the Application

You can now build the aggregator application using:

mvn clean package -f aggregator/pom.xml

Set up the necessary environment variable:

export DEBEZIUM_VERSION=1.4

To build the aggregator container image and launch all the containers, run Docker Compose with the docker-compose.yaml file, which you can find in the debezium-examples/kstreams-fk-join repo:

docker-compose up --build

To connect with the Debezium connector, you have to specify configuration properties such as the name of the connector, the database hostname, port, user, and password, the logical name of the database, the database you want to monitor, and the type of converter in a register-postgres.json file:

{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory",
    "decimal.handling.mode" : "string",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}

Configure the Debezium Connector:

http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
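If you prefer to register the connector from Java instead of using HTTPie, the same request can be sketched with the JDK’s built-in HTTP client (Java 11 or later). This is a minimal sketch, not part of the example repository: the class and method names are hypothetical, and it assumes Kafka Connect is reachable at http://localhost:8083 as in the command above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Hypothetical helper that mirrors the "http PUT ..." command shown above.
class RegisterConnectorSketch {

    // Builds a PUT request against the /connectors/{name}/config endpoint of Kafka Connect.
    static HttpRequest buildRequest(String connectUrl, String connectorName,
                                    HttpRequest.BodyPublisher body) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors/" + connectorName + "/config"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .PUT(body)
                .build();
    }

    // Sends the connector configuration file and returns the HTTP status code.
    static int register(String connectUrl, String connectorName, Path configFile)
            throws IOException, InterruptedException {
        HttpRequest request = buildRequest(connectUrl, connectorName,
                HttpRequest.BodyPublishers.ofFile(configFile));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Calling RegisterConnectorSketch.register("http://localhost:8083", "inventory-connector", Path.of("register-postgres.json")) would then send the file to Kafka Connect, which creates the connector or updates its configuration if it already exists.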

Now run an instance of the debezium/tooling image, attaching to the same network all the other containers run in:

docker run --tty --rm \
    --network kstreams-fk-join-network \
    debezium/tooling:1.1

This image provides several useful tools such as kafkacat. Within the tooling container, run kafkacat to examine the results of the streaming pipeline:

kafkacat -b kafka:9092 -C -o beginning -q \
    -t customers-with-addresses | jq .

Running Natively

Kafka Streams applications can easily be scaled out, i.e. the load is shared amongst multiple instances of the application, each processing just a subset of the partitions of the input topic(s). Quarkus applications compiled to native code via GraalVM have a very fast start-up time and use significantly less memory. This allows you to start many instances of a Kafka Streams pipeline in parallel in a very memory-efficient way. If you want to run this application in native mode, set QUARKUS_MODE to native and run:

mvn clean package -f aggregator/pom.xml -Pnative

Here is a detailed guide for you.

More Insights on the Kafka Streams Extension

This extension can help you address some of the common requirements when building microservices. For running your Kafka Streams application in production, you can also add health checks and metrics for the data pipeline.

Micrometer Metrics allow applications to gather various metrics and statistics that provide insights into what is happening inside the application. Using the MicroProfile Metrics API, these metrics can be exposed via HTTP using JSON format or the OpenMetrics format. From there they can be scraped by tools such as Prometheus and stored for analysis and visualization. Apart from application-specific metrics, you can utilize built-in metrics exposed by various Quarkus extensions.

To monitor an existing Quarkus application using the Prometheus registry, you can add the micrometer-registry-prometheus extension. This extension pulls in the core Micrometer extension as well as the additional dependencies required to support Prometheus.

mvn quarkus:add-extension -Dextensions="micrometer-registry-prometheus"

On the other hand, we have MicroProfile Health, which provides information about the liveness of the application, i.e. whether your application is running and whether it is able to process requests.

To monitor the health status of your existing Quarkus application you can add the smallrye-health extension.

mvn quarkus:add-extension -Dextensions="smallrye-health"

Using this extension will expose a health check via HTTP under /health/live.

With Kafka Streams interactive queries, you can directly query the underlying state store of the pipeline for the value associated with a given key. By exposing a simple REST endpoint that queries the state store, the latest aggregation result can be retrieved without having to subscribe to any Kafka topic.

Summary

The Quarkus extension for Kafka Streams comes with everything needed to run stream processing pipelines on the JVM as well as in native mode, along with additional bonuses such as health checks, metrics, and interactive queries.

In this article we have discussed the stream processing topology of foreign-key joins in Kafka Streams and how to use the Quarkus Kafka Streams extension for building and running your application in JVM mode.

You can find the complete source code of the implementation in the Debezium examples repo. If you have any questions or feedback, please let us know in the comments below. We’re looking forward to your suggestions!

Thanks a lot to Gunnar Morling for his feedback while working on this post!

Anisha Mohanty

Anisha is an Associate Software Engineer at Red Hat, currently working with the Debezium team. She lives in Bangalore, India.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.