Tutorial
This tutorial walks you through running Debezium 0.10.0.Final for change data capture (CDC). You will use Docker (1.9 or later) to start the Debezium services, run a MySQL database server with a simple example database, use Debezium to monitor the database, and see the resulting event streams respond as the data in the database changes.
You have already completed the tutorial? Try the fast track using Docker Compose, including example set-ups for all the databases supported by Debezium (MySQL, Postgres, MongoDB, SQL Server and Oracle).
What is Debezium?
Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases. Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops (or crashes), upon restart it will start consuming the events where it left off so it misses nothing.
Debezium 0.10.0.Final includes support for monitoring MySQL database servers with its MySQL connector, and this is what we’ll use in this demonstration. Support for other DBMSes will be added in future releases.
Running Debezium with Docker
Running Debezium involves three major services: ZooKeeper, Kafka, and Debezium’s connector service. This tutorial walks you through starting a single instance of these services using Docker and Debezium’s Docker images. Production environments, on the other hand, require running multiple instances of each service to provide performance, reliability, replication, and fault tolerance. This can be done with a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, but often you’ll want to install on dedicated hardware.
Starting Docker
Make sure that Docker is installed and running on Linux, OS X, or Windows. We highly recommend using the latest version of Docker on these platforms, and we’ve written these instructions with this in mind. (Running Docker in a virtual machine via Docker Machine is no longer the preferred approach, and Docker recommends you upgrade.)
Starting simple with Debezium
For simple evaluation and experimentation, this tutorial will walk you through starting a single instance of each service in a separate container on your local machine. ZooKeeper and Kafka both store data locally inside the container, and normal usage requires mounting directories on the host machine as volumes so that the persisted data remains when the containers stop. We’re skipping that in this tutorial, although the documentation for our Docker images describes how to do that. This means that when a container is removed, all persisted data is lost. That’s actually ideal for our experiment, since nothing will be left on your computer when we’re finished, and you can run this experiment many times without having to clean anything up in between.
Running multiple services locally can be confusing, so we’re going to use a separate terminal to run each container in the foreground. This way all of the output of a container will be displayed in the terminal used to run it.
This is not the only way to run Docker containers. Rather than running a container in the foreground (with -it), you can run it in detached mode (with -d): the container starts in the background, and its output can be viewed with docker logs. See the Docker documentation for details.
Start ZooKeeper
Of all the different services/processes that make up Debezium, the first one to start is ZooKeeper. Start a new terminal and start a container with ZooKeeper by running:
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.10
This runs a new container using version 0.10 of the debezium/zookeeper image, and assigns the name zookeeper to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The three -p options map three of the container’s ports (2181, 2888, and 3888) to the same ports on the Docker host so that other containers (and software outside the container) can talk with ZooKeeper.
You should see in your terminal the typical output of ZooKeeper:
Starting up in standalone mode ZooKeeper JMX enabled by default Using config: /zookeeper/conf/zoo.cfg 2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg 2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3 2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1 2017-09-21 07:15:55,420 - WARN [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running in standalone mode 2017-09-21 07:15:55,420 - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started. 2017-09-21 07:15:55,425 - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed. 2017-09-21 07:15:55,427 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg 2017-09-21 07:15:55,427 - INFO [main:ZooKeeperServerMain@96] - Starting server 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:host.name=51b46dd211d0 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.version=1.8.0_131 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.vendor=Oracle Corporation 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server 
environment:java.class.path=/zookeeper/bin/../build/classes:/zookeeper/bin/../build/lib/*.jar:/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/zookeeper/bin/../lib/log4j-1.2.16.jar:/zookeeper/bin/../lib/jline-0.9.94.jar:/zookeeper/bin/../zookeeper-3.4.10.jar:/zookeeper/bin/../src/java/lib/*.jar:/zookeeper/conf: 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.io.tmpdir=/tmp 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:java.compiler=<NA> 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.name=Linux 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.arch=amd64 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.version=4.4.0-93-generic 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.name=zookeeper 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.home=/zookeeper 2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.dir=/zookeeper 2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@829] - tickTime set to 2000 2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@838] - minSessionTimeout set to -1 2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@847] - maxSessionTimeout set to -1 2017-09-21 07:15:55,440 - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
The last line is important and reports that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
Start Kafka
Open a new terminal, and use it to start Kafka in a new container by running:
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.10
In this tutorial we’re always connecting to Kafka from within a Docker container, and linked containers will always be able to see and communicate with the kafka container by name. To connect to Kafka from outside a Docker container, Kafka would need to advertise an address resolvable from your host; see the documentation for the debezium/kafka image for how to configure this.
This runs a new container using version 0.10 of the debezium/kafka image, and assigns the name kafka to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 9092 in the container to the same port on the Docker host so that software outside of the container can talk with Kafka. Finally, the command uses the --link zookeeper:zookeeper argument to tell the container that it can find ZooKeeper in the container named zookeeper running on the same Docker host.
You should see in your terminal the typical output of Kafka, ending with:
... 2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected) 2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA 2017-09-21 07:16:59,221 - WARN [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties 2017-09-21 07:16:59,247 - INFO [ThrottledRequestReaper-Fetch:Logging$class@70] - [ThrottledRequestReaper-Fetch]: Starting 2017-09-21 07:16:59,247 - INFO [ThrottledRequestReaper-Produce:Logging$class@70] - [ThrottledRequestReaper-Produce]: Starting 2017-09-21 07:16:59,248 - INFO [ThrottledRequestReaper-Request:Logging$class@70] - [ThrottledRequestReaper-Request]: Starting 2017-09-21 07:16:59,308 - INFO [main:Logging$class@70] - Loading logs. 2017-09-21 07:16:59,312 - INFO [main:Logging$class@70] - Logs loading complete in 4 ms. 2017-09-21 07:16:59,349 - INFO [main:Logging$class@70] - Starting log cleanup with a period of 300000 ms. 2017-09-21 07:16:59,353 - INFO [main:Logging$class@70] - Starting log flusher with a default period of 9223372036854775807 ms. 2017-09-21 07:16:59,385 - INFO [main:Logging$class@70] - Awaiting socket connections on 172.17.0.4:9092. 
2017-09-21 07:16:59,387 - INFO [main:Logging$class@70] - [Socket Server on Broker 1], Started 1 acceptor threads 2017-09-21 07:16:59,394 - INFO [ExpirationReaper-1-Produce:Logging$class@70] - [ExpirationReaper-1-Produce]: Starting 2017-09-21 07:16:59,395 - INFO [ExpirationReaper-1-Fetch:Logging$class@70] - [ExpirationReaper-1-Fetch]: Starting 2017-09-21 07:16:59,395 - INFO [ExpirationReaper-1-DeleteRecords:Logging$class@70] - [ExpirationReaper-1-DeleteRecords]: Starting 2017-09-21 07:16:59,435 - INFO [ExpirationReaper-1-topic:Logging$class@70] - [ExpirationReaper-1-topic]: Starting 2017-09-21 07:16:59,441 - INFO [ExpirationReaper-1-Heartbeat:Logging$class@70] - [ExpirationReaper-1-Heartbeat]: Starting 2017-09-21 07:16:59,442 - INFO [controller-event-thread:Logging$class@70] - Creating /controller (is it secure? false) 2017-09-21 07:16:59,447 - INFO [ExpirationReaper-1-Rebalance:Logging$class@70] - [ExpirationReaper-1-Rebalance]: Starting 2017-09-21 07:16:59,456 - INFO [controller-event-thread:Logging$class@70] - Result of znode creation is: OK 2017-09-21 07:16:59,458 - INFO [main:Logging$class@70] - [GroupCoordinator 1]: Starting up. 2017-09-21 07:16:59,459 - INFO [main:Logging$class@70] - [GroupCoordinator 1]: Startup complete. 2017-09-21 07:16:59,460 - INFO [group-metadata-manager-0:Logging$class@70] - [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. 2017-09-21 07:16:59,487 - INFO [main:Logging$class@70] - [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 2017-09-21 07:16:59,530 - INFO [main:Logging$class@70] - [Transaction Coordinator 1]: Starting up. 2017-09-21 07:16:59,532 - INFO [TxnMarkerSenderThread-1:Logging$class@70] - [Transaction Marker Channel Manager 1]: Starting 2017-09-21 07:16:59,532 - INFO [main:Logging$class@70] - [Transaction Coordinator 1]: Startup complete. 
2017-09-21 07:16:59,551 - INFO [main:Logging$class@70] - Will not load MX4J, mx4j-tools.jar is not in the classpath 2017-09-21 07:16:59,590 - INFO [main:Logging$class@70] - Creating /brokers/ids/1 (is it secure? false) 2017-09-21 07:16:59,604 - INFO [main:Logging$class@70] - Result of znode creation is: OK 2017-09-21 07:16:59,605 - INFO [main:Logging$class@70] - Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(172.17.0.4,9092,ListenerName(PLAINTEXT),PLAINTEXT) 2017-09-21 07:16:59,606 - WARN [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties 2017-09-21 07:16:59,648 - INFO [main:AppInfoParser$AppInfo@83] - Kafka version : 0.11.0.0 2017-09-21 07:16:59,648 - INFO [main:AppInfoParser$AppInfo@84] - Kafka commitId : cb8625948210849f 2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
The last line shown above reports that the Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.
Debezium 0.10.0.Final requires Kafka Connect 2.3.0, and in this tutorial we also use version 2.3.0 of the Kafka broker. Check the Kafka documentation about compatibility between different versions of Kafka Connect and the Kafka broker.
Start a MySQL database
At this point, we’ve started ZooKeeper and Kafka, but we don’t yet have a database server from which Debezium can capture changes. Now, let’s start a MySQL server with an example database.
Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an inventory database:
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.10
This runs a new container using version 0.10 of the debezium/example-mysql image, which is based on the mysql:5.7 image, defines and populates a sample "inventory" database, and creates a debezium user with password dbz that has the minimum privileges required by Debezium’s MySQL connector. The command assigns the name mysql to the container so that it can be easily referenced later. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that software outside of the container can connect to the database server. And finally, it uses the -e option three times to set the MYSQL_ROOT_PASSWORD, MYSQL_USER, and MYSQL_PASSWORD environment variables to specific values.
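If you’d rather keep credentials off the command line, the same three settings can go in an env-file passed with Docker’s --env-file option. A minimal sketch (the file name mysql.env is our choice, not part of the tutorial):

```shell
# Write the three MySQL settings from above into an env-file.
cat > mysql.env <<'EOF'
MYSQL_ROOT_PASSWORD=debezium
MYSQL_USER=mysqluser
MYSQL_PASSWORD=mysqlpw
EOF

# Equivalent to the three -e flags (needs Docker and the image, so left
# commented here):
# docker run -it --rm --name mysql -p 3306:3306 --env-file mysql.env debezium/example-mysql:0.10
```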
You should see in your terminal something like the following:
... 2017-09-21T07:18:50.824629Z 0 [Note] mysqld: ready for connections. Version: '5.7.19-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
Notice that the MySQL server starts and stops a few times as the configuration is modified. The last line listed above reports that the MySQL server is running and ready for use.
Start a MySQL command line client
Open a new terminal, and use it to start a new container for the MySQL command line client and connect it to the MySQL server running in the mysql container:
$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
Here we start the container using the mysql:5.7 image, name the container mysqlterm, and link it to the mysql container where the database server is running. The --rm option tells Docker to remove the container when it stops, and the rest of the command defines the shell command that the container should run. This shell command runs the MySQL command line client and specifies the correct options so that it can connect properly.
The container should output lines similar to the following:
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.17-log MySQL Community Server (GPL)
Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
Unlike the other containers, this container runs a process that produces a prompt. We’ll use the prompt to interact with the database. First, switch to the "inventory" database:
mysql> use inventory;
and then list the tables in the database:
mysql> show tables;
which should then display:
+---------------------+
| Tables_in_inventory |
+---------------------+
| customers |
| orders |
| products |
| products_on_hand |
+---------------------+
4 rows in set (0.00 sec)
Use the MySQL command line client to explore the database and view the pre-loaded data in the database. For example:
mysql> SELECT * FROM customers;
Start Kafka Connect
Open a new terminal, and use it to start the Kafka Connect service in a new container by running:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.10
This runs a new Docker container named connect using version 0.10 of the debezium/connect image. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The --rm flag instructs Docker to remove the container when it is stopped. The command maps port 8083 in the container to the same port on the Docker host so that software outside of the container can use Kafka Connect’s REST API to set up and manage new connector instances. The command uses the --link zookeeper:zookeeper, --link kafka:kafka, and --link mysql:mysql arguments to tell the container that it can find ZooKeeper running in the container named zookeeper, the Kafka broker running in the container named kafka, and the MySQL server running in the container named mysql, all running on the same Docker host. And finally, it also uses the -e option four times to set the GROUP_ID, CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and STATUS_STORAGE_TOPIC environment variables, which are all required by this Debezium image (though you can use different values as desired).
You should see in your terminal the typical output of the Kafka Connect service, ending with:
... 2017-09-21 07:21:14,912 INFO || Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:21:14,912 INFO || Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:21:14,929 INFO || Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:21:14,931 INFO || Finished reading KafkaBasedLog for topic my_connect_configs [org.apache.kafka.connect.util.KafkaBasedLog] 2017-09-21 07:21:14,932 INFO || Started KafkaBasedLog for topic my_connect_configs [org.apache.kafka.connect.util.KafkaBasedLog] 2017-09-21 07:21:14,932 INFO || Started KafkaConfigBackingStore [org.apache.kafka.connect.storage.KafkaConfigBackingStore] 2017-09-21 07:21:14,932 INFO || Herder started [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:21:14,938 INFO || Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:21:14,940 INFO || (Re-)joining group 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:21:15,022 INFO || Successfully joined group 1 with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:21:15,022 INFO || Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=-1, connectorIds=[], taskIds=[]} [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:21:15,023 INFO || Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:21:15,023 INFO || Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
The last few lines shown above report that the service has started and is ready for connections. The terminal will continue to show additional output as the Kafka Connect service generates it.
Using the Kafka Connect REST API
The Kafka Connect service exposes a RESTful API to manage the set of connectors, so let’s use that API via the curl command line tool. Because we mapped port 8083 in the connect container (where the Kafka Connect service is running) to port 8083 on the Docker host, we can communicate with the service by sending requests to port 8083 on the Docker host, which then forwards them to the Kafka Connect service. We are using localhost in our examples, but users of non-native Docker platforms (like Docker Toolbox users on Windows and OS X) should replace localhost with the IP address of their Docker host.
Open a new terminal, and use it to check the status of the Kafka Connect service:
$ curl -H "Accept:application/json" localhost:8083/
The Kafka Connect service should return a JSON response message similar to the following:
{"version":"2.3.0","commit":"cb8625948210849f"}
This shows that we’re running Kafka Connect version 2.3.0. Next, check the list of connectors, again using your IP address in place of localhost:
$ curl -H "Accept:application/json" localhost:8083/connectors/
which should return the following:
[]
This confirms that the Kafka Connect service is running, that we can talk with it, and that it currently has no connectors. Let’s remedy that by starting a connector that will capture changes from our MySQL database.
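If you’re scripting checks like the ones above, the version field can be pulled out of the root-endpoint response with standard tools. A minimal sketch, assuming the exact response shape shown earlier (no jq required):

```shell
# Response from: curl -H "Accept:application/json" localhost:8083/
response='{"version":"2.3.0","commit":"cb8625948210849f"}'

# Extract the "version" value with sed.
version=$(printf '%s' "$response" | sed -E 's/.*"version":"([^"]+)".*/\1/')
echo "$version"
```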
Monitor the MySQL database
At this point we are running the Debezium services, a MySQL database server with a sample inventory database, and the MySQL command line client connected to that database. The next step is to register a connector that will begin monitoring the MySQL database server’s binlog and generate change events for each row that has been (or will be) changed. Since this is a new connector, it will start reading from the beginning of the MySQL binlog, which records all of the transactions, including individual row changes and changes to the schemas.
Normally we’d likely want to use the Kafka tools to manually create the necessary topics, including specifying the number of replicas. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.
Using the same terminal, we’ll use curl to submit to our Kafka Connect service a JSON request message with information about the connector we want to start. Since this command will not run in a Docker container, we need to use the IP address of our Docker host (so Docker Toolbox users on Windows and OS X should replace localhost with their IP address):
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
Windows users may need to escape the double-quotes within the JSON payload (writing \" in place of each "); otherwise the shell strips them and the service rejects the request as malformed JSON.
This command uses the Kafka Connect service’s RESTful API to submit a POST request against the /connectors resource with a JSON document that describes our new connector. Here’s the same JSON message in a more readable format:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}
The JSON message specifies the connector name as inventory-connector, and provides the detailed configuration properties for our MySQL connector:
- Exactly one task should operate at any one time. Since the MySQL connector reads the MySQL server’s binlog, using a single connector task is the only way to ensure the proper order and that all events are handled properly.
- The database host is specified as mysql, which is the name of our Docker container running the MySQL server. Recall that Docker manipulates the network stack within our containers so that each linked container can be resolved via /etc/hosts using the container name as the hostname. If MySQL were running on a normal network, we’d simply specify the IP address or resolvable hostname for this value.
- The MySQL server’s port is specified.
- The MySQL database we’re running has a debezium user set up expressly for our purposes, so we specify that username and password here.
- A unique server ID and name are given. The server name is the logical identifier for the MySQL server or cluster of servers, and will be used as the prefix for all Kafka topics.
- We only want to detect changes in the inventory database, so we use a whitelist.
- The connector should store the history of the database schemas in Kafka using the named broker (the same broker to which we’re sending events) and topic name. Upon restart, the connector will recover the schemas of the database(s) as they existed at the point in the binlog where the connector begins reading.
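Rather than escaping the whole JSON document on the command line, you can keep the connector configuration in a file and let curl read it with -d @file. A sketch (the file name register-mysql.json is our choice, not from the tutorial):

```shell
# Save the connector configuration shown above to a file.
cat > register-mysql.json <<'EOF'
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
EOF

# Submit it (needs the running connect container, so left commented here):
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#   localhost:8083/connectors/ -d @register-mysql.json
```

This also sidesteps the Windows quote-escaping issue mentioned above, since the shell never sees the JSON.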
This command should produce a response similar to the following (perhaps a bit more compact):
HTTP/1.1 201 Created
Date: Tue, 07 Feb 2017 20:49:34 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 471
Server: Jetty(9.2.15.v20160210)
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"name": "inventory-connector"
},
"tasks": []
}
This response describes the connector resource /connectors/inventory-connector that the service just created, and includes the connector’s configuration and information about its tasks. Since the connector was just created, the service hasn’t yet finished starting tasks.
We can even use the RESTful API to verify that our connector is included in the list of connectors:
$ curl -H "Accept:application/json" localhost:8083/connectors/
which should return the following:
["inventory-connector"]
Recall that the Kafka Connect service uses connectors to start one or more tasks that do the work, and that it will automatically distribute the running tasks across the cluster of Kafka Connect services. Should any of the services stop or crash, those tasks will be redistributed to running services. We can see the tasks when we get the state of the connector:
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
which returns:
HTTP/1.1 200 OK
Date: Mon, 27 Mar 2017 17:09:28 GMT
Content-Type: application/json
Content-Length: 515
Server: Jetty(9.2.15.v20160210)
{
"name": "inventory-connector",
"config": {
"name": "inventory-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
},
"tasks": [
{
"connector": "inventory-connector",
"task": 0
}
]
}
Here, we can see that the connector is running a single task (task 0) to do its work. The MySQL connector only supports a single task, since MySQL records all of its activities in one sequential binlog; the MySQL connector therefore needs only one reader to get a consistent, totally ordered view of all of those events.
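Besides the connector resource itself, Kafka Connect’s REST API also exposes a status endpoint reporting the run state of the connector and its tasks. A small helper to build that URL (the helper name connector_status_url is ours, not part of the API):

```shell
# Hypothetical helper: build the status URL for a named connector.
connector_status_url() {
  printf 'localhost:8083/connectors/%s/status' "$1"
}

echo "$(connector_status_url inventory-connector)"

# Query it (needs the running connect container, so left commented here):
# curl -H "Accept:application/json" "$(connector_status_url inventory-connector)"
```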
If we look at the output of our connect container, we see that the connector has generated a lot of output. The first few lines related to our connector are output by Kafka Connect, and start with:
... 2017-09-21 07:23:59,051 INFO || Connector inventory-connector config updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:23:59,550 INFO || Rebalance started [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:23:59,550 INFO || Finished stopping tasks in preparation for rebalance [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:23:59,550 INFO || (Re-)joining group 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:23:59,556 INFO || Successfully joined group 1 with generation 2 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:23:59,556 INFO || Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=1, connectorIds=[inventory-connector], taskIds=[]} [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:23:59,557 INFO || Starting connectors and tasks using config offset 1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2017-09-21 07:23:59,557 INFO || Starting connector inventory-connector [org.apache.kafka.connect.runtime.distributed.DistributedHerder] ...
followed by a lot of output from Kafka Connect about starting this connector and the various producer and consumer configurations. Eventually, we see output like the following from our MySQL connector:
... 2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:24:01,584 INFO MySQL|dbserver1|task Found no existing offset, so preparing to perform a snapshot [io.debezium.connector.mysql.MySqlConnectorTask] 2017-09-21 07:24:01,614 INFO || Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask] 2017-09-21 07:24:01,615 INFO MySQL|dbserver1|snapshot Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull with user 'debezium' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,617 INFO MySQL|dbserver1|snapshot Snapshot is using user 'debezium' with these MySQL grants: [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%' [io.debezium.connector.mysql.SnapshotReader] ...
First, Debezium log output makes use of mapped diagnostic contexts (MDC), which allow the log messages to include thread-specific information: the connector type (e.g., MySQL in the log messages above, just after the INFO or WARN level), the logical name of the connector (e.g., dbserver1 above), and the connector’s activity (e.g., task, snapshot, and binlog). Hopefully these make it easier to follow what is going on in the multi-threaded Kafka Connect service.
The first few lines involve the task activity of the connector, and report some bookkeeping information, such as the fact that the connector was started with no prior offset. The next three lines involve the snapshot activity of the connector: a snapshot is being started using the debezium MySQL user, and the MySQL grants associated with that user are listed.
If the connector is not able to connect, or does not see any tables or the binlog, check these grants to ensure that all of those listed above are included.
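For example, you can list the grants yourself from a MySQL session (the user name and host pattern shown here match this tutorial’s example setup):

```sql
SHOW GRANTS FOR 'debezium'@'%';
```

The output should include SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT, as in the log excerpt above.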
The next messages output by the connector are the following:
... 2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot MySQL server variables related to change data capture: [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_checksum = CRC32 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_direct_non_transactional_updates = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_error_action = ABORT_SERVER [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_format = ROW [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_delay = 0 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_no_delay_count = 0 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_gtid_simple_recovery = ON [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_max_flush_queue_time = 0 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_order_commits = ON [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_row_image = FULL [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_rows_query_log_events = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_stmt_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_client = utf8 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 
07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_connection = utf8 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_database = latin1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_filesystem = binary [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_results = utf8 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_server = latin1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_system = utf8 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_sets_dir = /usr/share/mysql/charsets/ [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_connection = utf8_general_ci [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_database = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_server = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot enforce_gtid_consistency = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot gtid_executed_compression_period = 1000 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_mode = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_next = AUTOMATIC [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_owned = [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot 
gtid_purged = [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_api_enable_binlog = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_locks_unsafe_for_binlog = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_version = 5.7.19 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot log_statements_unsafe_for_binlog = ON [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_size = 1073741824 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_stmt_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot protocol_version = 10 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot session_track_gtids = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot slave_type_conversions = [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot sync_binlog = 1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot system_time_zone = UTC [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot time_zone = SYSTEM [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tls_version = TLSv1,TLSv1.1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tx_isolation = REPEATABLE-READ [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 
07:24:01,627 INFO MySQL|dbserver1|snapshot tx_read_only = OFF [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version = 5.7.19-log [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_comment = MySQL Community Server (GPL) [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_compile_machine = x86_64 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot version_compile_os = Linux [io.debezium.connector.mysql.SnapshotReader] ...
This reports the relevant MySQL server settings found by our MySQL connector. One of the most important is binlog_format, which is set to ROW. These lines are followed by output from each step of the snapshot operation (Steps 0 through 9):
... 2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot Step 0: disabling autocommit and enabling repeatable read transactions [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,631 INFO MySQL|dbserver1|snapshot Step 1: start transaction with consistent snapshot [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,634 INFO MySQL|dbserver1|snapshot Step 2: flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,636 INFO MySQL|dbserver1|snapshot Step 3: read binlog position of MySQL master [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '154' and gtid '' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot Step 4: read list of available databases [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,639 INFO MySQL|dbserver1|snapshot Step 5: read list of available tables in each database [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.customers' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.orders' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.products' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.products_on_hand' [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,642 INFO MySQL|dbserver1|snapshot 'mysql.columns_priv' is filtered out, discarding [io.debezium.connector.mysql.SnapshotReader] ... 
2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot Step 6: generating DROP and CREATE statements to reflect current database schemas: [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,679 INFO MySQL|dbserver1|snapshot SET character_set_server=latin1, collation_server=latin1_swedish_ci; [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,724 WARN MySQL|dbserver1|task Error while fetching metadata with correlation id 1 : {dbhistory.inventory=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] 2017-09-21 07:24:01,853 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`products_on_hand` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,861 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`customers` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,864 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`orders` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,866 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`products` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,881 INFO MySQL|dbserver1|snapshot DROP DATABASE IF EXISTS `inventory` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,889 INFO MySQL|dbserver1|snapshot CREATE DATABASE `inventory` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,893 INFO MySQL|dbserver1|snapshot USE `inventory` [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,914 INFO MySQL|dbserver1|snapshot CREATE TABLE `customers` ( `id` int(11) NOT NULL AUTO_INCREMENT, `first_name` varchar(255) NOT NULL, `last_name` varchar(255) NOT NULL, `email` varchar(255) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `email` (`email`) ) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=latin1 
[io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,932 INFO MySQL|dbserver1|snapshot CREATE TABLE `orders` ( `order_number` int(11) NOT NULL AUTO_INCREMENT, `order_date` date NOT NULL, `purchaser` int(11) NOT NULL, `quantity` int(11) NOT NULL, `product_id` int(11) NOT NULL, PRIMARY KEY (`order_number`), KEY `order_customer` (`purchaser`), KEY `ordered_product` (`product_id`), CONSTRAINT `orders_ibfk_1` FOREIGN KEY (`purchaser`) REFERENCES `customers` (`id`), CONSTRAINT `orders_ibfk_2` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10005 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,937 INFO MySQL|dbserver1|snapshot CREATE TABLE `products` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, `description` varchar(512) DEFAULT NULL, `weight` float DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=110 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,941 INFO MySQL|dbserver1|snapshot CREATE TABLE `products_on_hand` ( `product_id` int(11) NOT NULL, `quantity` int(11) NOT NULL, PRIMARY KEY (`product_id`), CONSTRAINT `products_on_hand_ibfk_1` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,947 INFO MySQL|dbserver1|snapshot Step 7: releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,949 INFO MySQL|dbserver1|snapshot Step 7: blocked writes to MySQL for a total of 00:00:00.312 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,950 INFO MySQL|dbserver1|snapshot Step 8: scanning contents of 4 tables while still in transaction [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,953 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.customers' (1 of 4 tables) [io.debezium.connector.mysql.SnapshotReader] 
2017-09-21 07:24:01,958 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table 'inventory.customers' after 00:00:00.005 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:01,959 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.orders' (2 of 4 tables) [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,014 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table 'inventory.orders' after 00:00:00.055 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,016 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.products' (3 of 4 tables) [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,017 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table 'inventory.products' after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,018 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.products_on_hand' (4 of 4 tables) [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,019 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table 'inventory.products_on_hand' after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 8: scanned 26 rows in 4 tables in 00:00:00.069 [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 9: committing transaction [io.debezium.connector.mysql.SnapshotReader] 2017-09-21 07:24:02,021 INFO MySQL|dbserver1|snapshot Completed snapshot in 00:00:00.405 [io.debezium.connector.mysql.SnapshotReader] ...
Each of these steps reports what the connector is doing to perform the consistent snapshot. For example, Step 6 involves reverse engineering the DDL create statements for the tables being captured; Step 7 releases the global read lock just 0.3 seconds after acquiring it; and Step 8 reads all of the rows in each of the tables, reporting the time taken and the number of rows found. Note that with our example database, the MySQL connector completed its consistent snapshot in about 0.4 seconds.
This process will take longer with your databases, but the connector outputs enough log messages that you can track what it is working on, even when the tables have very large numbers of rows. And although a global read lock that blocks writes is held at the beginning of the snapshot process, it should be held only briefly even for large databases, because the lock is released before any data is copied. See the MySQL connector documentation for more details.
The next five lines from Kafka Connect sound ominous, but simply tell us that new Kafka topics were created and that Kafka had to assign a new leader for each:
... 2017-09-21 07:24:02,632 WARN || Error while fetching metadata with correlation id 1 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] 2017-09-21 07:24:02,775 WARN || Error while fetching metadata with correlation id 5 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] 2017-09-21 07:24:02,910 WARN || Error while fetching metadata with correlation id 9 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] 2017-09-21 07:24:03,045 WARN || Error while fetching metadata with correlation id 13 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] 2017-09-21 07:24:03,179 WARN || Error while fetching metadata with correlation id 17 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient] ...
Finally, we see a line reporting that the connector has transitioned from its snapshot mode into continuously reading the MySQL server’s binlog:
... Sep 21, 2017 7:24:03 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:7) 2017-09-21 07:24:03,373 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader] 2017-09-21 07:25:01,096 INFO || Finished WorkerSourceTask{id=inventory-connector-0} commitOffsets successfully in 18 ms [org.apache.kafka.connect.runtime.WorkerSourceTask] ...
Viewing the change events
We saw in the connector’s output that events were written to five topics:
- dbserver1
- dbserver1.inventory.products
- dbserver1.inventory.products_on_hand
- dbserver1.inventory.customers
- dbserver1.inventory.orders
As described in the MySQL connector documentation, each topic name starts with dbserver1, the logical name we gave our connector. The first topic is our schema change topic, to which all of the DDL statements are written. The remaining four topics capture the change events for each of our four tables; their names include the database name (e.g., inventory) and the table name.
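As a sketch of that convention, the hypothetical helper below (not part of Debezium) builds the full set of topic names from the logical server name and the captured tables:

```python
# Hypothetical helper illustrating Debezium's MySQL topic naming:
# one schema-change topic named after the logical server name, plus
# one topic per captured table: <server>.<database>.<table>.
def debezium_topics(server_name, tables):
    topics = [server_name]  # schema change topic
    topics += [f"{server_name}.{db}.{table}" for db, table in tables]
    return topics

tables = [
    ("inventory", "products"),
    ("inventory", "products_on_hand"),
    ("inventory", "customers"),
    ("inventory", "orders"),
]
for topic in debezium_topics("dbserver1", tables):
    print(topic)
```

Running this prints the five topic names listed above.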
Let’s look at all of the data change events in the dbserver1.inventory.customers topic. We’ll use the debezium/kafka Docker image to start a new container that runs one of Kafka’s utilities to watch the topic from its beginning:
$ docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.10 watch-topic -a -k dbserver1.inventory.customers
Again, we use the --rm flag since we want the container to be removed when it stops, and we use the -a flag on watch-topic to signal that we want to see all events since the beginning of the topic. (If we removed the -a flag, we’d see only the events recorded in the topic after we start watching.) The -k flag specifies that the output should include the event’s key, which in our case contains the row’s primary key. Here’s the output:
Using ZOOKEEPER_CONNECT=172.17.0.3:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.8:9092
Contents of topic dbserver1.inventory.customers:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread
":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":
null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,
"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread"
:null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
This utility keeps watching the topic, so any new events will automatically appear for as long as the utility is running.
These events happen to be encoded in JSON, since that’s how we configured our Kafka Connect service. Each event includes one JSON document for the key, and one for the value. Let’s look at the last event in more detail, by first reformatting the event’s key to be easier to read:
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key"
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
The event’s key has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload; in our case, that means the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field, id, of type int32.
If we look at the value of the key’s payload field, we see that it is indeed a structure (which in JSON is just an object) with a single id field whose value is 1004.
Therefore, we interpret this event as applying to the row in the inventory.customers table (output from the connector named dbserver1) whose id primary key column had a value of 1004.
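In code, pulling the primary key out of the event key is straightforward. This minimal sketch uses Python’s standard json module on a trimmed-down copy of the key shown above:

```python
import json

# A trimmed copy of the event key shown above (only whitespace differs).
key_doc = """
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int32", "optional": false}
    ]
  },
  "payload": {"id": 1004}
}
"""

key = json.loads(key_doc)
# The schema describes how to read the payload; the payload carries
# the actual primary key value of the affected row.
assert key["schema"]["name"] == "dbserver1.inventory.customers.Key"
row_id = key["payload"]["id"]
print(row_id)  # prints 1004
```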
Now let’s look at the same event’s value, which again we reformat to be easier to read:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "0.10.0.Final",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "inventory",
"table": "customers"
},
"op": "c",
"ts_ms": 1486500577691
}
}
This portion of the event is much larger, but like the event’s key it, too, has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:
-
op
is a mandatory field that contains a string value describing the type of operation. Values for the MySQL connector arec
for create (or insert),u
for update,d
for delete, andr
for read (in the case of a non-initial snapshot). -
before
is an optional field that if present contains the state of the row before the event occurred. The structure will be described by thedbserver1.inventory.customers.Value
Kafka Connect schema, which thedbserver1
connector uses for all rows in theinventory.customers
table. -
after
is an optional field that if present contains the state of the row after the event occurred. The structure is described by the samedbserver1.inventory.customers.Value
Kafka Connect schema used inbefore
. -
source
is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MySQL contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and if available the MySQL server ID, and the timestamp in seconds. -
ts_ms
is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
If we look at the `payload` of the event’s value, we can see the information in the event: it describes that the row was created, and it contains the `id`, `first_name`, `last_name`, and `email` of the inserted row.
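A consumer can act on these envelope fields with a simple dispatch on the `op` value. Here is a minimal, hypothetical Python sketch (the event literal mimics the `payload` section shown above; a real consumer would of course read messages from Kafka rather than use a literal):

```python
# Sketch of a consumer dispatching on the Debezium envelope's "op" field.
# The event dict below mimics the "payload" shown above; it is illustrative
# only, not a live Kafka message.

def describe(payload):
    """Return a human-readable summary of a Debezium change event payload."""
    op = payload["op"]
    if op in ("c", "r"):          # create, or read during a snapshot
        return f"row created/read: {payload['after']}"
    if op == "u":                 # update: both old and new states are present
        return f"row updated: {payload['before']} -> {payload['after']}"
    if op == "d":                 # delete: only the old state is present
        return f"row deleted: {payload['before']}"
    raise ValueError(f"unknown op: {op}")

create_event = {
    "before": None,
    "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
              "email": "annek@noanswer.org"},
    "op": "c",
    "ts_ms": 1486500577691,
}

print(describe(create_event))
```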
You may have noticed that the JSON representations of the events are much larger than the rows they describe. This is because Kafka Connect ships, with every event key and value, the schema that describes the payload. Over time this structure may change, and having the schemas for the key and value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time. The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. In this way, each event is structured exactly like the table from which it originated at the time the event occurred, though the Kafka topic containing all of the events for a single table might have events that correspond to each state of the table definition. The JSON converter produces very verbose events, since it includes the key and value schemas in every message. The Avro converter, on the other hand, results in far smaller event messages: it transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. When the Avro converter serializes an event message, it places in the message only a unique identifier for the schema along with an Avro-encoded binary representation of the value. Thus, the serialized messages transferred over the wire and stored in Kafka are far smaller than those shown above. In fact, the Avro converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
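You can get a feel for this overhead with a quick sketch. The following Python fragment (using a trimmed-down version of the event above; nothing Debezium-specific is assumed) compares the serialized size of the `schema` section against the `payload` section:

```python
import json

# A trimmed-down version of the event shown above: the "schema" section is
# repeated in every JSON-converted message, while "payload" carries the row.
value_fields = [
    {"type": "int32", "optional": False, "field": "id"},
    {"type": "string", "optional": False, "field": "first_name"},
    {"type": "string", "optional": False, "field": "last_name"},
    {"type": "string", "optional": False, "field": "email"},
]
event = {
    "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1,
        "fields": [
            {"type": "struct", "optional": True, "fields": value_fields,
             "name": "dbserver1.inventory.customers.Value", "field": "before"},
            {"type": "struct", "optional": True, "fields": value_fields,
             "name": "dbserver1.inventory.customers.Value", "field": "after"},
            {"type": "string", "optional": False, "field": "op"},
            {"type": "int64", "optional": True, "field": "ts_ms"},
        ],
    },
    "payload": {
        "before": None,
        "after": {"id": 1004, "first_name": "Anne",
                  "last_name": "Kretchmar", "email": "annek@noanswer.org"},
        "op": "c",
        "ts_ms": 1486500577691,
    },
}

schema_bytes = len(json.dumps(event["schema"]))
payload_bytes = len(json.dumps(event["payload"]))
print(schema_bytes, payload_bytes)  # the schema section is several times larger
```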
We can compare these to the state of the database. Go back to the terminal that is running the MySQL command line client, and run the following statement:
mysql> SELECT * FROM customers;
which produces the following output:
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
As we can see, all of our event records match the database.
Now that we’re monitoring changes, what happens when we change one of the records in the database? Run the following statement in the MySQL command line client:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
which produces the following output:
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Rerun the `SELECT` statement to see the updated table:
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Now, go back to the terminal running `watch-topic`, and we should see a new, fifth event:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne 
Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635059,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"u","ts_ms":1490635059389}}
Let’s reformat the new event’s key to be easier to read:
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
This key is exactly the same key as what we saw in the fourth record. Here’s that new event’s value formatted to be easier to read:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "0.10.0.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u",
"ts_ms": 1486501486308
}
}
When we compare this to the value in the fourth event, we see no changes in the `schema` section and a couple of changes in the `payload` section:

- The `op` field value is now `u`, signifying that this row changed because of an update.
- The `before` field now has the state of the row with the values before the database commit.
- The `after` field now has the updated state of the row, and here we can see that the `first_name` value is now `Anne Marie`.
- The `source` field structure has many of the same values as before, except the `ts_sec` and `pos` fields have changed (and the `file` might have changed in other circumstances).
- The `ts_ms` field shows the timestamp at which Debezium processed this event.
There are several things we can learn just by looking at this `payload` section. We can compare the `before` and `after` structures to determine what actually changed in this row because of the commit. The `source` structure tells us about MySQL’s record of this change (providing traceability), but more importantly it gives us information we can compare to other events in this and other topics, to know whether this event occurred before, after, or as part of the same MySQL commit as other events.
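For example, a hypothetical consumer could diff the `before` and `after` structures to find exactly which columns changed. A minimal Python sketch (the payload literal mirrors the update event above):

```python
# Sketch: determine which columns actually changed in an update event by
# diffing the "before" and "after" structures of the payload.

def changed_fields(payload):
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    # A field "changed" if its value differs between the two states.
    return {k for k in set(before) | set(after) if before.get(k) != after.get(k)}

update_payload = {
    "before": {"id": 1004, "first_name": "Anne",
               "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "Anne Marie",
              "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "op": "u",
}

print(changed_fields(update_payload))  # {'first_name'}
```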
So far we’ve seen samples of create and update events. Now, let’s look at delete events. Since Anne Marie has not placed any orders, we can remove her record from our database using the MySQL command line client:
mysql> DELETE FROM customers WHERE id=1004;
If the above command fails with a foreign key constraint violation, then you will first have to remove the row that references this customer from the addresses table, using a statement along these lines:
mysql> DELETE FROM addresses WHERE customer_id=1004;
In our terminal running `watch-topic`, we see two new events:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne 
Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635100,"gtid":null,"file":"mysql-bin.000003","pos":725,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"d","ts_ms":1490635100301}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":null,"payload":null}
What happened? We only deleted one row, but we now have two events. To understand what the MySQL connector does, let’s look at the first of our two new messages. Here’s the key reformatted to be easier to read:
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
Once again, this key is exactly the same key as in the previous two events we looked at. Here’s the value of the first new event, formatted to be easier to read:
{
"schema": {...},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "0.10.0.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501558,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 725,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "d",
"ts_ms": 1486501558315
}
}
Again, the `schema` is identical to the previous messages, but the `payload` fragment has a few things of note:

- The `op` field value is now `d`, signifying that this row was deleted.
- The `before` field now has the state of the row that was deleted with the database commit.
- The `after` field is null, signifying that the row no longer exists.
- The `source` field structure has many of the same values as before, except the `ts_sec` and `pos` fields have changed (and the `file` might have changed in other circumstances).
- The `ts_ms` field shows the timestamp at which Debezium processed this event.
This event gives a consumer all kinds of information that it can use to process the removal of this row. We include the old values because some consumers might require them in order to properly handle the removal, and without them they may have to resort to far more complex behavior.
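As an illustration, a hypothetical consumer maintaining a local copy of the table would use the `before` state to know which row to remove. A minimal Python sketch of just that event-handling logic (not a Debezium API):

```python
# Sketch: a consumer maintaining a local copy of the customers table from
# Debezium change events. Deletes use the "before" state to locate the row.

def apply_event(table, payload):
    op = payload["op"]
    if op in ("c", "r", "u"):
        row = payload["after"]
        table[row["id"]] = row            # insert or overwrite the row
    elif op == "d":
        table.pop(payload["before"]["id"], None)  # remove by old primary key

customers = {}
apply_event(customers, {"op": "c", "after": {"id": 1004, "first_name": "Anne Marie"},
                        "before": None})
apply_event(customers, {"op": "d", "before": {"id": 1004, "first_name": "Anne Marie"},
                        "after": None})
print(customers)  # {}
```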
Remember that we saw two events when we deleted the row? Let’s look at that second event. Here’s the key for the event:
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
Once again, this key is exactly the same key as in the previous three events we looked at. Here’s the value of that same event:
{
"schema": null,
"payload": null
}
What gives? Well, all of the Kafka topics that the MySQL connector writes to can be set up to be log compacted, which means that Kafka can remove older messages from the topic as long as there is at least one later message in the topic with the exact same key. This is Kafka’s way of collecting garbage. This last event is what Debezium calls a tombstone event; because it has a key and a null value, Kafka understands that it can remove all prior messages with the same key.
Kafka log compaction is great, because it still allows consumers to read the topic from the very beginning and not miss any events.
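The effect of compaction can be sketched in a few lines of Python. This is a simplified model, not Kafka’s actual implementation (real compaction runs per partition and only over older log segments):

```python
# Simplified model of Kafka log compaction: for each key, keep only the
# latest value; a key whose latest value is None (a tombstone) is dropped.

def compact(messages):
    latest = {}                      # key -> latest value seen, in order
    for key, value in messages:
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}

topic = [
    (1004, {"op": "c"}),   # create Anne
    (1004, {"op": "u"}),   # update to Anne Marie
    (1004, {"op": "d"}),   # delete event (still carries a value)
    (1004, None),          # tombstone: value is null
    (1001, {"op": "c"}),   # another customer
]
print(compact(topic))  # only key 1001 survives compaction
```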
Restart the Kafka Connect service
One feature of the Kafka Connect service is that it automatically manages tasks for the registered connectors. And because it stores its data in Kafka, if a running service stops or goes away completely, upon restart (perhaps on another host) the service will start any non-running tasks. To demonstrate this, let’s stop our Kafka Connect service, change some data in the database, and restart our service.
In a new terminal, use the following Docker commands to stop the connect
container that is running our Kafka Connect service:
$ docker stop connect
Stopping the container like this stops the process running inside of it, but the Kafka Connect service handles this by gracefully shutting down. And because we ran the container with the --rm
flag, Docker removed the container after it stopped it.
While the service is down, let’s go back to the MySQL command line client and add a few records:
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
Notice that in the terminal where we’re running `watch-topic`, there’s been no update. Also, we’re still able to watch the topic because Kafka is still running.
In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas for each topic. So if enough brokers fail such that there are not the minimum number of ISRs, Kafka will become unavailable. Producers, like the Debezium connectors, and consumers will simply wait patiently for the Kafka cluster or network to recover. Yes, that means that your consumers might temporarily see no change events as data is changed in the databases, but that’s because none are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium will continue producing change events and your consumers will continue consuming events where they left off.
Now, in a new terminal, start a new container using the same command we used before:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.10
This creates a whole new container running the Kafka Connect distributed service. Since we’ve initialized it with the same topic information, the new service connects to Kafka, reads the previous service’s configuration, and starts the registered connectors, which continue exactly where they last left off.
Here’s the last few lines from this restarted service:
... 2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:38:48,386 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser] 2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group inventory-connector-dbhistory. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Revoking previously assigned partitions [] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2017-09-21 07:38:48,391 INFO MySQL|dbserver1|task (Re-)joining group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2017-09-21 07:38:48,888 INFO MySQL|dbserver1|task Step 0: Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnectorTask] 2017-09-21 07:38:48,903 INFO MySQL|dbserver1|task MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask] Sep 21, 2017 7:38:49 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:10) 2017-09-21 07:38:49,045 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader] 2017-09-21 07:38:49,046 INFO || Source task 
WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
As you can see, these lines show that the service finds the offsets previously recorded by the last task before it was shut down, and that it then connects to the MySQL database, starts reading the binlog from that position, and generates events from any changes in the MySQL database since that point in time.
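The resume logic amounts to persisting an offset and continuing from it. A simplified Python model (Kafka Connect’s real offset handling is more involved; the event list below stands in for the binlog):

```python
# Simplified model of offset-based resume: the connector records the position
# of the last event it produced, and on restart continues from that position
# instead of re-reading the whole log.

binlog = ["event-1", "event-2", "event-3", "event-4", "event-5"]
saved_offset = 3                 # position committed before the shutdown

def resume(log, offset):
    processed = []
    for pos in range(offset, len(log)):
        processed.append(log[pos])   # handle the event...
        offset = pos + 1             # ...then record the new position
    return processed, offset

processed, new_offset = resume(binlog, saved_offset)
print(processed, new_offset)  # ['event-4', 'event-5'] 5
```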
Jump back to the terminal running `watch-topic`, and you should now see events for our two new records:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":nul
l,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"0.10.0.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot"
:null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
These events are create events similar to what we saw before. The important point to understand, though, is that Debezium will still report all of the changes made to a database while it was not running, as long as it is restarted before the MySQL server starts purging from its binlog the commits it missed.
Exploration
Go ahead and use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You may need to run a separate `watch-topic` command for each topic. And remember that you can’t remove a row that is referenced by a foreign key. Have fun!
Clean up
You can use Docker to stop all of the running containers:
$ docker stop mysqlterm watcher connect mysql kafka zookeeper
Again, since we used the `--rm` flag when starting the containers, Docker should remove them right after it stops them. We can verify that all of the other processes are stopped and removed:
$ docker ps -a
Of course, if any are still running, simply stop them using `docker stop <name>` or `docker stop <containerId>`.
Docker Compose setup
If you have already completed the tutorial and you would like to go again through the setup quickly, then you can use a Docker Compose version of this tutorial located in our examples repository. We provide Docker Compose files for running the tutorial with MySQL, Postgres, MongoDB, SQL Server and Oracle. Please follow the steps described in the readme file.