Debezium Blog
Apache Kafka 2.8 allows for a first glimpse into the ZooKeeper-less future of the widely used event streaming platform: shipping with a preview of KIP-500 ("Replace ZooKeeper with a Self-Managed Metadata Quorum"), you can now run Kafka clusters without the need for setting up and operating Apache ZooKeeper. This does not only simplify running Kafka from an operational perspective, the new metadata quorum implementation (named "KRaft", Kafka Raft metadata mode) also should provide much better scaling characteristics, for instance when it comes to large numbers of topics and partitions.
Kafka Streams is a library for developing stream processing applications based on Apache Kafka. Quoting its docs, "a Kafka Streams application processes record streams through a topology in real-time, processing data continuously, concurrently, and in a record-by-record manner". The Kafka Streams DSL provides a range of stream processing operations such as a map, filter, join, and aggregate.
Non-Key Joins in Kafka Streams
Debezium’s CDC source connectors make it easy to capture data changes in databases and push them towards sink systems such as Elasticsearch in near real-time. By default, this results in a 1:1 relationship between tables in the source database, the corresponding Kafka topics, and a representation of the data at the sink side, such as a search index in Elasticsearch.
In case of 1:n relationships, say between a table of customers and a table of addresses, consumers often are interested in a view of the data that is a single, nested data structure, e.g. a single Elasticsearch document representing a customer and all their addresses.
This is where KIP-213 ("Kafka Improvement Proposal") and its foreign key joining capabilities come in: it was introduced in Apache Kafka 2.4 "to close the gap between the semantics of KTables in streams and tables in relational databases". Before KIP-213, in order to join messages from two Debezium change event topics, you’d typically have to manually re-key at least one of the topics, so to make sure the same key is used on both sides of the join.
Thanks to KIP-213, this isn’t needed any longer, as it allows to join two Kafka topics on fields extracted from the Kafka message value, taking care of the required re-keying automatically, in a fully transparent way. Comparing to previous approaches, this drastically reduces the effort for creating aggregated events from Debezium’s CDC events.
Kafka Streams is a library for developing applications for processing records from topics in Apache Kafka. A Kafka Streams application processes record streams through a topology in real-time, processing data continuously, concurrently, and in a record-by-record manner. The Kafka Streams DSL provides a range of stream processing operations such as a map, filter, join, and aggregate.
Non-Key Joins in Kafka Streams
Apache Kafka 2.4 introduced the foreign key join (KIP-213) feature, in order "to close the gap between the semantics of KTables in streams and tables in relational databases". Before KIP-213, in order to join messages from two Debezium change event topics, you’d have to manually re-key the topic(s); so to make sure the same key is used on both sides of the join. Thanks to KIP-213, this isn’t needed any longer, as it makes relational data liberated by connection mechanisms far easier to use, smoothing a transition to natively-built event-driven services. This resolved the problem of out-of-order processing due to foreign key changes in tables. Here is an interesting post by Hans-Peter Grahsl and Gunnar Morling on creating aggregated events from Debezium’s CDC events.
Debezium’s CDC source connectors makes it easy to capture data changes in databases and push them towards Elasticsearch in near real-time.
This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch.
Incase of 1:many relationship, say two tables with different primary keys and a foreign key relationship, Debezium provides a message.key.columns
option.
By choosing that foreign key column as the message key for change events, the two table streams could be joined without the need for re-keying the topic manually.
The message.key.columns
option can be used as:
message.key.columns=dbserver1.inventory.customerdetails:CustomerId
Outbox as in that folder in my email client? No, not exactly but there are some similarities!
The term outbox describes a pattern that allows independent components or services to perform read your own write semantics while concurrently providing a reliable, eventually consistent view to those writes across component or service boundaries.
You can read more about the Outbox pattern and how it applies to microservices in our blog post, Reliable Microservices Data Exchange With the Outbox Patttern.
So what exactly is an Outbox Event Router?
In Debezium version 0.9.3.Final, we introduced a ready-to-use Single Message Transform (SMT) that builds on the Outbox pattern to propagate data change events using Debezium and Kafka. Please see the documentation for details on how to use this transformation.
As a follow up to the recent Building Audit Logs with Change Data Capture and Stream Processing blog post, we’d like to extend the example with admin features to make it possible to capture and fix any missing transactional data.
In the above mentioned blog post, there is a log enricher service used to combine data inserted or updated in the Vegetable database table with transaction context data such as
-
Transaction id
-
User name who performed the work
-
Use case that was behind the actual change e.g. "CREATE VEGETABLE"
This all works well as long as all the changes are done via the vegetable service. But is this always the case?
What about maintenance activities or migration scripts executed directly on the database level? There are still a lot of such activities going on, either on purpose or because that is our old habits we are trying to change…
Let’s talk about TOAST. Toast? No, TOAST!
So what’s that? TOAST (The Oversized-Attribute Storage Technique) is a mechanism in Postgres which stores large column values in multiple physical rows, circumventing the page size limit of 8 KB.
Typically, TOAST storage is transparent to the user, so you don’t really have to care about it. There’s an exception, though: if a table row has changed, any unchanged values that were stored using the TOAST mechanism are not included in the message that Debezium receives from the database, unless they are part of the table’s replica identity. Consequently, such unchanged TOAST column value will not be contained in Debezium data change events sent to Apache Kafka. In this post we’re going to discuss different strategies for dealing with this situation.
It is a common requirement for business applications to maintain some form of audit log, i.e. a persistent trail of all the changes to the application’s data. If you squint a bit, a Kafka topic with Debezium data change events is quite similar to that: sourced from database transaction logs, it describes all the changes to the records of an application. What’s missing though is some metadata: why, when and by whom was the data changed? In this post we’re going to explore how that metadata can be provided and exposed via change data capture (CDC), and how stream processing can be used to enrich the actual data change events with such metadata.
This is a guest post by Apache Pulsar PMC Member and Committer Jia Zhai.
Debezium is an open source project for change data capture (CDC). It is built on Apache Kafka Connect and supports multiple databases, such as MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server. Apache Pulsar includes a set of built-in connectors based on Pulsar IO framework, which is counter part to Apache Kafka Connect.
As of version 2.3.0, Pulsar IO comes with support for the Debezium source connectors out of the box, so you can leverage Debezium to stream changes from your databases into Apache Pulsar. This tutorial walks you through setting up the Debezium connector for MySQL with Pulsar IO.
Last week’s announcement of Quarkus sparked a great amount of interest in the Java community: crafted from the best of breed Java libraries and standards, it allows to build Kubernetes-native applications based on GraalVM & OpenJDK HotSpot. In this blog post we are going to demonstrate how a Quarkus-based microservice can consume Debezium’s data change events via Apache Kafka. For that purpose, we’ll see what it takes to convert the shipment microservice from our recent post about the outbox pattern into Quarkus-based service.
As part of their business logic, microservices often do not only have to update their own local data store, but they also need to notify other services about data changes that happened. The outbox pattern describes an approach for letting services execute these two tasks in a safe and consistent manner; it provides source services with instant "read your own writes" semantics, while offering reliable, eventually consistent data exchange across service boundaries.
The second-level cache of Hibernate ORM / JPA is a proven and efficient way to increase application performance: caching read-only or rarely modified entities avoids roundtrips to the database, resulting in improved response times of the application.
Unlike the first-level cache, the second-level cache is associated with the session factory (or entity manager factory in JPA terms), so its contents are shared across transactions and concurrent sessions. Naturally, if a cached entity gets modified, the corresponding cache entry must be updated (or purged from the cache), too. As long as the data changes are done through Hibernate ORM, this is nothing to worry about: the ORM will update the cache automatically.
Things get tricky, though, when bypassing the application, e.g. when modifying records directly in the database. Hibernate ORM then has no way of knowing that the cached data has become stale, and it’s necessary to invalidate the affected items explicitly. A common way for doing so is to foresee some admin functionality that allows to clear an application’s caches. For this to work, it’s vital to not forget about calling that invalidation functionality, or the application will keep working with outdated cached data.
In the following we’re going to explore an alternative approach for cache invalidation, which works in a reliable and fully automated way: by employing Debezium and its change data capture (CDC) capabilities, you can track data changes in the database itself and react to any applied change. This allows to invalidate affected cache entries in near-realtime, without the risk of stale data due to missed changes. If an entry has been evicted from the cache, Hibernate ORM will load the latest version of the entity from the database the next time is requested.
Updating external full text search indexes (e.g. Elasticsearch) after data changes is a very popular use case for change data capture (CDC).
As we’ve discussed in a blog post a while ago, the combination of Debezium’s CDC source connectors and Confluent’s sink connector for Elasticsearch makes it straight forward to capture data changes in MySQL, Postgres etc. and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch, which is perfectly fine for many use cases.
It gets more challenging though if you’d like to put entire aggregates into a single index. An example could be a customer and all their addresses; those would typically be stored in two separate tables in an RDBMS, linked by a foreign key, whereas you’d like to have just one index in Elasticsearch, containing documents of customers with their addresses embedded, allowing you to efficiently search for customers based on their address.
Following up to the KStreams-based solution to this we described recently, we’d like to present in this post an alternative for materializing such aggregate views driven by the application layer.
Most of the times Debezium is used to stream data changes into Apache Kafka. What though if you’re using another streaming platform such as Apache Pulsar or a cloud-based solution such as Amazon Kinesis, Azure Event Hubs and the like? Can you still benefit from Debezium’s powerful change data capture (CDC) capabilities and ingest changes from databases such as MySQL, Postgres, SQL Server etc.?
Turns out, with just a bit of glue code, you can! In the following we’ll discuss how to use Debezium to capture changes in a MySQL database and stream the change events into Kinesis, a fully-managed data streaming service available on the Amazon cloud.
Microservice-based architectures can be considered an industry trend and are thus often found in enterprise applications lately. One possible way to keep data synchronized across multiple services and their backing data stores is to make us of an approach called change data capture, or CDC for short.
Essentially CDC allows to listen to any modifications which are occurring at one end of a data flow (i.e. the data source) and communicate them as change events to other interested parties or storing them into a data sink. Instead of doing this in a point-to-point fashion, it’s advisable to decouple this flow of events between data sources and data sinks. Such a scenario can be implemented based on Debezium and Apache Kafka with relative ease and effectively no coding.
As an example, consider the following microservice-based architecture of an order management system: