Log-based CDC between DB2 and MySQL using Debezium

Raghavendiran Sudhakaran
3 min read · Mar 17, 2020

Debezium is a distributed platform for CDC. It can run as a Kafka Connect plugin or be embedded into your application, allowing your app to receive all data change events. My objective for this example is to set up a CDC pipeline that streams all data changes from a DB2 database (source) through Kafka into a MySQL database (sink).

At the time of this writing, the Debezium DB2 connector is still incubating. It did work as expected for the basic tests I performed. For more details, check their GitHub repo.

The entire process of setting up the source DB2 database and the Debezium DB2 connector is clearly documented on the official DB2 connector page. I will use the CUSTOMERS table created as part of those instructions for CDC.

Before setting up the source and sink connectors, I want to highlight the two areas where some additional work was needed on the source side.

Setting up CDC on DB2

The Debezium team provides a neat utility to enable CDC on DB2 without the need to install any tools other than the DB2 Docker image.

Detailed instructions are available here. However, there was a slight difference when using the DB2 Docker image: the bldtrn utility needs to be copied from an existing location (I used $HOME/sqllib/samples/c/bldtrn) into the source files we copy.
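As a rough sketch, the copy can be done inside the running DB2 container before compiling the CDC utility. The container name db2 and the target directory /asncdctools/src are assumptions based on the Debezium setup scripts; adjust both to your environment:

```shell
# Copy the bldtrn build script from the DB2 samples into the
# directory holding the ASN CDC source files before compiling.
# Container name and target path are assumptions; adjust as needed.
docker exec db2 su - db2inst1 -c \
  "cp \$HOME/sqllib/samples/c/bldtrn /asncdctools/src/"
```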

JDBC Driver for IBM DB2

The JDBC driver for IBM DB2 is not supplied as part of the Debezium Kafka Connect image due to licensing restrictions. So, it needs to be manually copied into the Kafka Connect container.
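A minimal sketch of that copy, assuming the driver jar has already been downloaded into the current directory, the Connect container is named connect, and the connector plugin lives under /kafka/connect — the jar version, container name, and path are all assumptions:

```shell
# Copy the IBM DB2 JDBC driver into the Debezium DB2 connector's
# plugin directory, then restart Connect so it picks up the jar.
# Jar version, container name, and plugin path are assumptions.
docker cp jcc-11.5.0.0.jar connect:/kafka/connect/debezium-connector-db2/
docker restart connect
```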

Setting up the Source Connector

At this point, the next step is to register the Debezium DB2 connector with Kafka Connect to start streaming the DB2 changes. Posting the payload below to Kafka Connect registers and starts the DB2 connector.

{
  "name": "db2-connector",
  "config": {
    "connector.class": "io.debezium.connector.db2.Db2Connector",
    "database.hostname": "db2",
    "database.port": "50000",
    "database.user": "db2inst1",
    "database.password": "password",
    "database.dbname": "testdb",
    "database.server.name": "testdb",
    "table.whitelist": "TESTSCHEMA.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testdb",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "testdb.TESTSCHEMA.(.*)",
    "transforms.route.replacement": "$1"
  }
}
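If the payload above is saved to a file (the name register-db2.json and the Connect address localhost:8083 are assumptions; adjust to your deployment), it can be posted to the Kafka Connect REST API like this:

```shell
# Register the DB2 source connector via the Connect REST API.
# File name, host, and port are assumptions for this sketch.
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d @register-db2.json
```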

Note that a message transformation has been configured to change the topic name inside Kafka. By default, the topic name is the table name prefixed with the server name and schema name (here, testdb.TESTSCHEMA.CUSTOMERS). This behavior would break the JDBC sink connector. So, the RegexRouter extracts only the table name from the generated topic name, forcing Kafka to store the topic under simply the table name (CUSTOMERS).

Once the connector is created, the topics and the message payloads for each change can be verified with a simple Kafka console consumer.
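For example, something like the following should show the change events; the container name kafka and the script location are assumptions based on the Debezium example images:

```shell
# Watch the change events routed to the CUSTOMERS topic.
# Container name and script path are assumptions; adjust as needed.
docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic CUSTOMERS \
  --from-beginning
```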

Setting up the Sink Connector

The final step in the whole CDC pipeline is to set up the sink connector to get the data into MySQL.

The Debezium messages generated by the source connector need to be unwrapped before they can be processed by the JDBC sink connector. To achieve this, the unwrap transformation is added as part of the sink connector configuration.

A sample payload looks like this:

{
  "name": "sink-mysql-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "CUSTOMERS",
    "connection.url": "jdbc:mysql://mysql:3306/testschema?user=mysqluser&password=mysqlpw&nullCatalogMeansCurrent=true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "ID",
    "pk.mode": "record_key"
  }
}

Note: I had to add nullCatalogMeansCurrent=true to the JDBC URL, as the newest MySQL Connector/J defaults it to false.

Once the connector starts, it creates the table and inserts the records captured so far from the DB2 database. Going forward, any change made to the DB2 source will automatically be reflected in your MySQL database.
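A quick way to verify the result, assuming a MySQL container named mysql (the credentials and schema come from the sink connection URL above; the container name is an assumption):

```shell
# Check that the CUSTOMERS table was created and populated in MySQL.
# Container name is an assumption; credentials match the sink config.
docker exec mysql mysql -u mysqluser -pmysqlpw testschema \
  -e "SELECT * FROM CUSTOMERS;"
```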

Enjoy the magic!
