Tutorial

This tutorial walks you through running Debezium 0.9.5.Final for change data capture (CDC). You will use Docker (1.9 or later) to start the Debezium services, run a MySQL database server with a simple example database, use Debezium to monitor the database, and see the resulting event streams respond as the data in the database changes.

You have already completed the tutorial? Try the fast track using Docker Compose, including example set-ups for all the databases supported by Debezium (MySQL, Postgres, MongoDB, SQL Server and Oracle).

What is Debezium?

Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases. Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops (or crashes), upon restart it will start consuming the events where it left off so it misses nothing.

Debezium 0.9.5.Final includes support for monitoring MySQL database servers with its MySQL connector, and this is what we’ll use in this demonstration. Connectors for the other supported databases (Postgres, MongoDB, SQL Server, and Oracle) are not covered here.

Running Debezium with Docker

Running Debezium involves three major services: ZooKeeper, Kafka, and Debezium’s connector service. This tutorial walks you through starting a single instance of each of these services using Docker and Debezium’s Docker images. Production environments, on the other hand, require running multiple instances of each service to provide performance, reliability, replication, and fault tolerance. This can be done with a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, but often you’ll want to install on dedicated hardware.

Starting Docker

Make sure that Docker is installed and running on Linux, OS X, or Windows. We highly recommend using the latest version of Docker on these platforms, and we’ve written these instructions with this in mind. (Running Docker in a virtual machine via Docker Machine is no longer the preferred approach, and Docker recommends you upgrade.)

Starting simple with Debezium

For simple evaluation and experimentation, this tutorial will walk you through starting a single instance of each service in a separate container on your local machine. ZooKeeper and Kafka both store data locally inside the container, and normal usage requires mounting directories on the host machines as volumes so that when the containers stop the persisted data will remain. We’re skipping that in this tutorial, although the documentation for our Docker images describes how to do that. This means that when a container is removed, all persisted data will be lost. That’s actually ideal for our experiment, since nothing will be left on your computer when we’re finished, and you can run this experiment many times without having to clean anything up in between.

Running multiple services locally can be confusing, so we’re going to use a separate terminal to run each container in the foreground. This way all of the output of a container will be displayed in the terminal used to run it.

This is not the only way to run Docker containers. Rather than running a container in the foreground (with -it), Docker lets you run a container in detached mode (with -d), where the container is started and the Docker command returns immediately. Detached mode containers don’t display their output in the terminal, though you can always see the output by using docker logs --follow <container-name>. This is one reason we name each of the containers we run. See the Docker documentation for more detail.
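As a minimal sketch of the detached alternative, the two Docker commands can be wrapped in a small shell function. The function name run_detached and the DOCKER variable are assumptions made for this illustration and are not part of Docker or Debezium; setting DOCKER=echo prints the commands instead of running them.

```shell
# Hypothetical helper: start a named container in detached mode, then follow its logs.
# DOCKER defaults to the real docker CLI; set DOCKER=echo for a dry run.
run_detached() {
  name="$1"; shift
  # -d returns immediately; --name lets us refer to the container afterwards.
  ${DOCKER:-docker} run -d --name "$name" "$@" || return 1
  # Stream the container's output until interrupted with Ctrl-C.
  ${DOCKER:-docker} logs --follow "$name"
}

# Example (not executed here):
#   run_detached zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
```

This sketch leaves out --rm, so a container started this way stays around after it stops and can be removed with docker rm -f followed by its name.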

Start ZooKeeper

Of all the different services/processes that make up Debezium, the first one to start is ZooKeeper. Start a new terminal and start a container with ZooKeeper by running:

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

This runs a new container using version 0.9 of the debezium/zookeeper image, and assigns the name zookeeper to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The three -p options map three of the container’s ports (2181, 2888, and 3888) to the same ports on the Docker host so that other containers (and software outside the container) can talk with ZooKeeper.

You should see in your terminal the typical output of ZooKeeper:

Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO  [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
2017-09-21 07:15:55,420 - WARN  [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running  in standalone mode
2017-09-21 07:15:55,420 - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2017-09-21 07:15:55,425 - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2017-09-21 07:15:55,427 - INFO  [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,427 - INFO  [main:ZooKeeperServerMain@96] - Starting server
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:host.name=51b46dd211d0
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.version=1.8.0_131
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.vendor=Oracle Corporation
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.class.path=/zookeeper/bin/../build/classes:/zookeeper/bin/../build/lib/*.jar:/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/zookeeper/bin/../lib/log4j-1.2.16.jar:/zookeeper/bin/../lib/jline-0.9.94.jar:/zookeeper/bin/../zookeeper-3.4.10.jar:/zookeeper/bin/../src/java/lib/*.jar:/zookeeper/conf:
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2017-09-21 07:15:55,432 - INFO  [main:Environment@100] - Server environment:java.io.tmpdir=/tmp
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:java.compiler=<NA>
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:os.name=Linux
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:os.arch=amd64
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:os.version=4.4.0-93-generic
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:user.name=zookeeper
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:user.home=/zookeeper
2017-09-21 07:15:55,433 - INFO  [main:Environment@100] - Server environment:user.dir=/zookeeper
2017-09-21 07:15:55,435 - INFO  [main:ZooKeeperServer@829] - tickTime set to 2000
2017-09-21 07:15:55,435 - INFO  [main:ZooKeeperServer@838] - minSessionTimeout set to -1
2017-09-21 07:15:55,435 - INFO  [main:ZooKeeperServer@847] - maxSessionTimeout set to -1
2017-09-21 07:15:55,440 - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181

The last line is important and reports that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
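If you script these steps rather than watching the terminal, the same log line can serve as a readiness signal. The following is only a sketch: the helper name zk_ready is hypothetical, and it assumes the log format shown above.

```shell
# Hypothetical helper: succeed if the log text on stdin contains the
# ZooKeeper line that reports the client port binding on 2181.
zk_ready() {
  grep -q 'binding to port .*:2181'
}

# Example (not executed here): poll the logs of the zookeeper container.
#   until docker logs zookeeper 2>&1 | zk_ready; do sleep 1; done
```

Matching on log text is fragile across ZooKeeper versions, so treat the pattern as something to adjust rather than a stable interface.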

Start Kafka

Open a new terminal, and use it to start Kafka in a new container by running:

$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

In this tutorial we always connect to Kafka from within a Docker container, and any such container can see and communicate with the kafka container as long as it is linked to it. If we wanted to connect to Kafka from outside of a Docker container, we’d want Kafka to advertise its address via the Docker host, which we could do by adding -e ADVERTISED_HOST_NAME= followed by the IP address or resolvable hostname of the Docker host. On Linux or Docker for Mac, this is the IP address of the host computer (not localhost).

This runs a new container using version 0.9 of the debezium/kafka image, and assigns the name kafka to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 9092 in the container to the same port on the Docker host so that software outside of the container can talk with Kafka. Finally, the command uses the --link zookeeper:zookeeper argument to tell the container that it can find ZooKeeper in the container named zookeeper running on the same Docker host.
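For the case where clients connect from outside Docker, the ADVERTISED_HOST_NAME variant could look like the sketch below. The function name start_kafka_advertised and the DOCKER variable are assumptions for this illustration, and the address in the example is a placeholder for your own Docker host.

```shell
# Hypothetical helper: start Kafka so that it advertises the Docker host's address.
# $1 is the IP address or resolvable hostname of the Docker host.
# DOCKER defaults to the real docker CLI; set DOCKER=echo for a dry run.
start_kafka_advertised() {
  ${DOCKER:-docker} run -it --rm --name kafka -p 9092:9092 \
    -e ADVERTISED_HOST_NAME="$1" \
    --link zookeeper:zookeeper debezium/kafka:0.9
}

# Example (not executed here; 192.168.99.100 is a placeholder address):
#   start_kafka_advertised 192.168.99.100
```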

You should see in your terminal the typical output of Kafka, ending with:

...
2017-09-21 07:16:59,085 - INFO  [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO  [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
2017-09-21 07:16:59,221 - WARN  [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties
2017-09-21 07:16:59,247 - INFO  [ThrottledRequestReaper-Fetch:Logging$class@70] - [ThrottledRequestReaper-Fetch]: Starting
2017-09-21 07:16:59,247 - INFO  [ThrottledRequestReaper-Produce:Logging$class@70] - [ThrottledRequestReaper-Produce]: Starting
2017-09-21 07:16:59,248 - INFO  [ThrottledRequestReaper-Request:Logging$class@70] - [ThrottledRequestReaper-Request]: Starting
2017-09-21 07:16:59,308 - INFO  [main:Logging$class@70] - Loading logs.
2017-09-21 07:16:59,312 - INFO  [main:Logging$class@70] - Logs loading complete in 4 ms.
2017-09-21 07:16:59,349 - INFO  [main:Logging$class@70] - Starting log cleanup with a period of 300000 ms.
2017-09-21 07:16:59,353 - INFO  [main:Logging$class@70] - Starting log flusher with a default period of 9223372036854775807 ms.
2017-09-21 07:16:59,385 - INFO  [main:Logging$class@70] - Awaiting socket connections on 172.17.0.4:9092.
2017-09-21 07:16:59,387 - INFO  [main:Logging$class@70] - [Socket Server on Broker 1], Started 1 acceptor threads
2017-09-21 07:16:59,394 - INFO  [ExpirationReaper-1-Produce:Logging$class@70] - [ExpirationReaper-1-Produce]: Starting
2017-09-21 07:16:59,395 - INFO  [ExpirationReaper-1-Fetch:Logging$class@70] - [ExpirationReaper-1-Fetch]: Starting
2017-09-21 07:16:59,395 - INFO  [ExpirationReaper-1-DeleteRecords:Logging$class@70] - [ExpirationReaper-1-DeleteRecords]: Starting
2017-09-21 07:16:59,435 - INFO  [ExpirationReaper-1-topic:Logging$class@70] - [ExpirationReaper-1-topic]: Starting
2017-09-21 07:16:59,441 - INFO  [ExpirationReaper-1-Heartbeat:Logging$class@70] - [ExpirationReaper-1-Heartbeat]: Starting
2017-09-21 07:16:59,442 - INFO  [controller-event-thread:Logging$class@70] - Creating /controller (is it secure? false)
2017-09-21 07:16:59,447 - INFO  [ExpirationReaper-1-Rebalance:Logging$class@70] - [ExpirationReaper-1-Rebalance]: Starting
2017-09-21 07:16:59,456 - INFO  [controller-event-thread:Logging$class@70] - Result of znode creation is: OK
2017-09-21 07:16:59,458 - INFO  [main:Logging$class@70] - [GroupCoordinator 1]: Starting up.
2017-09-21 07:16:59,459 - INFO  [main:Logging$class@70] - [GroupCoordinator 1]: Startup complete.
2017-09-21 07:16:59,460 - INFO  [group-metadata-manager-0:Logging$class@70] - [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds.
2017-09-21 07:16:59,487 - INFO  [main:Logging$class@70] - [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1
2017-09-21 07:16:59,530 - INFO  [main:Logging$class@70] - [Transaction Coordinator 1]: Starting up.
2017-09-21 07:16:59,532 - INFO  [TxnMarkerSenderThread-1:Logging$class@70] - [Transaction Marker Channel Manager 1]: Starting
2017-09-21 07:16:59,532 - INFO  [main:Logging$class@70] - [Transaction Coordinator 1]: Startup complete.
2017-09-21 07:16:59,551 - INFO  [main:Logging$class@70] - Will not load MX4J, mx4j-tools.jar is not in the classpath
2017-09-21 07:16:59,590 - INFO  [main:Logging$class@70] - Creating /brokers/ids/1 (is it secure? false)
2017-09-21 07:16:59,604 - INFO  [main:Logging$class@70] - Result of znode creation is: OK
2017-09-21 07:16:59,605 - INFO  [main:Logging$class@70] - Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(172.17.0.4,9092,ListenerName(PLAINTEXT),PLAINTEXT)
2017-09-21 07:16:59,606 - WARN  [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties
2017-09-21 07:16:59,648 - INFO  [main:AppInfoParser$AppInfo@83] - Kafka version : 0.11.0.0
2017-09-21 07:16:59,648 - INFO  [main:AppInfoParser$AppInfo@84] - Kafka commitId : cb8625948210849f
2017-09-21 07:16:59,649 - INFO  [main:Logging$class@70] - [Kafka Server 1], started

The last line shown above reports that the Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.

Debezium 0.9.5.Final requires Kafka Connect 2.2.0, and in this tutorial we also use version 2.2.0 of the Kafka broker. Check the Kafka documentation about compatibility between different versions of Kafka Connect and the Kafka broker.

Start a MySQL database

At this point, we’ve started ZooKeeper and Kafka, but we don’t yet have a database server from which Debezium can capture changes. Now, let’s start a MySQL server with an example database.

Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an inventory database:

$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

This runs a new container using version 0.9 of the debezium/example-mysql image, which is based on the mysql:5.7 image, defines and populates a sample "inventory" database, and creates a debezium user with password dbz that has the minimum privileges required by Debezium’s MySQL connector. The command assigns the name mysql to the container so that it can be easily referenced later. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that software outside of the container can connect to the database server. And finally, it also uses the -e option three times to set the MYSQL_ROOT_PASSWORD, MYSQL_USER, and MYSQL_PASSWORD environment variables to specific values.

You should see in your terminal something like the following:

...
2017-09-21T07:18:50.824629Z 0 [Note] mysqld: ready for connections.
Version: '5.7.19-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

Notice that the MySQL server starts and stops a few times as the configuration is modified. The last line listed above reports that the MySQL server is running and ready for use.

Start a MySQL command line client

Open a new terminal, and use it to start a new container for the MySQL command line client and connect it to the MySQL server running in the mysql container:

$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Here we start the container using the mysql:5.7 image, name the container mysqlterm and link it to the mysql container where the database server is running. The --rm option tells Docker to remove the container when it stops, and the rest of the command defines the shell command that the container should run. This shell command runs the MySQL command line client and specifies the correct options so that it can connect properly.

The container should output lines similar to the following:

mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.17-log MySQL Community Server (GPL)

Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

Unlike the other containers, this container runs a process that produces a prompt. We’ll use the prompt to interact with the database. First, switch to the "inventory" database:

mysql> use inventory;

and then list the tables in the database:

mysql> show tables;

which should then display:

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Use the MySQL command line client to explore the database and view the pre-loaded data in the database. For example:

mysql> SELECT * FROM customers;

Start Kafka Connect

Open a new terminal, and use it to start the Kafka Connect service in a new container by running:

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

This runs a new Docker container named connect using version 0.9 of the debezium/connect image. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 8083 in the container to the same port on the Docker host so that software outside of the container can use Kafka Connect’s REST API to set up and manage new connector instances. The command uses the --link zookeeper:zookeeper, --link kafka:kafka, and --link mysql:mysql arguments to tell the container that it can find ZooKeeper running in the container named zookeeper, the Kafka broker running in the container named kafka, and the MySQL server running in the container named mysql, all running on the same Docker host. And finally, it also uses the -e option four times to set the GROUP_ID, CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and STATUS_STORAGE_TOPIC environment variables, which are all required by this Debezium image (though you can use different values as desired).

You should see in your terminal the typical output of Kafka Connect, ending with:

...
2017-09-21 07:21:14,912 INFO   ||  Kafka version : 0.11.0.0   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:21:14,912 INFO   ||  Kafka commitId : cb8625948210849f   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:21:14,929 INFO   ||  Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1.   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:14,931 INFO   ||  Finished reading KafkaBasedLog for topic my_connect_configs   [org.apache.kafka.connect.util.KafkaBasedLog]
2017-09-21 07:21:14,932 INFO   ||  Started KafkaBasedLog for topic my_connect_configs   [org.apache.kafka.connect.util.KafkaBasedLog]
2017-09-21 07:21:14,932 INFO   ||  Started KafkaConfigBackingStore   [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
2017-09-21 07:21:14,932 INFO   ||  Herder started   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:14,938 INFO   ||  Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1.   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:14,940 INFO   ||  (Re-)joining group 1   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:15,022 INFO   ||  Successfully joined group 1 with generation 1   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:15,022 INFO   ||  Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=-1, connectorIds=[], taskIds=[]}   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:15,023 INFO   ||  Starting connectors and tasks using config offset -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:15,023 INFO   ||  Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

The last few lines shown above report that the service has started and is ready for connections. The terminal will continue to show additional output as the Kafka Connect service generates it.

Using the Kafka Connect REST API

The Kafka Connect service exposes a RESTful API to manage the set of connectors, so let’s use that API with the curl command line tool. Because we mapped port 8083 in the connect container (where the Kafka Connect service is running) to port 8083 on the Docker host, we can communicate with the service by sending requests to port 8083 on the Docker host, which then forwards them to the Kafka Connect service. We are using localhost in our examples, but users of non-native Docker platforms (like Docker Toolbox users on Windows and OS X) should replace localhost with the IP address of their Docker host.

Open a new terminal, and use it to check the status of the Kafka Connect service:

$ curl -H "Accept:application/json" localhost:8083/

The Kafka Connect service should return a JSON response message similar to the following:

{"version":"2.2.0","commit":"cb8625948210849f"}

This shows that we’re running Kafka Connect version 2.2.0. Next, check the list of connectors, again substituting the IP address of your Docker host for localhost if needed:

$ curl -H "Accept:application/json" localhost:8083/connectors/

which should return the following:

[]

This confirms that the Kafka Connect service is running, that we can talk with it, and that it currently has no connectors. Let’s remedy that by starting a connector that will capture changes from our MySQL database.
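When these steps are scripted, the same root request can double as a readiness probe. The sketch below shows a generic retry loop; the function name retry and the RETRY_DELAY variable are assumptions for this illustration, not part of Kafka Connect or curl.

```shell
# Hypothetical helper: run a command until it succeeds, at most $1 times,
# sleeping RETRY_DELAY seconds (default 1) between attempts.
retry() {
  attempts="$1"; shift
  i=1
  while true; do
    # Stop as soon as the command succeeds.
    if "$@"; then return 0; fi
    # Give up once the attempt budget is used.
    if [ "$i" -ge "$attempts" ]; then return 1; fi
    i=$((i + 1))
    sleep "${RETRY_DELAY:-1}"
  done
}

# Example (not executed here): wait for the Kafka Connect REST API, up to 30 attempts.
#   retry 30 curl -fsS -o /dev/null -H "Accept:application/json" localhost:8083/
```

The -f option makes curl return a non-zero status on HTTP errors, so the loop keeps polling until the service answers successfully.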

Monitor the MySQL database

At this point we are running the Debezium services, a MySQL database server with a sample inventory database, and the MySQL command line client that is connected to our database. The next step is to register a connector that will begin monitoring the MySQL database server’s binlog and generate change events for each row that has been (or will be) changed. Since this is a new connector, when it starts it will start reading from the beginning of the MySQL binlog, which records all of the transactions, including individual row changes and changes to the schemas.

Normally we’d likely want to use the Kafka tools to manually create the necessary topics, including specifying the number of replicas. However, for this tutorial, Kafka is configured to automatically create the topics with just 1 replica.

Using the same terminal, we’ll use curl to submit to our Kafka Connect service a JSON request message with information about the connector we want to start. Since this command does not run in a Docker container, we need to address the Docker host (so Docker Toolbox users on Windows and OS X should replace localhost with the IP address of their Docker host):

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

Windows users may need to escape the double-quotes, like so:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'

to avoid this error:

{"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}

This command uses the Kafka Connect service’s RESTful API to submit a POST request against the /connectors resource with a JSON document that describes our new connector. Here’s the same JSON message in a more readable format:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
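One way to sidestep shell quoting problems is to keep the request body in a file and let curl read it with -d @file. The sketch below writes the same request body used in the curl command above; the file name register-mysql.json and the function name register_connector are arbitrary choices for this illustration.

```shell
# Write the connector configuration to a file. The quoted 'EOF' delimiter
# keeps the shell from expanding or altering anything inside the JSON.
cat > register-mysql.json <<'EOF'
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
EOF

# Hypothetical helper: POST the file to Kafka Connect.
# $1 is the host:port of the service and defaults to localhost:8083.
register_connector() {
  curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    "${1:-localhost:8083}/connectors/" -d @register-mysql.json
}

# Example (not executed here):
#   register_connector
```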

The JSON message specifies the connector name as inventory-connector, and provides the detailed configuration properties for our MySQL connector:

  • Exactly one task should operate at any one time. Since the MySQL connector reads the MySQL server’s binlog, using a single connector task is the only way to ensure the proper order and that all events are handled properly.

  • The database host is specified as mysql, which is the name of our Docker container running the MySQL server. Recall that Docker manipulates the network stack within our containers so that each linked container can be resolved via /etc/hosts using the container name as the hostname. If MySQL were running on a normal network, we’d simply specify the IP address or resolvable hostname for this value.

  • The MySQL server’s port is specified.

  • The MySQL database we’re running has a debezium user set up expressly for our purposes, so we specify that username and password here.

  • A unique server ID and name are given. The server name is the logical identifier for the MySQL server or cluster of servers, and will be used as the prefix for all Kafka topics.

  • We only want to detect changes in the inventory database, so we use a whitelist.

  • The connector should store the history of the database schemas in Kafka using the named broker (the same broker to which we’re sending events) and topic name. Upon restart, the connector will recover the schemas of the database(s) that existed at the point in time in the binlog when the connector should begin reading.
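The role of the server name as a topic prefix, mentioned in the list above, can be illustrated with a tiny helper. This is a sketch that assumes the MySQL connector names each table’s topic serverName.databaseName.tableName; the function name topic_for is hypothetical.

```shell
# Hypothetical helper: print the topic name expected for a table,
# assuming the serverName.databaseName.tableName convention.
topic_for() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Example: the customers table of the inventory database on dbserver1.
topic_for dbserver1 inventory customers
```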

This command should produce a response similar to the following (perhaps a bit more compact):

HTTP/1.1 201 Created
Date: Tue, 07 Feb 2017 20:49:34 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 471
Server: Jetty(9.2.15.v20160210)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector"
  },
  "tasks": []
}

This response describes the connector resource /connectors/inventory-connector that the service just created and includes the connector’s configuration and information about the tasks. Since the connector was just created, the service hasn’t yet finished starting tasks.

We can even use the RESTful API to verify that our connector is included in the list of connectors:

$ curl -H "Accept:application/json" localhost:8083/connectors/

which should return the following:

["inventory-connector"]

Recall that the Kafka Connect service uses connectors to start one or more tasks that do the work, and that it will automatically distribute the running tasks across the cluster of Kafka Connect services. Should any of the services stop or crash, those tasks will be redistributed to running services. We can see the tasks when we get the state of the connector:

$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

which returns:

HTTP/1.1 200 OK
Date: Mon, 27 Mar 2017 17:09:28 GMT
Content-Type: application/json
Content-Length: 515
Server: Jetty(9.2.15.v20160210)

{
  "name": "inventory-connector",
  "config": {
    "name": "inventory-connector",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ]
}

Here, we can see that the connector is running a single task (task 0) to do its work. The MySQL connector only supports a single task, since MySQL records all of its activities in one sequential binlog and so the MySQL connector needs only one reader to get a consistent and totally ordered view of all of those events.
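Kafka Connect also offers a status resource for each connector, at /connectors/<name>/status, which reports a state for the connector and for each task. The sketch below pulls those state values out of the response with a simple text match; the helper name connector_states is hypothetical, and this is a quick check rather than a real JSON parser.

```shell
# Hypothetical helper: print every "state" value found in a status document on stdin.
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Example (not executed here):
#   curl -s -H "Accept:application/json" \
#     localhost:8083/connectors/inventory-connector/status | connector_states
```

If every printed value is RUNNING, the connector and its task are both up.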

If we look at the output of our connect container, we see that the connector has generated a lot of output. The first few lines related to our connector are output by Kafka Connect, and start with:

...
2017-09-21 07:23:59,051 INFO   ||  Connector inventory-connector config updated   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO   ||  Rebalance started   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO   ||  Finished stopping tasks in preparation for rebalance   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO   ||  (Re-)joining group 1   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO   ||  Successfully joined group 1 with generation 2   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO   ||  Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=1, connectorIds=[inventory-connector], taskIds=[]}   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO   ||  Starting connectors and tasks using config offset 1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO   ||  Starting connector inventory-connector   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...

followed by a lot of output from Kafka Connect about starting this connector and the various producer and consumer configurations. Eventually, we see output like the following from our MySQL connector:

...
2017-09-21 07:24:01,151 INFO   MySQL|dbserver1|task  Kafka version : 0.11.0.0   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,151 INFO   MySQL|dbserver1|task  Kafka commitId : cb8625948210849f   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,584 INFO   MySQL|dbserver1|task  Found no existing offset, so preparing to perform a snapshot   [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:24:01,614 INFO   ||  Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2017-09-21 07:24:01,615 INFO   MySQL|dbserver1|snapshot  Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull with user 'debezium'   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,617 INFO   MySQL|dbserver1|snapshot  Snapshot is using user 'debezium' with these MySQL grants:   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,618 INFO   MySQL|dbserver1|snapshot         GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%'   [io.debezium.connector.mysql.SnapshotReader]
...

Debezium’s log output makes use of mapped diagnostic contexts (MDC), which allow the log messages to include thread-specific information such as the connector type (e.g., MySQL in the above log messages, after the INFO or WARN field), the logical name of the connector (e.g., dbserver1 above), and the connector’s activity (e.g., task, snapshot, and binlog). This context makes it easier to understand what is going on in the multi-threaded Kafka Connect service.
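To make the layout of these lines concrete, here is a minimal Python sketch that splits a single-line log message into its timestamp, level, MDC fields, message, and logger. The regular expression is inferred from the sample output in this tutorial rather than from Debezium's logging configuration, and the field names (connector_type, logical_name, activity) are labels chosen for this example.

```python
import re

# Pattern inferred from the sample log lines in this tutorial (an assumption,
# not taken from the Debezium log4j configuration).
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<connector_type>[^|\s]*)\|(?P<logical_name>[^|\s]*)\|(?P<activity>\S*)\s+"
    r"(?P<message>.*?)\s+"
    r"\[(?P<logger>[^\]]+)\]$"
)


def parse_log_line(line):
    """Return the fields of one log line as a dict, or None if it does not match."""
    match = LOG_LINE.match(line.rstrip())
    return match.groupdict() if match else None


connector_line = (
    "2017-09-21 07:24:01,584 INFO   MySQL|dbserver1|task  "
    "Found no existing offset, so preparing to perform a snapshot   "
    "[io.debezium.connector.mysql.MySqlConnectorTask]"
)
connect_line = (
    "2017-09-21 07:23:59,550 INFO   ||  Rebalance started   "
    "[org.apache.kafka.connect.runtime.distributed.DistributedHerder]"
)

parsed = parse_log_line(connector_line)
connect_parsed = parse_log_line(connect_line)
print(parsed["connector_type"], parsed["logical_name"], parsed["activity"])
print(repr(connect_parsed["activity"]), connect_parsed["message"])
```

Lines written by Kafka Connect itself carry an empty context (||), so all three MDC fields come back as empty strings. Multi-line messages, such as the CREATE TABLE statements logged during a snapshot, do not match this single-line pattern and return None.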

The first few lines involve the task activity of the connector and report bookkeeping information, such as the fact that the connector was started with no prior offset. The next three lines involve the snapshot activity of the connector, specifically that a snapshot is being started using the debezium MySQL user, along with the MySQL grants associated with that user.

If the connector is not able to connect or does not see any tables or the binlog, check these grants to ensure that all of those listed above are included.
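One way to run that check is sketched below in Python: the function compares a GRANT statement against the privileges listed in the log output above and reports any that are missing. The name missing_privileges is hypothetical, and the sketch only inspects the privilege list; it does not verify the scope (ON *.*) or the account the grant applies to.

```python
import re

# Privileges taken from the GRANT line logged by the connector above.
REQUIRED_PRIVILEGES = {
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
}


def missing_privileges(grant_statement):
    """Return the required privileges that a GRANT statement does not include."""
    match = re.search(r"GRANT\s+(.*?)\s+ON\s", grant_statement, re.IGNORECASE)
    if not match:
        # Not a recognizable GRANT statement: treat everything as missing.
        return set(REQUIRED_PRIVILEGES)
    granted = {privilege.strip().upper() for privilege in match.group(1).split(",")}
    if "ALL PRIVILEGES" in granted or "ALL" in granted:
        return set()
    return REQUIRED_PRIVILEGES - granted


logged_grant = (
    "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, "
    "REPLICATION CLIENT ON *.* TO 'debezium'@'%'"
)
incomplete_grant = "GRANT SELECT, RELOAD ON *.* TO 'debezium'@'%'"

print(missing_privileges(logged_grant))
print(sorted(missing_privileges(incomplete_grant)))
```

An empty result means every privilege from the logged grant is present; anything else names the privileges to add for the user.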

The next messages output by the connector are the following:

...
2017-09-21 07:24:01,618 INFO   MySQL|dbserver1|snapshot  MySQL server variables related to change data capture:   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO   MySQL|dbserver1|snapshot  	binlog_cache_size                             = 32768                                           [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO   MySQL|dbserver1|snapshot  	binlog_checksum                               = CRC32                                           [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO   MySQL|dbserver1|snapshot  	binlog_direct_non_transactional_updates       = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO   MySQL|dbserver1|snapshot  	binlog_error_action                           = ABORT_SERVER                                    [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_format                                 = ROW                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_group_commit_sync_delay                = 0                                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_group_commit_sync_no_delay_count       = 0                                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_gtid_simple_recovery                   = ON                                              [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_max_flush_queue_time                   = 0                                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_order_commits                          = ON                                              [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_row_image                              = FULL                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_rows_query_log_events                  = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	binlog_stmt_cache_size                        = 32768                                           [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_client                          = utf8                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_connection                      = utf8                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_database                        = latin1                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_filesystem                      = binary                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_results                         = utf8                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_server                          = latin1                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_set_system                          = utf8                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	character_sets_dir                            = /usr/share/mysql/charsets/                      [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	collation_connection                          = utf8_general_ci                                 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	collation_database                            = latin1_swedish_ci                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	collation_server                              = latin1_swedish_ci                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	enforce_gtid_consistency                      = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO   MySQL|dbserver1|snapshot  	gtid_executed_compression_period              = 1000                                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	gtid_mode                                     = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	gtid_next                                     = AUTOMATIC                                       [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	gtid_owned                                    =                                                 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	gtid_purged                                   =                                                 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	innodb_api_enable_binlog                      = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	innodb_locks_unsafe_for_binlog                = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	innodb_version                                = 5.7.19                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	log_statements_unsafe_for_binlog              = ON                                              [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	max_binlog_cache_size                         = 18446744073709547520                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	max_binlog_size                               = 1073741824                                      [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	max_binlog_stmt_cache_size                    = 18446744073709547520                            [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	protocol_version                              = 10                                              [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	session_track_gtids                           = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	slave_type_conversions                        =                                                 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	sync_binlog                                   = 1                                               [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	system_time_zone                              = UTC                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	time_zone                                     = SYSTEM                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	tls_version                                   = TLSv1,TLSv1.1                                   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	tx_isolation                                  = REPEATABLE-READ                                 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	tx_read_only                                  = OFF                                             [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	version                                       = 5.7.19-log                                      [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	version_comment                               = MySQL Community Server (GPL)                    [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO   MySQL|dbserver1|snapshot  	version_compile_machine                       = x86_64                                          [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,628 INFO   MySQL|dbserver1|snapshot  	version_compile_os                            = Linux                                           [io.debezium.connector.mysql.SnapshotReader]
...

This reports the relevant MySQL server settings found by our MySQL connector. One of the most important is binlog_format, which is set to ROW. These lines are followed by the output of the steps (numbered 0 through 9) that make up the snapshot operation:

...
2017-09-21 07:24:01,628 INFO   MySQL|dbserver1|snapshot  Step 0: disabling autocommit and enabling repeatable read transactions   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,631 INFO   MySQL|dbserver1|snapshot  Step 1: start transaction with consistent snapshot   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,634 INFO   MySQL|dbserver1|snapshot  Step 2: flush and obtain global read lock to prevent writes to database   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,636 INFO   MySQL|dbserver1|snapshot  Step 3: read binlog position of MySQL master   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO   MySQL|dbserver1|snapshot  	 using binlog 'mysql-bin.000003' at position '154' and gtid ''   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO   MySQL|dbserver1|snapshot  Step 4: read list of available databases   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO   MySQL|dbserver1|snapshot  	 list of available databases is: [information_schema, inventory, mysql, performance_schema, sys]   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,639 INFO   MySQL|dbserver1|snapshot  Step 5: read list of available tables in each database   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO   MySQL|dbserver1|snapshot  	 including 'inventory.customers'   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO   MySQL|dbserver1|snapshot  	 including 'inventory.orders'   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO   MySQL|dbserver1|snapshot  	 including 'inventory.products'   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO   MySQL|dbserver1|snapshot  	 including 'inventory.products_on_hand'   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,642 INFO   MySQL|dbserver1|snapshot  	 'mysql.columns_priv' is filtered out, discarding   [io.debezium.connector.mysql.SnapshotReader]
...
2017-09-21 07:24:01,670 INFO   MySQL|dbserver1|snapshot  	snapshot continuing with database(s): [inventory]   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,670 INFO   MySQL|dbserver1|snapshot  Step 6: generating DROP and CREATE statements to reflect current database schemas:   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,679 INFO   MySQL|dbserver1|snapshot  	SET character_set_server=latin1, collation_server=latin1_swedish_ci;   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,724 WARN   MySQL|dbserver1|task  Error while fetching metadata with correlation id 1 : {dbhistory.inventory=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:01,853 INFO   MySQL|dbserver1|snapshot  	DROP TABLE IF EXISTS `inventory`.`products_on_hand`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,861 INFO   MySQL|dbserver1|snapshot  	DROP TABLE IF EXISTS `inventory`.`customers`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,864 INFO   MySQL|dbserver1|snapshot  	DROP TABLE IF EXISTS `inventory`.`orders`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,866 INFO   MySQL|dbserver1|snapshot  	DROP TABLE IF EXISTS `inventory`.`products`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,881 INFO   MySQL|dbserver1|snapshot  	DROP DATABASE IF EXISTS `inventory`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,889 INFO   MySQL|dbserver1|snapshot  	CREATE DATABASE `inventory`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,893 INFO   MySQL|dbserver1|snapshot  	USE `inventory`   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,914 INFO   MySQL|dbserver1|snapshot  	CREATE TABLE `customers` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=latin1   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,932 INFO   MySQL|dbserver1|snapshot  	CREATE TABLE `orders` (
  `order_number` int(11) NOT NULL AUTO_INCREMENT,
  `order_date` date NOT NULL,
  `purchaser` int(11) NOT NULL,
  `quantity` int(11) NOT NULL,
  `product_id` int(11) NOT NULL,
  PRIMARY KEY (`order_number`),
  KEY `order_customer` (`purchaser`),
  KEY `ordered_product` (`product_id`),
  CONSTRAINT `orders_ibfk_1` FOREIGN KEY (`purchaser`) REFERENCES `customers` (`id`),
  CONSTRAINT `orders_ibfk_2` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10005 DEFAULT CHARSET=latin1   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,937 INFO   MySQL|dbserver1|snapshot  	CREATE TABLE `products` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `description` varchar(512) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=110 DEFAULT CHARSET=latin1   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,941 INFO   MySQL|dbserver1|snapshot  	CREATE TABLE `products_on_hand` (
  `product_id` int(11) NOT NULL,
  `quantity` int(11) NOT NULL,
  PRIMARY KEY (`product_id`),
  CONSTRAINT `products_on_hand_ibfk_1` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,947 INFO   MySQL|dbserver1|snapshot  Step 7: releasing global read lock to enable MySQL writes   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,949 INFO   MySQL|dbserver1|snapshot  Step 7: blocked writes to MySQL for a total of 00:00:00.312   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,950 INFO   MySQL|dbserver1|snapshot  Step 8: scanning contents of 4 tables while still in transaction   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,953 INFO   MySQL|dbserver1|snapshot  Step 8: - scanning table 'inventory.customers' (1 of 4 tables)   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,958 INFO   MySQL|dbserver1|snapshot  Step 8: - Completed scanning a total of 4 rows from table 'inventory.customers' after 00:00:00.005   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,959 INFO   MySQL|dbserver1|snapshot  Step 8: - scanning table 'inventory.orders' (2 of 4 tables)   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,014 INFO   MySQL|dbserver1|snapshot  Step 8: - Completed scanning a total of 4 rows from table 'inventory.orders' after 00:00:00.055   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,016 INFO   MySQL|dbserver1|snapshot  Step 8: - scanning table 'inventory.products' (3 of 4 tables)   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,017 INFO   MySQL|dbserver1|snapshot  Step 8: - Completed scanning a total of 9 rows from table 'inventory.products' after 00:00:00.001   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,018 INFO   MySQL|dbserver1|snapshot  Step 8: - scanning table 'inventory.products_on_hand' (4 of 4 tables)   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,019 INFO   MySQL|dbserver1|snapshot  Step 8: - Completed scanning a total of 9 rows from table 'inventory.products_on_hand' after 00:00:00.001   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO   MySQL|dbserver1|snapshot  Step 8: scanned 26 rows in 4 tables in 00:00:00.069   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO   MySQL|dbserver1|snapshot  Step 9: committing transaction   [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,021 INFO   MySQL|dbserver1|snapshot  Completed snapshot in 00:00:00.405   [io.debezium.connector.mysql.SnapshotReader]
...

Each of these steps reports what the connector is doing to perform the consistent snapshot. For example, Step 6 reverse engineers the DDL statements (DROP and CREATE) for the tables that are being captured; Step 7 releases the global read lock just 0.3 seconds after it was acquired; and Step 8 reads all of the rows in each of the tables and reports the time taken and the number of rows found. Note that in our example database, the MySQL connector completed its consistent snapshot in about 0.4 seconds.

This process will take longer with your databases, but the connector outputs enough log messages that you can track what it is working on, even when the tables have very large numbers of rows. Although a global read lock, which blocks writes, is held at the beginning of the snapshot process, it should be held only briefly even for large databases, because it is released before any data is copied. See the MySQL connector documentation for more details.
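As a small illustration of tracking snapshot progress from the logs, the Python sketch below collects the per-table row counts from the Step 8 messages. The regular expression is inferred from the sample output above, and rows_per_table is a name chosen for this example; the sample text is the message portion of the four "Completed scanning" lines.

```python
import re

# Pattern inferred from the Step 8 log messages shown above (an assumption).
SCAN_LINE = re.compile(
    r"Completed scanning a total of (?P<rows>\d+) rows from table "
    r"'(?P<table>[^']+)' after (?P<elapsed>\d{2}:\d{2}:\d{2}\.\d{3})"
)


def rows_per_table(log_text):
    """Map each scanned table to the number of rows reported for it."""
    return {m["table"]: int(m["rows"]) for m in SCAN_LINE.finditer(log_text)}


SAMPLE_LOG = """\
Step 8: - Completed scanning a total of 4 rows from table 'inventory.customers' after 00:00:00.005
Step 8: - Completed scanning a total of 4 rows from table 'inventory.orders' after 00:00:00.055
Step 8: - Completed scanning a total of 9 rows from table 'inventory.products' after 00:00:00.001
Step 8: - Completed scanning a total of 9 rows from table 'inventory.products_on_hand' after 00:00:00.001
"""

counts = rows_per_table(SAMPLE_LOG)
total = sum(counts.values())
print(counts)
print(total)
```

The total of 26 agrees with the summary line "scanned 26 rows in 4 tables" in the log output.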

The next five lines from Kafka Connect sound ominous, but they simply tell us that new Kafka topics were created and Kafka had to assign a new leader for each:

...
2017-09-21 07:24:02,632 WARN   ||  Error while fetching metadata with correlation id 1 : {dbserver1=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,775 WARN   ||  Error while fetching metadata with correlation id 5 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,910 WARN   ||  Error while fetching metadata with correlation id 9 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,045 WARN   ||  Error while fetching metadata with correlation id 13 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,179 WARN   ||  Error while fetching metadata with correlation id 17 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
...
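If you want to list the topics named in warnings like these, a short Python sketch such as the following would do it. The pattern is inferred from the sample lines above, and topics_awaiting_leader is a hypothetical helper name; the sample text contains only the message portion of each warning.

```python
import re

# Pattern inferred from the LEADER_NOT_AVAILABLE warnings shown above.
WARNING = re.compile(r"\{(?P<topic>[^=}]+)=LEADER_NOT_AVAILABLE\}")


def topics_awaiting_leader(log_text):
    """Return the topic names mentioned in LEADER_NOT_AVAILABLE warnings, in order."""
    return [m["topic"] for m in WARNING.finditer(log_text)]


SAMPLE_WARNINGS = """\
Error while fetching metadata with correlation id 1 : {dbserver1=LEADER_NOT_AVAILABLE}
Error while fetching metadata with correlation id 5 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE}
Error while fetching metadata with correlation id 9 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE}
Error while fetching metadata with correlation id 13 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE}
Error while fetching metadata with correlation id 17 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
"""

new_topics = topics_awaiting_leader(SAMPLE_WARNINGS)
print(new_topics)
```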

Finally, we see a line reporting that the connector has transitioned from its snapshot mode into continuously reading the MySQL server’s binlog:

...
Sep 21, 2017 7:24:03 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:7)
2017-09-21 07:24:03,373 INFO   MySQL|dbserver1|binlog  Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows   [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:25:01,096 INFO   ||  Finished WorkerSourceTask{id=inventory-connector-0} commitOffsets successfully in 18 ms   [org.apache.kafka.connect.runtime.WorkerSourceTask]
...

Viewing the change events

We saw in the connector’s output that events were written to five topics:

  • dbserver1

  • dbserver1.inventory.products

  • dbserver1.inventory.products_on_hand

  • dbserver1.inventory.customers

  • dbserver1.inventory.orders

As described in the MySQL connector documentation, each topic name starts with dbserver1, which is the logical server name we gave our connector in its database.server.name property. The first topic is our schema change topic, to which all of the DDL statements are written. The remaining four topics capture the change events for each of our four tables, and their names include the database name (e.g., inventory) and the table name.
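The naming convention described above can be sketched in a few lines of Python. The helper names schema_change_topic and table_topic are illustrative only; the connector builds these names itself, and the sketch simply reproduces the pattern for the tutorial's dbserver1 server and inventory database.

```python
def schema_change_topic(server_name):
    """The schema change topic is named after the logical server name."""
    return server_name


def table_topic(server_name, database, table):
    """Per-table topics follow the pattern <server>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


tables = ["products", "products_on_hand", "customers", "orders"]
topics = [schema_change_topic("dbserver1")] + [
    table_topic("dbserver1", "inventory", table) for table in tables
]

for topic in topics:
    print(topic)
```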

Let’s look at all of the data change events in the dbserver1.inventory.customers topic. We’ll use the debezium/kafka Docker image to start a new container that runs one of Kafka’s utilities to watch the topic from the beginning of the topic:

$ docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers

Again, we use the --rm flag since we want the container to be removed when it stops, and we use the -a flag on watch-topic to signal that we want to see all events since the beginning of the topic. (If we were to remove the -a flag, we’d see only the events that are recorded in the topic after we start watching.) The -k flag specifies that the output should include the event’s key, which in our case contains the row’s primary key. Here’s the output:

Using ZOOKEEPER_CONNECT=172.17.0.3:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.8:9092
Contents of topic dbserver1.inventory.customers:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":
null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}

This utility keeps watching, so any new events automatically appear for as long as the utility keeps running. The watch-topic utility is very simple and limited in functionality and usefulness; we use it here only to get an understanding of the kind of events that our connector generates. Applications that want to consume events would instead use Kafka consumers, and those consumer libraries offer far more flexibility and power. In fact, properly configured clients enable our applications to never miss any events, even when those applications crash or shut down gracefully.

These events happen to be encoded in JSON, since that’s how we configured our Kafka Connect service. Each event includes one JSON document for the key, and one for the value. Let’s look at the last event in more detail, by first reformatting the event’s key to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

The event’s key has two parts: a schema and payload. The schema contains a Kafka Connect schema describing what is in the payload, and in our case that means that the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field named id of type int32.

If we look at the value of the key’s payload field, we’ll see that it is indeed a structure (which in JSON is just an object) with a single id field, whose value is 1004.

Therefore, we interpret this event as applying to the row in the inventory.customers table (output from the connector named dbserver1) whose id primary key column had a value of 1004.
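
As a rough illustration of this interpretation, the following Python sketch splits one line of watch-topic output into its key and value documents and derives the server, database, and table from the key's schema name. The helper names parse_watch_topic_line and describe_key are made up for this example. It assumes the key and value are separated by a single tab, as in the output above, and that none of the name segments contains a dot.

```python
import json


def parse_watch_topic_line(line):
    """Split one line of watch-topic output into (key, value) dicts.

    Assumes the key and value JSON documents are separated by one tab.
    """
    key_json, value_json = line.rstrip("\n").split("\t", 1)
    return json.loads(key_json), json.loads(value_json)


def describe_key(key):
    """Return (server, database, table, primary_key) for an event key."""
    # Schema names look like "<server>.<database>.<table>.Key".
    server, database, table, _suffix = key["schema"]["name"].split(".")
    return server, database, table, key["payload"]


# A shortened sample line: a real value document is much larger.
sample = (
    '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,'
    '"field":"id"}],"optional":false,'
    '"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}'
    "\t"
    '{"schema":null,"payload":null}'
)

key, value = parse_watch_topic_line(sample)
print(describe_key(key))
```

A real application would read these documents through a Kafka consumer rather than from terminal output, but the structure of the key is the same.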

Now let’s look at the same event’s value, which again we reformat to be easier to read:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.9.5.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1486500577691
  }
}

This portion of the event is much larger but, like the event’s key, it too has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain 5 fields:

  • op is a mandatory field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a non-initial snapshot).

  • before is an optional field that if present contains the state of the row before the event occurred. The structure will be described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.

  • after is an optional field that if present contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.

  • source is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MySQL contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and if available the MySQL server ID, and the timestamp in seconds.

  • ts_ms is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.

If we look at the payload of the event’s value, we can see the information in the event: the op field is c, meaning that the row was created, and the after field contains the id, first_name, last_name, and email of the inserted row.
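
The envelope fields described above can be read programmatically in a few lines. The following Python sketch is only an illustration: the summarize function and the OPERATIONS table are names invented here, and the sample event keeps just the payload fields the function needs.

```python
# Operation codes as listed for the MySQL connector above.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize(value):
    """Return a one-line description of a change event value (a dict)."""
    payload = value["payload"]
    op = OPERATIONS.get(payload["op"], "unknown")
    # Deletes only carry `before`; every other operation carries `after`.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    source = payload["source"]
    return "{} on {}.{}: {}".format(op, source["db"], source["table"], row)


event_value = {
    "payload": {
        "before": None,
        "after": {
            "id": 1004,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org",
        },
        "source": {"name": "dbserver1", "db": "inventory", "table": "customers"},
        "op": "c",
        "ts_ms": 1486500577691,
    }
}

print(summarize(event_value))
```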

You may have noticed that the JSON representations of the events are much larger than the rows they describe. This is because Kafka Connect ships with every event key and value the schema that describes the payload. Over time, this structure may change, and having the schemas for the key and value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
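
To get a feel for this overhead, the sketch below compares the encoded size of a key's payload with the size of the whole key document, schema included. It reuses the key shown earlier. The encoded_size helper is a name made up for this example, and compact JSON encoding is an assumption about how the documents are serialized.

```python
import json

key_schema = {
    "type": "struct",
    "fields": [{"type": "int32", "optional": False, "field": "id"}],
    "optional": False,
    "name": "dbserver1.inventory.customers.Key",
}
key_payload = {"id": 1004}
key_message = {"schema": key_schema, "payload": key_payload}


def encoded_size(document):
    """Size in bytes of the compact JSON encoding of `document`."""
    return len(json.dumps(document, separators=(",", ":")).encode("utf-8"))


payload_bytes = encoded_size(key_payload)
message_bytes = encoded_size(key_message)
print("payload:", payload_bytes, "bytes; whole key:", message_bytes, "bytes")
```

Most of the bytes belong to the schema, and the effect is even stronger for the value, whose schema describes the before, after, and source structures.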

The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This is the only way that each event is structured exactly like the table from where it originated at the time the event occurred. But the Kafka topic containing all of the events for a single table might have events that correspond to each state of the table definition.

The JSON converter produces very verbose events since it includes the key and value schemas in every message. The Avro converter, on the other hand, is far smarter and results in far smaller event messages. The Avro converter transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. When the Avro converter serializes an event message, it places only a unique identifier for the schema along with an Avro-encoded binary representation of the value. As a result, the serialized messages transferred over the wire and stored in Kafka are far smaller than they appear above. In fact, the Avro Converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
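
The idea of shipping a schema identifier instead of the schema itself can be sketched as follows. This example assumes the framing used by Confluent's Schema Registry serializers (one marker byte, then a 4-byte big-endian schema ID, then the Avro-encoded body); the tutorial itself does not specify the framing, so treat the layout as an assumption. The frame and unframe helpers are invented for this sketch, and the body is a placeholder rather than real Avro data.

```python
import struct

MAGIC_BYTE = 0  # marker byte of the assumed framing


def frame(schema_id, avro_body):
    """Prefix an already encoded body with the marker byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_body


def unframe(message):
    """Return (schema_id, body) from a framed message."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected framing byte")
    return schema_id, message[5:]


placeholder_body = b"\x02\x04"  # stands in for Avro-encoded bytes
framed = frame(42, placeholder_body)
print(len(framed), unframe(framed))
```

Whatever the size of the schema, the per-message cost of referring to it stays at five bytes under this framing.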

We can compare these to the state of the database. Go back to the terminal that is running the MySQL command line client, and run the following statement:

mysql> SELECT * FROM customers;

which produces the following output:

+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

As we can see, all of our event records match the database.

Now that we’re monitoring changes, what happens when we change one of the records in the database? Run the following statement in the MySQL command line client:

mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

which produces the following output:

Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Rerun the SELECT * FROM customers statement to see the updated table:

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

Now, go back to the terminal running watch-topic and we should see a new fifth event:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635059,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"u","ts_ms":1490635059389}}

Let’s reformat the new event’s key to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

This key is exactly the same key as what we saw in the fourth record. Here’s that new event’s value formatted to be easier to read:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.9.5.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501486,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "u",
    "ts_ms": 1486501486308
  }
}

When we compare this to the value in the fourth event, we see no changes in the schema section and several changes in the payload section:

  • The op field value is now u, signifying that this row changed because of an update

  • The before field now has the state of the row with the values before the database commit

  • The after field now has the updated state of the row, and here we can see that the first_name value is now Anne Marie.

  • The source field structure has many of the same values as before, except the ts_sec and pos fields have changed (and the file might have changed in other circumstances).

  • The ts_ms shows the timestamp that Debezium processed this event.

There are several things we can learn by just looking at this payload section. We can compare the before and after structures to determine what actually changed in this row because of the commit. The source structure tells us information about MySQL’s record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same MySQL commit as other events.
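
Both comparisons can be sketched in a few lines of Python. The function names changed_columns and binlog_position are invented for this illustration. Ordering events by (file, pos, row) assumes that binlog file names sort in the order they were written, which holds for zero-padded names such as mysql-bin.000003 but is an assumption of this sketch rather than something the connector guarantees here.

```python
def changed_columns(before, after):
    """Map each changed column name to its (old, new) pair of values."""
    before = before or {}
    after = after or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


def binlog_position(source):
    """Sort key that orders events by their place in the binlog."""
    return (source["file"], source["pos"], source["row"])


before = {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
          "email": "annek@noanswer.org"}
after = {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar",
         "email": "annek@noanswer.org"}
print(changed_columns(before, after))

snapshot_source = {"file": "mysql-bin.000003", "pos": 154, "row": 0}
update_source = {"file": "mysql-bin.000003", "pos": 364, "row": 0}
print(binlog_position(snapshot_source) < binlog_position(update_source))
```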

So far we’ve seen samples of create and update events. Now, let’s look at delete events. Since Anne Marie has not placed any orders, we can remove her record from our database using the MySQL command line client:

mysql> DELETE FROM customers WHERE id=1004;

In our terminal running watch-topic, we see two new events:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635100,"gtid":null,"file":"mysql-bin.000003","pos":725,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"d","ts_ms":1490635100301}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}	{"schema":null,"payload":null}

What happened? We only deleted one row, but we now have two events. To understand what the MySQL connector does, let’s look at the first of our two new messages. Here’s the key reformatted to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Once again, this key is exactly the same key as in the previous two events we looked at. Here’s the value of the first new event, formatted to be easier to read:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
      "version": "0.9.5.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501558,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "d",
    "ts_ms": 1486501558315
  }
}

Again, the schema is identical to the previous messages, but the payload fragment has a few things of note:

  • The op field value is now d, signifying that this row was deleted

  • The before field now has the state of the row that was deleted with the database commit

  • The after field is null, signifying that the row no longer exists

  • The source field structure has many of the same values as before, except the ts_sec and pos fields have changed (and the file might have changed in other circumstances).

  • The ts_ms shows the timestamp that Debezium processed this event.

This event gives a consumer all kinds of information that it can use to process the removal of this row. We include the old values because some consumers might require them in order to properly handle the removal, and without them they may have to resort to far more complex behavior.
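
One way a consumer might use these events is to keep an in-memory copy of the table up to date. The sketch below is a minimal illustration, not part of Debezium: apply_event is a hypothetical helper, the table is a plain dict keyed by the id column, and messages that carry no payload are simply skipped.

```python
def apply_event(table, key, value):
    """Apply one change event to `table`, a dict keyed by primary key."""
    if value is None or value.get("payload") is None:
        return  # no payload: nothing to apply
    payload = value["payload"]
    row_id = key["payload"]["id"]
    if payload["op"] == "d":
        table.pop(row_id, None)  # row no longer exists
    else:
        table[row_id] = payload["after"]  # c, u, and r carry the new state


key = {"payload": {"id": 1004}}
anne = {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
        "email": "annek@noanswer.org"}
anne_marie = dict(anne, first_name="Anne Marie")

customers = {}
apply_event(customers, key, {"payload": {"op": "c", "before": None, "after": anne}})
after_create = dict(customers)
apply_event(customers, key, {"payload": {"op": "u", "before": anne, "after": anne_marie}})
after_update = dict(customers)
apply_event(customers, key, {"payload": {"op": "d", "before": anne_marie, "after": None}})
print(after_create, after_update, customers)
```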

Remember that we saw two events when we deleted the row? Let’s look at that second event. Here’s the key for the event:

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Once again, this key is exactly the same key as in the previous three events we looked at. Here’s the value of that same event:

{
  "schema": null,
  "payload": null
}

What gives? Well, all of the Kafka topics that the MySQL connector writes to can be set up to be log compacted, which means that Kafka can remove older messages from the topic as long as there is at least one message later in the topic with the exact same key. This is Kafka’s way to collect the garbage. This last event is what Debezium calls a tombstone event, and because it has a key and an empty value Kafka understands it can remove all prior messages with this same key.

Kafka log compaction is great, because it still allows consumers to read the topic from the very beginning and not miss any events.
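
The effect of compaction on a topic can be modeled with a short Python sketch. This is a simplified model and not Kafka's actual algorithm: real compaction runs in the background on older log segments, and brokers retain tombstones for a configurable period (delete.retention.ms) before removing them. The compact function is a name made up for this example, and a value of None stands for a tombstone.

```python
def compact(log):
    """Return a compacted copy of `log`, a list of (key, value) pairs.

    Keeps only the latest message for each key, and drops a key entirely
    when its latest message is a tombstone (value None).
    """
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    survivors = sorted(
        (offset, key, value)
        for key, (offset, value) in latest.items()
        if value is not None
    )
    return [(key, value) for _offset, key, value in survivors]


# Keys are customer ids; values are abbreviated to the operation code.
topic = [(1003, "c"), (1004, "c"), (1004, "u"), (1004, "d"), (1004, None)]
print(compact(topic))
```

In this model every message for key 1004 disappears once the tombstone is the latest message for that key, while the single message for key 1003 survives.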

Restart the Kafka Connect service

One feature of the Kafka Connect service is that it automatically manages tasks for the registered connectors. And, because it stores its data in Kafka, if a running service stops or goes away completely, upon restart (perhaps on another host) the server will start any non-running tasks. To demonstrate this, let’s stop our Kafka Connect service, change some data in the database, and restart our service.

In a new terminal, use the following Docker command to stop the connect container that is running our Kafka Connect service:

$ docker stop connect

Stopping the container like this stops the process running inside of it, but the Kafka Connect service handles this by gracefully shutting down. And because we ran the container with the --rm flag, Docker removed the container after it stopped it.

While the service is down, let’s go back to the MySQL command line client and add a few records:

mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");

Notice that in the terminal where we’re running watch-topic, there’s been no update. Also, we’re still able to watch the topic because Kafka is still running.

In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in sync replicas for each topic. So if enough brokers fail such that there are not the minimum number of ISRs, Kafka should become unavailable. Producers, like the Debezium connectors, and consumers will simply wait patiently for the Kafka cluster or network to recover. Yes, that means that your consumers might temporarily see no change events as data is changed in the databases, but that’s because none are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium will continue producing change events and your consumers will continue consuming events where they left off.

Now, in a new terminal, start a new container using the same command we used before:

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

This creates a whole new container that runs the Kafka Connect distributed service, and since we’ve initialized it with the same topic information, the new service connects to Kafka, reads the previous service’s configuration, and starts the registered connectors, which continue exactly where they last left off.

Here are the last few lines from this restarted service:

...
2017-09-21 07:38:48,385 INFO   MySQL|dbserver1|task  Kafka version : 0.11.0.0   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,386 INFO   MySQL|dbserver1|task  Kafka commitId : cb8625948210849f   [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,390 INFO   MySQL|dbserver1|task  Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group inventory-connector-dbhistory.   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,390 INFO   MySQL|dbserver1|task  Revoking previously assigned partitions [] for group inventory-connector-dbhistory   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,391 INFO   MySQL|dbserver1|task  (Re-)joining group inventory-connector-dbhistory   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,402 INFO   MySQL|dbserver1|task  Successfully joined group inventory-connector-dbhistory with generation 1   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO   MySQL|dbserver1|task  Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connector-dbhistory   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,888 INFO   MySQL|dbserver1|task  Step 0: Get all known binlogs from MySQL   [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:38:48,903 INFO   MySQL|dbserver1|task  MySQL has the binlog file 'mysql-bin.000003' required by the connector   [io.debezium.connector.mysql.MySqlConnectorTask]
Sep 21, 2017 7:38:49 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:10)
2017-09-21 07:38:49,045 INFO   MySQL|dbserver1|binlog  Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows   [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:38:49,046 INFO   ||  Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]

As you can see, these lines show that the service finds the offsets previously recorded by the last task before it was shut down, and that it then connects to the MySQL database, starts reading the binlog from that position, and generates events from any changes in the MySQL database since that point in time.
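
The idea of resuming from a recorded offset can be illustrated with a toy model in Python. It is not how the connector is implemented: events_after is a hypothetical helper, the binlog is a plain list, and the committed offset is a (file, pos) pair, whereas the real service keeps its offsets in the Kafka topic named by OFFSET_STORAGE_TOPIC. The positions are the ones that appear in the events in this tutorial, and the choice of committed offset is an assumption made for the example.

```python
def events_after(binlog, committed_offset):
    """Return the entries that come after the committed (file, pos) offset."""
    return [
        entry
        for entry in binlog
        if (entry["file"], entry["pos"]) > committed_offset
    ]


binlog = [
    {"file": "mysql-bin.000003", "pos": 364, "op": "u"},
    {"file": "mysql-bin.000003", "pos": 725, "op": "d"},
    # Written while the Kafka Connect service was stopped.
    {"file": "mysql-bin.000003", "pos": 1046, "op": "c"},
]

# Assumed for this sketch: the delete was the last event recorded before shutdown.
committed_offset = ("mysql-bin.000003", 725)

pending = events_after(binlog, committed_offset)
print([entry["pos"] for entry in pending])
```

Because the changes made during the outage are still in the binlog, nothing is lost: they are simply read later than they would have been.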

Jump back to the terminal running watch-topic, and you should now see events for our two new records:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null
,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"0.9.5.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":
null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}

These are create events similar to those we saw before. The important point, though, is that Debezium still reports all of the changes made to a database while the connector was not running, as long as the connector is restarted before the MySQL database purges the missed commits from its binlog.
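Whether MySQL still has the binlog file the connector needs can also be checked by hand. The sketch below is one way to do it, under a few assumptions: the MySQL container is named mysql, the root password is debezium (substitute whatever you set when you started the container), and the helper name has_binlog is invented for this example. SHOW BINARY LOGS is a standard MySQL statement; the -N option of the mysql client suppresses the column header.

```shell
# Succeed if the named binlog file is listed in `SHOW BINARY LOGS` output
# read from stdin (one row per line, file name in the first column).
has_binlog() {
  awk -v f="$1" '$1 == f { found = 1 } END { exit !found }'
}

# Usage (container name and password are assumptions, see above):
#   docker exec mysql mysql -uroot -pdebezium -N -e 'SHOW BINARY LOGS' \
#     | has_binlog mysql-bin.000003 && echo "binlog still available"
```

If the file is no longer listed, the connector cannot resume from its recorded offset.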

Exploration

Go ahead and use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You may need to run a separate watch-topic command for each topic. Remember that you can’t remove a row that is referenced by a foreign key. Have fun!
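To watch more than one topic at a time, each watcher needs its own container name. The following sketch wraps the watch-topic invocation in a small function. It assumes the same image tag (debezium/kafka:0.9), container links, and topic naming (dbserver1.inventory.<table>) used for the customers topic earlier in this tutorial; the function name watch_table and the watcher-<table> naming scheme are choices made for this example only.

```shell
# Start a watcher for the topic of one table in the inventory database.
# The container is named after the table so several watchers can coexist.
watch_table() {
  docker run -it --rm --name "watcher-$1" \
    --link zookeeper:zookeeper --link kafka:kafka \
    debezium/kafka:0.9 watch-topic -a -k "dbserver1.inventory.$1"
}

# Usage, in a new terminal:
#   watch_table products
```

Containers started this way have names other than watcher, so stop them by their own names when you clean up.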

Clean up

You can use Docker to stop all of the running containers:

$ docker stop mysqlterm watcher connect mysql kafka zookeeper

Again, since we used the --rm flag when starting the containers, Docker should remove them right after it stops them. We can verify that all of the containers have been stopped and removed:

$ docker ps -a

Of course, if any are still running, simply stop them using docker stop <name> or docker stop <containerId>.
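Rather than scanning the docker ps output by eye, you could filter it for the tutorial's container names. This is a minimal sketch that assumes the six names passed to docker stop above; the function name leftover_containers is made up for this example.

```shell
# Print the tutorial containers that still exist (running or stopped).
# -x makes grep match whole lines only, so "mysql" does not match "mysql-other".
leftover_containers() {
  docker ps -a --format '{{.Names}}' |
    grep -x -E 'mysqlterm|watcher|connect|mysql|kafka|zookeeper'
}

# Usage: stop whatever is left, if anything.
#   for name in $(leftover_containers); do docker stop "$name"; done
```

If the function prints nothing, all of the tutorial containers are gone.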

Docker Compose setup

If you have already completed the tutorial and would like to run through the setup again quickly, you can use the Docker Compose version of this tutorial located in our examples repository. We provide Docker Compose files for running the tutorial with MySQL, Postgres, MongoDB, SQL Server and Oracle. Please follow the steps described in the readme file.