Setting up change data capture (CDC) pipelines with Debezium is typically a matter of configuration, with no programming involved. It’s still a very good idea to have automated tests for your CDC set-up, making sure that everything is configured correctly and that your Debezium connectors are set up as intended.

There are two main components involved whose configuration needs consideration:

  • The source database: it must be set up so that Debezium can connect to it and retrieve change events; the details depend on the specific database, e.g. for MySQL the binlog must be in "row" mode, for Postgres, one of the supported logical decoding plug-ins must be installed, etc. (one way of verifying such prerequisites in a test is sketched after this list)

  • The Debezium connector: it must be configured using the right database host and credentials, possibly using SSL, applying table and column filters, potentially one or more single message transformations (SMTs), etc.
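
For instance, a test could verify up front that a Postgres instance is prepared for Debezium by checking its wal_level setting via JDBC. This is just a minimal sketch, where jdbcUrl, user and password are placeholders for your own environment:

try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
    Statement statement = connection.createStatement();
    ResultSet rs = statement.executeQuery("SHOW wal_level")) {

  rs.next();

  // Debezium's Postgres connector relies on logical decoding,
  // which requires wal_level to be set to 'logical'
  if (!"logical".equals(rs.getString(1))) {
    throw new IllegalStateException("wal_level must be 'logical'");
  }
}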

This is where the newly added Debezium support for integration tests with Testcontainers comes in. It allows you to set up all the required components (Apache Kafka, Kafka Connect etc.) using Linux container images, to configure and deploy a Debezium connector, and to run assertions against the produced change data events.

Let’s take a look at how it’s done.

Project Set-Up

Assuming you’re working with Apache Maven for dependency management, add the following dependencies to your pom.xml, pulling in the Debezium Testcontainers integration and the Testcontainers module for Apache Kafka:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>1.1.0.CR1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

Also add the Testcontainers dependency for your database, e.g. Postgres:

<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>
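
Note that the two Testcontainers dependencies are declared without a version; one way to manage that version in a single place is to import the Testcontainers BOM in the dependencyManagement section of your pom.xml. Here’s a sketch; the version shown is an assumption, so adjust it to whatever is current for your project:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>testcontainers-bom</artifactId>
      <version>1.13.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>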

You can find an example project with the complete configuration in the debezium-examples repo on GitHub.

Initializing Testcontainers

Having declared all the required dependencies, it’s time to write a CDC integration test. With Testcontainers, integration tests are implemented using Linux containers and Docker; the library provides a Java API for starting and managing the resources needed by a test. We can use this to fire up Apache Kafka, Kafka Connect and a Postgres database:

public class CdcTest {

  private static Network network = Network.newNetwork(); (1)

  private static KafkaContainer kafkaContainer = new KafkaContainer()
      .withNetwork(network); (2)

  public static PostgreSQLContainer<?> postgresContainer =
      new PostgreSQLContainer<>("debezium/postgres:11")
          .withNetwork(network)
          .withNetworkAliases("postgres"); (3)

  public static DebeziumContainer debeziumContainer =
      new DebeziumContainer("1.1.0.CR1")
          .withNetwork(network)
          .withKafka(kafkaContainer)
          .dependsOn(kafkaContainer); (4)

  @BeforeClass
  public static void startContainers() { (5)
    Startables.deepStart(Stream.of(
        kafkaContainer, postgresContainer, debeziumContainer))
            .join();
  }
}
1 Define a Docker network to be used by all the services
2 Set up a container for Apache Kafka
3 Set up a container for Postgres 11 (using Debezium’s Postgres container image)
4 Set up a container for Kafka Connect with Debezium
5 Start all three containers in a @BeforeClass method

Note that you need to have Docker installed in order to use Testcontainers.

The Test Implementation

With the needed infrastructure in place, we can write a test for our CDC set-up. The overall flow of the test is this:

  • Configure a Debezium connector for the Postgres database

  • Execute a few SQL statements to change some data

  • Retrieve the resulting change data events from the corresponding Kafka topic using a Kafka consumer

  • Run some assertions against these events

Here’s the shell for the test:

@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
  try (Connection connection = getConnection(postgresContainer);
      Statement statement = connection.createStatement();
      KafkaConsumer<String, String> consumer =
          getConsumer(kafkaContainer)) {

      // TODO ...
  }
}

The credentials for the database connection can be obtained from the Postgres container started via Testcontainers, nicely avoiding any redundancies:

private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
    throws SQLException {

  return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
      postgresContainer.getUsername(),
      postgresContainer.getPassword());
}

The same goes for the Kafka consumer:

private KafkaConsumer<String, String> getConsumer(
    KafkaContainer kafkaContainer) {

  return new KafkaConsumer<>(
      ImmutableMap.of(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          kafkaContainer.getBootstrapServers(),

          ConsumerConfig.GROUP_ID_CONFIG,
          "tc-" + UUID.randomUUID(),

          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
          "earliest"),
      new StringDeserializer(),
      new StringDeserializer());
}

Now let’s implement the actual test logic:

statement.execute("create schema todo"); (1)
statement.execute("create table todo.Todo (" +
                    "id int8 not null, " +
                    "title varchar(255), " +
                    "primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, 'Learn CDC')");
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')");

ConnectorConfiguration connector = ConnectorConfiguration
        .forJdbcContainer(postgresContainer)
        .with("database.server.name", "dbserver1");

debeziumContainer.registerConnector("my-connector",
        connector); (2)

consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

List<ConsumerRecord<String, String>> changeEvents =
        drain(consumer, 2); (3)

ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(1);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn CDC");

changeEvent = changeEvents.get(1);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(2);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn Debezium");

consumer.unsubscribe();
1 Create a schema and table in the Postgres database and insert two records
2 Register an instance of the Debezium Postgres connector
3 Read two records from the change event topic in Kafka and assert their attributes

Note how Debezium’s Testcontainers support allows you to seed the connector configuration from the database container, avoiding the need to give the database connection properties explicitly. Only the unique database.server.name must be given, and of course you could apply other configuration options such as table or column filters, SMTs and more.
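
For example, restricting the captured tables to just todo.todo could look like this, using the table.whitelist option of the Postgres connector:

ConnectorConfiguration connector = ConnectorConfiguration
        .forJdbcContainer(postgresContainer)
        .with("database.server.name", "dbserver1")
        .with("table.whitelist", "todo.todo");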

The source code of the drain() method, which reads a given number of records from a Kafka topic, is omitted here for the sake of brevity; you can find it in the full example on GitHub.
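
For reference, a minimal variant of drain() could look like the following sketch (the version in the example repo may differ, e.g. by applying a timeout to the polling loop):

private List<ConsumerRecord<String, String>> drain(
    KafkaConsumer<String, String> consumer, int expectedRecordCount) {

  List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

  // keep polling until the expected number of records has arrived;
  // a robust test should also guard this loop with a timeout
  while (allRecords.size() < expectedRecordCount) {
    consumer.poll(Duration.ofMillis(50))
        .forEach(allRecords::add);
  }

  return allRecords;
}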

JsonPath-based assertions come in handy for asserting the attributes of the expected data change events, but of course you could also use any other JSON API for the job. When using Apache Avro instead of JSON as the serialization format, you’d have to use the Avro APIs instead.

Wrap-Up

Testcontainers and Debezium’s support for it make it fairly easy to write automated integration tests for your CDC set-up.

The testing approach discussed in this post could be expanded in multiple ways. For example, it might be desirable to put your connector configuration under revision control (so you can manage and track any configuration changes) and drive the test using these configuration files. You might also take things one step further and test your entire data streaming pipeline. To do so, you’d have to deploy not only the Debezium connector(s), but also a sink connector, e.g. for your data warehouse or search server. You could then run assertions against the data in those sink systems, ensuring the correctness of your data pipeline end-to-end.
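
To sketch that last idea, a sink connector can be registered through the same DebeziumContainer as the source connector. The following rough outline assumes that Confluent’s JDBC sink connector has been added to the Connect image’s plug-in path; the connector class, its topics and connection.url options, and the sinkJdbcUrl variable are assumptions taken from that connector, not part of Debezium itself:

ConnectorConfiguration sinkConfig = ConnectorConfiguration.create()
        .with("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector")
        .with("topics", "dbserver1.todo.todo")
        .with("connection.url", sinkJdbcUrl); // JDBC URL of the sink database

debeziumContainer.registerConnector("my-sink-connector", sinkConfig);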

What’s your take on testing CDC set-ups and pipelines? Let us know in the comments below!

Gunnar Morling

Gunnar is a software engineer at Red Hat and open-source enthusiast by heart. A long-time Hibernate core team member, he's now the project lead of Debezium. Gunnar is the spec lead for Bean Validation 2.0 (JSR 380). He’s based in Hamburg, Germany.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.