Update (Oct. 11 2019): An alternative, and much simpler, approach for running Debezium (and Apache Kafka and Kafka Connect in general) on Kubernetes is to use a K8s operator such as Strimzi. You can find instructions for the set-up of Debezium on OpenShift here, and similar steps apply for plain Kubernetes.
Our Debezium Tutorial walks you step by step through using Debezium by installing, starting, and linking together all of the Docker containers running on a single host machine. Of course, you can use things like Docker Compose or your own scripts to make this easier, although that would just automate running all the containers on a single machine. What you really want is to run the containers on a cluster of machines. In this blog, we’ll run Debezium using Kubernetes, a container cluster manager from Red Hat and Google.
Kubernetes is a container (Docker/Rocket/Hyper.sh) cluster management tool. Like many other popular cluster management and compute resource scheduling platforms, Kubernetes' roots are in Google, who is no stranger to running containers at scale. They start, stop, and cluster 2 billion containers per week and they contributed a lot of the Linux kernel underpinnings that make containers possible. One of their famous papers talks about an internal cluster manager named Borg. With Kubernetes, Google got tired of everyone implementing their papers in Java so they decided to implement this one themselves :)
Kubernetes is written in Go and is quickly becoming the de-facto API for scheduling, managing, and clustering containers at scale. This blog isn’t intended to be a primer on Kubernetes, so we recommend heading over to the Getting Started docs to learn more about Kubernetes.
Getting started
To get started, we need to have access to a Kubernetes cluster. Getting one started is pretty easy: just follow the getting started guides. A favorite of ours is OpenShift’s all-in-one VM or the Red Hat Container Development Kit, both of which provide a hardened, production-ready distribution of Kubernetes. Once you’ve installed it and logged in, you should be able to run kubectl get pod
to get a list of Kubernetes pods you may have running. You don’t need anything else running inside Kubernetes to get started.
To get and build the Kubernetes manifest files (yaml descriptors), go clone the Debezium Kubernetes repo and run the following command:
$ mvn clean
$ mvn install
This project uses the awesome Fabric8 Maven plugin to automatically generate the Kubernetes manifest files. Here’s an example of what gets generated in $PROJECT_ROOT/zk-standalone/target/classes/kubernetes.yml
apiVersion: "v1" items: - apiVersion: "v1" kind: "Service" metadata: annotations: {} labels: project: "zookeeper" provider: "debezium" version: "0.1-SNAPSHOT" group: "io.debezium" name: "zookeeper" spec: deprecatedPublicIPs: [] externalIPs: [] ports: - port: 2181 protocol: "TCP" targetPort: 2181 selector: project: "zookeeper" provider: "debezium" group: "io.debezium" - apiVersion: "v1" kind: "ReplicationController" metadata: annotations: fabric8.io/git-branch: "master" fabric8.io/git-commit: "004e222462749fbaf12c3ee33edca9b077ee9003" labels: project: "zookeeper" provider: "debezium" version: "0.1-SNAPSHOT" group: "io.debezium" name: "zk-standalone" spec: replicas: 1 selector: project: "zookeeper" provider: "debezium" version: "0.1-SNAPSHOT" group: "io.debezium" template: metadata: annotations: {} labels: project: "zookeeper" provider: "debezium" version: "0.1-SNAPSHOT" group: "io.debezium" spec: containers: - args: [] command: [] env: - name: "KUBERNETES_NAMESPACE" valueFrom: fieldRef: fieldPath: "metadata.namespace" image: "docker.io/debezium/zookeeper:0.1" imagePullPolicy: "IfNotPresent" name: "zk-standalone" ports: - containerPort: 3888 name: "election" - containerPort: 2888 name: "peer" - containerPort: 2181 name: "client" securityContext: {} volumeMounts: [] imagePullSecrets: [] nodeSelector: {} volumes: [] kind: "List"
Starting Zookeeper and Kafka on Kubernetes
To start Apache Zookeeper or Apache Kafka inside Kubernetes you have two options. If you have the kubectl
command line (or the oc
tool from the OpenShift client distros) on your machine you can apply any of the newly generated Kubernetes manifest files like this:
$ kubectl create -f <path_to_file>
Or you can use the Fabric8 Maven plugin and its fabric8:apply
goal to apply the manifest files. Note that for either of these options to work, you must currently be logged into your Kubernetes cluster. (Also, OpenShift’s oc login <url>
makes this super easy, or see Logging into a Kubernetes Cluster with kubectl for more information.)
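For example, logging in to the all-in-one VM looks roughly like this (the URL is simply the one that shows up in the Maven output later in this post; yours will differ, and oc will prompt you for credentials):
$ oc login https://172.28.128.4:8443
$ kubectl get pod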
First, let’s deploy Zookeeper to our Kubernetes cluster. We need to be in $PROJECT_ROOT/zk-standalone
directory, and then we’ll apply our Kubernetes configuration. First, let’s see how to do this with the kubectl
command:
$ cd zk-standalone
$ kubectl create -f target/classes/kubernetes.yml
service "zookeeper" created
replicationcontroller "zk-standalone" created
You can do the same thing with Maven and the fabric8 maven plugin:
$ cd zk-standalone
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building zk-standalone 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ zk-standalone ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/zk-standalone/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name zookeeper
[INFO] Created Service: zk-standalone/target/fabric8/applyJson/ticket/service-zookeeper.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name zk-standalone
[INFO] Created ReplicationController: zk-standalone/target/fabric8/applyJson/ticket/replicationcontroller-zk-standalone.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.661 s
[INFO] Finished at: 2016-05-19T15:59:26-07:00
[INFO] Final Memory: 26M/260M
[INFO] ------------------------------------------------------------------------
Zookeeper is deployed, so let’s continue with deploying Kafka. Navigate to $PROJECT_ROOT/kafka
, and then apply the Kafka deployment configuration:
$ cd ../kafka
$ kubectl create -f target/classes/kubernetes.yml
service "kafka" created
replicationcontroller "kafka" created
Or with fabric8 maven plugin:
$ cd ../kafka
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ kafka ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/kafka/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name kafka
[INFO] Created Service: kafka/target/fabric8/applyJson/ticket/service-kafka.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name kafka
[INFO] Created ReplicationController: kafka/target/fabric8/applyJson/ticket/replicationcontroller-kafka.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.563 s
[INFO] Finished at: 2016-05-19T16:03:25-07:00
[INFO] Final Memory: 26M/259M
[INFO] ------------------------------------------------------------------------
Use the kubectl get pod
command to see what is running:
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
kafka-mqmxt 1/1 Running 0 46s
zk-standalone-4mo02 1/1 Running 0 4m
Did you notice that we didn’t manually "link" the containers as we started them? Kubernetes has a cluster-wide service discovery feature called Kubernetes Services, which load-balances across pods and lets us use internal DNS names (or cluster IPs) to discover them. For example, in the kubernetes.yml
deployment configuration for Kafka, you’ll see the following:
...
containers:
- args: []
  command: []
  env:
  - name: "KAFKA_ADVERTISED_PORT"
    value: "9092"
  - name: "KAFKA_ADVERTISED_HOST_NAME"
    value: "kafka"
  - name: "KAFKA_ZOOKEEPER_CONNECT"
    value: "zookeeper:2181"
  - name: "KAFKA_PORT"
    value: "9092"
  - name: "KUBERNETES_NAMESPACE"
    valueFrom:
      fieldRef:
        fieldPath: "metadata.namespace"
  image: "docker.io/debezium/kafka:0.1"
  imagePullPolicy: "IfNotPresent"
  name: "kafka"
...
We’re specifying values for the KAFKA_ZOOKEEPER_CONNECT
environment variable used by the Docker image, and thus enabling Kafka to discover Zookeeper pods wherever they are running. Although we could have used any hostname, to keep things simple we use just zookeeper
for the DNS name. So, if you were to log in to one of the pods and try to reach the host named zookeeper
, Kubernetes would transparently resolve that request to one of the Zookeeper pods (if there are multiple). Slick! This discovery mechanism is used for the rest of the components, too. (Note, this cluster IP that the DNS resolves to never changes for the life of the Kubernetes Service regardless of how many Pods exist for a given service. This means you can rely on this service discovery without all of the DNS caching issues you may otherwise run into.)
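As a quick (optional) sanity check, you can resolve the zookeeper service name from inside the Kafka pod; this sketch assumes the getent utility is present in the image, which may not hold for every container:
$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')
$ kubectl exec $KAFKA_POD_NAME -- getent hosts zookeeper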
The next step is to create a schema-changes
topic that Debezium’s MySQL connector will use. Let’s use the Kafka tools to create this:
$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')
$ kubectl exec $KAFKA_POD_NAME -- /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic schema-changes.inventory
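If you’d like to verify the topic was created, you can describe it with the same Kafka tooling (again relying on the zookeeper Service name for discovery):
$ kubectl exec $KAFKA_POD_NAME -- /kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic schema-changes.inventory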
Start up a MySQL Database on Kubernetes
Starting the MySQL database follows the same pattern as deploying Zookeeper or Kafka. We’ll navigate to the $PROJECT_ROOT/mysql56
directory, and we’ll use the MySQL 5.6 OpenShift Docker image so that it runs on both vanilla Kubernetes and OpenShift v3.x. Here’s the kubectl
command to start up our MySQL instance:
$ cd ../mysql56
$ kubectl create -f target/classes/kubernetes.yml
service "mysql" created
replicationcontroller "mysql56" created
Or the equivalent Maven command:
$ cd ../mysql56
$ mvn fabric8:apply
Now, when we run kubectl get pod
we should see our MySQL database running, too:
NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          17m
mysql56-b4f36         1/1       Running   0          9m
zk-standalone-4mo02   1/1       Running   0          21m
Let’s run a command to get client access to the database. First, set a few environment variables to the pod’s name and IP address:
$ MYSQL_POD_NAME=$(kubectl get pod | grep Running | grep ^mysql | awk '{ print $1 }')
$ MYSQL_POD_IP=$(kubectl describe pod $MYSQL_POD_NAME | grep IP | awk '{ print $2 }')
Then, log in to the Kubernetes pod that’s running the MySQL database, and start the MySQL command client:
$ kubectl exec -it $MYSQL_POD_NAME -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.6.26-log MySQL Community Server (GPL)
Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
This shows that the kubectl
command line lets us easily get access to a pod or Docker container regardless of where it’s running in the cluster.
Next, exit out of the mysql shell (type exit
) and run the following command to download a SQL script that populates an inventory
sample database:
$ kubectl exec -it $MYSQL_POD_NAME -- bash -c "curl -s -L https://gist.github.com/christian-posta/e20ddb5c945845b4b9f6eba94a98af09/raw | /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin"
Now, if we log back into the MySQL pod we can show the databases and tables:
$ kubectl exec -it $MYSQL_POD_NAME -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'use inventory; show tables;'
+---------------------+
| Tables_in_inventory |
+---------------------+
| customers |
| orders |
| products |
| products_on_hand |
+---------------------+
4 rows in set (0.00 sec)
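If you want to peek at the sample data itself, you can run an ad-hoc query against one of those tables the same way, for example:
$ kubectl exec -it $MYSQL_POD_NAME -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'SELECT * FROM inventory.customers;'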
Start Kafka Connect and Debezium
Navigate to the $PROJECT_ROOT/connect-mysql directory. Here, we’ll start a Kubernetes pod that runs Kafka Connect with the Debezium MySQL connector already installed. The Debezium MySQL connector connects to a MySQL database, reads the binlog, and writes those row-level change events to Kafka. Start up Kafka Connect with Debezium on Kubernetes just like the previous components:
$ cd ../connect-mysql
$ kubectl create -f target/classes/kubernetes.yml
service "connect-mysql" created
replicationcontroller "connect-mysql" created
Or with the fabric8 maven plugin:
$ cd ../connect-mysql
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mysql 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ connect-mysql ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/connect-mysql/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name connect-mysql
[INFO] Created Service: connect-mysql/target/fabric8/applyJson/ticket/service-connect-mysql.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name connect-mysql
[INFO] Created ReplicationController: connect-mysql/target/fabric8/applyJson/ticket/replicationcontroller-connect-mysql.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.255 s
[INFO] Finished at: 2016-05-25T09:21:04-07:00
[INFO] Final Memory: 27M/313M
[INFO] ------------------------------------------------------------------------
Just like in the Docker tutorial for Debezium, we now want to send a JSON object to the Kafka Connect API to start up our Debezium connector. First, we need to expose the API for the Kafka Connect cluster. You can do this however you want: on Kubernetes (Ingress definitions, NodePort services, etc.) or on OpenShift with OpenShift Routes. For this simple example, we’ll use simple pod port-forwarding to forward the connect-mysql pod’s 8083 port to our local machine, regardless of where the pod is actually running in the cluster. (This is such an incredible feature of Kubernetes that makes it so easy to develop distributed services!)
Let’s determine the pod name and then use port forwarding to our local machine:
$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl port-forward $CONNECT_POD_NAME 8083:8083
I0525 09:30:08.390491 6651 portforward.go:213] Forwarding from 127.0.0.1:8083 -> 8083
I0525 09:30:08.390631 6651 portforward.go:213] Forwarding from [::1]:8083 -> 8083
We are forwarding the pod’s port 8083
to our local machine’s 8083
. Now if we hit http://localhost:8083
it will be directed to the pod which runs our Kafka Connect and Debezium services.
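Before registering anything, it’s worth sanity-checking the forwarded port by listing connectors via the Kafka Connect REST API; since we haven’t registered a connector yet, this should return an empty JSON array:
$ curl -H "Accept:application/json" http://localhost:8083/connectors/
[]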
Since it may be useful to watch the pod’s output for any exceptions, start another terminal and type the following to follow the Kafka Connect log:
$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl logs -f $CONNECT_POD_NAME
Now, let’s use an HTTP client to post the Debezium Connector/Task to the endpoint we’ve just exposed locally:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "replicator", "database.password": "replpass", "database.server.id": "184054", "database.server.name": "mysql-server-1", "database.binlog": "mysql-bin.000001", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'
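To confirm the connector was registered, you can ask the same REST API for it by name, which should return the connector’s name and configuration:
$ curl -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector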
If we’re watching the log output for the connect-mysql
pod, we’ll see it eventually end up looking something like this:
2016-05-27 18:50:14,580 - WARN [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 2 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,690 - WARN [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,911 - WARN [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,136 - WARN [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,362 - WARN [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 13 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}
These warnings are just Kafka’s way of telling us that the topics didn’t exist yet and were automatically created.
If we now do a listing of our topics inside Kafka, we should see a Kafka topic for each table in the mysql inventory
database:
$ kubectl exec $KAFKA_POD_NAME -- /kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
my-connect-configs
my-connect-offsets
mysql-server-1.inventory.customers
mysql-server-1.inventory.orders
mysql-server-1.inventory.products
mysql-server-1.inventory.products_on_hand
schema-changes.inventory
Let’s take a look at what’s in one of these topics:
$ kubectl exec $KAFKA_POD_NAME -- /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --topic mysql-server-1.inventory.customers --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}
What happened? When we started Debezium’s MySQL connector, it started reading the binary replication log from the MySQL server, and it replayed all of the history and generated an event for each INSERT, UPDATE, and DELETE operation (though in our sample inventory
database we only had INSERTs). If we or some client apps were to commit other changes to the database, Debezium would see those immediately and write them to the correct topic. In other words, Debezium records all of the changes to our MySQL database as events in Kafka topics! And from there, any tool, connector, or service can independently consume those event streams from Kafka and process them or put them into a different database, into Hadoop, Elasticsearch, a data grid, etc.
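To see this in action, try committing a change while the console consumer from the previous step is still running; an update event for that row should appear on the customers topic almost immediately. (The new email value below is purely illustrative.)
$ kubectl exec -it $MYSQL_POD_NAME -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e "UPDATE inventory.customers SET email='sally.thomas@example.com' WHERE id=1001;"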
Cleanup
If you want to delete the connector, simply issue a REST request to remove it:
curl -i -X DELETE -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector
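If you also want to remove the Kubernetes resources we created, delete each component with the same manifest file you used to create it, running from each component’s directory:
$ kubectl delete -f target/classes/kubernetes.yml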
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.