I’m very happy to announce the release of Debezium 1.8.0.Beta1!

This release is packed with exciting new features like support for MongoDB 5.0, an outbox event router for the MongoDB connector and support for Postgres logical decoding messages, as well as tons of bugfixes and other improvements. Overall, not less than 63 issues have been fixed for this release.

Let’s take a closer look at some of them.

MongoDB Outbox Event Router

The outbox pattern is becoming more and more popular for exchanging data between microservices in a reliable way, without using unsafe dual writes to a service’s database and Apache Kafka.

With the outbox pattern, instead of capturing changes from your actual business tables, you write messages to be sent to external consumers into a dedicated outbox table. This nicely decouples your internal data model from the message contracts used for communicating with external services, allowing you to develop and evolve these independently. Updates to your business tables and inserts into the outbox table are done within one database transaction, so that either both of these things are done, or none of them. Once a message has been persisted in the outbox table, Debezium can capture it from there and propagate it to any consumers using the usual at-least-once semantics.

Debezium provides support for implementing the outbox pattern via a special single message transform (SMT), the outbox event router. This takes care of routing events from the single outbox table to different topics, based on a configurable column representing the aggregate type (in the parlance of domain driven design) the event is for. In addition, there is an extension for emitting outbox events from services built using Quarkus, a stack for building cloud-native microservices.

These things are complemented now by a new event routing SMT which works with the Debezium connector for MongoDB. As the MongoDB connector’s event format differs from the format of the Debezium connectors for relational databases, creating this separate SMT became necessary. Here’s an example for configuring the SMT:

{
  "name": "outbox-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max" : "1",
    "mongodb.hosts" : "rs0/mongodb:27017",
    "mongodb.name" : "dbserver1",
    "mongodb.user" : "debezium",
    "mongodb.password" : "dbz",
    "collection.include.list": "inventory.outboxevent",
    "database.history.kafka.bootstrap.servers" : "kafka:9092",

    "transforms" : "outbox",
    "transforms.outbox.type" :
        "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
    "transforms.outbox.route.topic.replacement" : "${routedByValue}.events",
    "transforms.outbox.collection.expand.json.payload" : "true",
    "transforms.outbox.collection.field.event.timestamp" : "timestamp",
    "transforms.outbox.collection.fields.additional.placement" : "type:header:eventType",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

Here we’re using the MongoEventRouter SMT for capturing changes from the inventory.outboxevent collection. Events could be written like so, using the MongoDB CLI as an example:

new_order = { "_id" : ObjectId("000000000000000000000002"), "order_date" : ISODate("2021-11-22T00:00:00Z"), "purchaser_id" : NumberLong(1004), "quantity" : 1, "product_id" : NumberLong(107) }

s = db.getMongo().startSession()
s.startTransaction()

s.getDatabase("inventory").orders.insert(new_order)
s.getDatabase("inventory").outboxevent.insert({ _id : ObjectId("000000000000000000000001"), aggregateid : new_order._id, aggregatetype : "Order", type : "OrderCreated", timestamp: NumberLong(1556890294484), payload : new_order })

s.commitTransaction()

Note how we’re doing the inserts into a business collection ("orders") and into the outbox collection ("outboxevent") within a transaction, as supported by MongoDB since version 4.0. While we are using the actual order object in the outbox message itself in this particular case, we also could separate these things and choose another representation of the purchase orders in the outbox events.

The id of the order aggregate is used as the message key in Kafka, ensuring consistent ordering of all outbox events pertaining to a given purchase order. The aggregate type is used for determining the name of the topic to route events to, Order.events in this example. The unique id of the message itself is propagated as a header in the Kafka message, for instance allowing consumers to identify duplicated messages.

You can find a complete example for using this new MongoDB outbox event routing SMT in our demos repository. A massive thank you to Sungho Hwang, who not only provided the actual feature implementation itself, but also created this example.

Potential next steps around outbox support for the Debezium MongoDB connector may be adding support for MongoDB to the Quarkus outbox extension, and having an option to capture outbox events from sub-documents attached to an entity like Order. That way, your application’s data and the outbox message could be written as a single document (the application would otherwise ignore the outbox sub-document itself) and not requiring cross-document transactions. This idea is tracked via DBZ-4319; please let us know if you think that’d be a useful addition or if you’d even be interested in implementing it.

Support for Postgres' pg_logical_emit_message()

The versatility and flexibility of Postgres is legend; one of the interesting and lesser known features is the ability to write messages into the database’s transaction log (WAL), without writing to a table actually. This is done via the pg_logical_emit_message() function. As of Postgres 14, these logical decoding messages can be captured using the pgoutput plug-in, and Debezium also supports this event type as of this release.

Logical decoding messages are great for propagating contextual information associated to your transactions, without having to store this data in a table. This could for instance be auditing metadata such as a business user who triggered some data change. Another potential use case is the outbox pattern mentioned above, which could be implemented without a dedicated outbox table, solely by writing outbox events to the WAL. That’s advantageous for instance when thinking about house-keeping: there’d be no need for removing messages from an outbox table after they have been propagated to Kafka.

"Sending" a logical decoding message is as simple as that:

SELECT pg_logical_emit_message(true, 'some-prefix', 'some text');

This emits a message which is transactional (true), with the "some-prefix" prefix and "some text" as the message contents. The prefix can be used for grouping messages into logical contexts. Debezium uses the prefix as the Kafka message key, i.e. all messages with the same prefix will go into the same partition of the corresponding Kafka topic and thus will be propagated in the same order to downstream consumers as they were created.

Logical decoding messages are emitted by the Debezium Postgres connector using a new event type ("m") and look like so (the message content is binary-encoded, using Base64 in this example):

{
  "source": {
    "version": "1.8.0.Beta1",
    "connector": "postgresql",
    "name": "PostgreSQL_server",
    "ts_ms": 1559033904863,
    "snapshot": false,
    "db": "postgres",
    "schema": "",
    "table": "",
    "txId": 556,
    "lsn": 46523128,
    "xmin": null
  },
  "op": "m",
  "ts_ms": 1559033904961,
  "message": {
    "prefix": "some-prefix",
    "content": "c29tZSB0ZXh0"
  }
}

The message contents is an arbitrary payload, besides the textual represention you also can insert binary data here. It is the responsibility of the event producer to document the format, evolve it with backwards compatibility in mind, and exchange schema information with any clients. One great way of doing so would be to take advantage of a schema registry such as Apicurio. You also could think of using a standard like CloudEvents for your logical decoding messages, which then for instance would allow an SMT such as the aforementioned outbox event router to take action based on defined attributes in the event structure.

To learn more about support for logical decoding messages in Debezium, please refer to the connector documentation. Thanks a lot to Lairen Hightower for implementing this feature!

Other Fixes and Changes

Further fixes and improvements in the 1.8.0.Beta1 release include:

  • Support for configuring SMTs and topic creation settings in the Debezium UI; you can see the former in a quick video in this post, and we’ll share another demo of the topic creation UI later this week

  • Transaction metadata events in the Vitess connector (DBZ-4355); we also simplified its configuration by removing the dependency to vtctld (DBZ-4324), added support for the stop_on_reshard flag (DBZ-4295), and provided the ability to specify a VGTID as the starting point for streaming (DBZ-4297). All these changes were contributed by Yang Wu and Shichao from the Stripe engineering team, who agreed to step up as maintainers of this connector. Thanks a lot, and welcome!

  • More flexible configuration of the Infinispan-based transaction buffer of the Debezium connector for Oracle (DBZ-4169)

  • Improved type mappings for MONEY columns in Postgres (DBZ-1931) and INTERVAL columns in Oracle (DBZ-1539)

  • Support for schema changes while doing an incremental snapshot with the Debezium connector for MySQL (DBZ-4196); thanks to Kate Galieva for this very useful improvement!

Please refer to the release notes to learn more about these and further fixes in this release.

As always, a big thank you to everyone contributing to this release:

Outlook

With the Beta1 release out, we’re approaching the final phase of the 1.8 release cycle. You can expect a CR1 sometime next week, and depending on incoming issue reports, we may decide to cut the Final release either in the week before Christmas, or in the first week of 2022. In terms of features to be added, one thing we’d love to get to is incremental snapshotting support for the MongoDB connector. We’ll have to see whether this will make it in the remaining time, or whether this will have to wait for the Debezium 1.9 release. While the 1.8 release line is maturing, you also can look forward to the release of Debezium 1.7.2.

Going forward, we’re also continuing our planning around Debezium 2.0, which should be released sometime next year. Please join the discussion on this topic on the mailing list.

Gunnar Morling

Gunnar is a software engineer at Red Hat and open-source enthusiast by heart. A long-time Hibernate core team member, he's now the project lead of Debezium. Gunnar is the spec lead for Bean Validation 2.0 (JSR 380). He’s based in Hamburg, Germany.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.