
What Is CDC (Change Data Capture)?
According to Wikipedia, change data capture (CDC) is a set of software design patterns used in databases to identify and track changed data so that action can be taken based on it. Let's take microservices as an example. One of the key ideas of microservices is that each microservice has its own database. For instance, say I have an Orders microservice and a Shipping microservice. Whenever a new order is placed, I need to create a record in the Orders microservice database and also publish an event to the Shipping microservice for further processing.
Likewise, when order details change, I update them in the Orders microservice database, and those changes must also be sent to the Shipping microservice. One way to accomplish this is to change the code of each microservice so that it publishes and subscribes to events with payloads. As a result, the publishing microservice needs a lot of code changes. CDC avoids this by capturing the changes from the database itself and publishing them as events.
Using Debezium for Tracking DB Changes and Publishing Them
According to its website, Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can respond to all of the inserts, updates, and deletes that other apps make to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when something goes wrong.
Debezium is a CDC tool that captures database changes as events and sends them to destinations such as Apache Kafka, Amazon Kinesis, Apache Pulsar, and Google Cloud Pub/Sub. It works with a wide range of databases, including MySQL, SQL Server, and PostgreSQL.
Architecture of Debezium Server:
In this post, we'll look at how to use Debezium Server to send CDC events into Amazon Kinesis. Debezium Server is a ready-to-use Debezium deployment for exactly this kind of setup.
At a high level, Debezium Server is configured to capture changes from the source database using one of the Debezium source connectors. Change events can be serialized to JSON or Apache Avro and then sent to one of several messaging infrastructures, including Amazon Kinesis, Google Cloud Pub/Sub, and Apache Pulsar.
MySQL Configuration
- Before getting started with CDC in MySQL, make sure that the binary log (binlog) is enabled, because Debezium reads the changes from it.
- In this example, I'll use the MySQL Docker image to start a MySQL instance with the most recent version:
docker run --name some-mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -p 3308:3306 -d mysql:latest
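Once the container is up, connect to it (for example, with docker exec -it some-mysql mysql -uroot -p) and check the binlog settings:
SHOW VARIABLES LIKE 'log_bin'; -- Value should be ON
SHOW VARIABLES; -- Displays all configuration values
SHOW VARIABLES LIKE '%binlog_format%'; -- Value should be ROW (required by Debezium's MySQL connector)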
With this, we are all set with the MySQL configuration.
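The checks above can also be scripted. The sketch below is a minimal Python version, assuming you query MySQL through a DB-API driver such as PyMySQL (not used elsewhere in this post) whose cursor returns (name, value) tuples. Besides log_bin and binlog_format, it also checks binlog_row_image=FULL, which Debezium's MySQL connector documentation lists as a requirement. The constant and function names are hypothetical.

```python
# Values Debezium's MySQL connector expects (binlog_row_image is an extra check
# taken from the connector documentation, not from the steps above).
REQUIRED_BINLOG_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def fetch_binlog_variables(cursor):
    """Read the relevant variables with any DB-API cursor returning (name, value) tuples."""
    cursor.execute(
        "SHOW VARIABLES WHERE Variable_name IN "
        "('log_bin', 'binlog_format', 'binlog_row_image')"
    )
    return {name: value for name, value in cursor.fetchall()}


def binlog_problems(variables):
    """Return human-readable problems; an empty list means the settings look right."""
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not set")
        elif str(actual).upper() != expected:
            problems.append(f"{name} is {actual!r}, expected {expected}")
    return problems
```

With PyMySQL, for example, you could connect to 127.0.0.1 on port 3308 (the host port mapped in the docker run command), pass conn.cursor() to fetch_binlog_variables, and print binlog_problems on the result; an empty list means the binlog is ready for Debezium.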
- Create a test database called TestDB.
- Create a test table called Orders.
CREATE DATABASE `TestDB`;
CREATE TABLE `TestDB`.`Orders` (
  `Id` INT NOT NULL,
  `ProductName` VARCHAR(150) NULL,
  `OrderDate` DATETIME NULL,
  PRIMARY KEY (`Id`));
Create an EC2 Instance or Ubuntu Docker Container for Installing Debezium:
On my Windows desktop, I'm going to create an Ubuntu Docker container, which I'll use to run Debezium Server. I'm also linking it to my MySQL container (some-mysql is the name of my MySQL container), so that Debezium can reach MySQL using the host name some-mysql.
docker run -it --link some-mysql ubuntu bash
Install AWS CLI:
https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html
apt-get update
apt-get install awscli
apt-get install wget
aws configure
# Enter your AWS access key ID, secret access key, and default region when prompted
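Debezium Server's Kinesis sink is configured later with debezium.sink.kinesis.credentials.profile=default, so it relies on the default profile that aws configure writes. As a quick sanity check, here is a small Python sketch that looks for that profile in the shared credentials file. It assumes the standard ~/.aws/credentials location and ignores other credential sources such as environment variables or instance roles; the function name is hypothetical.

```python
import configparser
from pathlib import Path


def has_aws_profile(profile="default", credentials_path=None):
    """Return True if the shared credentials file defines both access keys for `profile`.

    Defaults to ~/.aws/credentials, the file `aws configure` writes. Other credential
    sources (environment variables, instance roles) are not checked.
    """
    path = Path(credentials_path) if credentials_path else Path.home() / ".aws" / "credentials"
    # interpolation=None so any '%' characters in a secret are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(path):
        return False  # file missing or unreadable
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return bool(section.get("aws_access_key_id")) and bool(section.get("aws_secret_access_key"))
```

After running aws configure, has_aws_profile() should return True inside the container.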
Installing Debezium Server
Download the Debezium Server distribution from the URL below to get started. Debezium Server is a standalone application, which is different from the usual Debezium deployment on Kafka Connect.
To download the latest version, browse to https://debezium.io/documentation/reference/operations/debezium-server.html
wget -O debezium.tar.gz "https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/1.5.0.Alpha1/debezium-server-dist-1.5.0.Alpha1.tar.gz"
tar -xf debezium.tar.gz
- The tar command extracts the archive into a debezium-server folder.
- Make sure you have Java installed. If not, install it:
apt-get install openjdk-8-jre
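- Once the archive is extracted, go into the debezium-server folder and create a config file called application.properties in the conf/ folder (for example, /home/ubuntu/debezium-server/conf), plus a data folder containing an empty offsets.dat file:
touch conf/application.properties
mkdir data
touch data/offsets.dat
Then add the following settings to conf/application.properties: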
debezium.sink.type=kinesis
debezium.sink.kinesis.region=ap-south-1
debezium.sink.kinesis.credentials.profile=default
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=some-mysql
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=my-secret-pw
debezium.source.database.dbname=TestDB
debezium.source.database.server.name=debezium-tutorial
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=history.dat
- Make sure the data folder and the offsets.dat file inside it exist; Debezium stores its offsets there (debezium.source.offset.storage.file.filename).
- Make sure you create the required streams in Kinesis. First, create a stream named debezium-tutorial (the value of the debezium.source.database.server.name property).
- Then create a Kinesis stream for each table, named in this format: «debezium.source.database.server.name».«debezium.source.database.dbname».«tablename». For example: debezium-tutorial.TestDB.Orders
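Before starting Debezium Server, it can help to sanity-check the configuration and create the streams in one go. The Python sketch below is one way to do that under a few assumptions: it parses only simple key=value lines (not the full Java .properties syntax), resolves the offsets path against the folder Debezium Server runs from, and uses boto3 (installed later in this post) to create single-shard streams. EXPECTED_KEYS and all function names are hypothetical, not part of Debezium.

```python
from pathlib import Path

# Keys this walkthrough relies on: an illustrative subset, not Debezium's full list.
EXPECTED_KEYS = (
    "debezium.sink.type",
    "debezium.sink.kinesis.region",
    "debezium.source.connector.class",
    "debezium.source.offset.storage.file.filename",
    "debezium.source.database.hostname",
    "debezium.source.database.dbname",
    "debezium.source.database.server.name",
)


def load_properties(text):
    """Parse simple key=value lines, skipping blank lines and #/! comments.

    Simplified: ignores Java .properties escapes, ':' separators and continuations.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def config_problems(props, server_dir="."):
    """Report missing keys and a missing offsets folder.

    Assumes relative paths are resolved against the folder Debezium Server runs from.
    """
    problems = [f"missing {key}" for key in EXPECTED_KEYS if key not in props]
    offsets = props.get("debezium.source.offset.storage.file.filename")
    if offsets and not (Path(server_dir) / offsets).parent.is_dir():
        problems.append(f"folder for {offsets} does not exist")
    return problems


def stream_names(props, tables):
    """The server-name stream plus one «server.name».«dbname».«tablename» stream per table."""
    server = props["debezium.source.database.server.name"]
    database = props["debezium.source.database.dbname"]
    return [server] + [f"{server}.{database}.{table}" for table in tables]


def create_streams(names, region, shard_count=1):
    """Create each Kinesis stream unless it already exists, then wait until it is ACTIVE."""
    import boto3  # imported here so the helpers above work without boto3 installed

    client = boto3.client("kinesis", region_name=region)
    waiter = client.get_waiter("stream_exists")
    for name in names:
        try:
            client.create_stream(StreamName=name, ShardCount=shard_count)
        except client.exceptions.ResourceInUseException:
            pass  # the stream already exists
        waiter.wait(StreamName=name)
```

For example, from the debezium-server folder you could run props = load_properties(Path("conf/application.properties").read_text()), print config_problems(props), and then call create_streams(stream_names(props, ["Orders"]), props["debezium.sink.kinesis.region"]).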
In this setup, Debezium Server also needs the Apache Commons Logging JAR in its lib folder. Download commons-logging-1.2-bin.tar.gz from http://commons.apache.org/proper/commons-logging/download_logging.cgi, or run the following from the debezium-server folder:
wget -O apache.common.log.tar.gz https://downloads.apache.org/commons/logging/binaries/commons-logging-1.2-bin.tar.gz
tar -xvf apache.common.log.tar.gz
cp commons-logging-1.2/commons-logging-1.2.jar lib
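So, we are all set to stream the changes in our database to Kinesis. Start Debezium Server from the debezium-server folder with the run.sh script included in the distribution (./run.sh). From then on, Debezium keeps pushing change events into the streams, and we can use a simple Python script to read them.
Here is a sample Python script that reads data from the Kinesis stream (install boto3 first):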
pip install boto3
import time

import boto3


def main():
    my_stream_name = "debezium-tutorial.TestDB.Orders"
    # Credentials come from the default profile created with `aws configure`.
    # You can also pass aws_access_key_id / aws_secret_access_key explicitly,
    # but keep real keys out of source code.
    kinesis_client = boto3.client("kinesis", region_name="ap-south-1")

    response = kinesis_client.describe_stream(StreamName=my_stream_name)
    # This sample reads only the first shard, which is enough for a single-shard stream.
    my_shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

    # TRIM_HORIZON starts at the oldest record still kept in the shard, so events
    # written before this script started are read too. Use LATEST to read only new events.
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=my_stream_name,
        ShardId=my_shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    # NextShardIterator is only missing once the shard has been closed.
    while shard_iterator:
        # Read up to 100 records at a time from the shard.
        record_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)
        # Print only if we have something; Data holds the Debezium JSON event as bytes.
        for record in record_response["Records"]:
            print(record["Data"].decode("utf-8"))
        shard_iterator = record_response.get("NextShardIterator")
        # Wait for 1 second before polling again.
        time.sleep(1)


if __name__ == "__main__":
    main()
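The script above reads only the first shard. If a stream has more than one shard, a consumer has to poll each of them. Here is a minimal sketch of that, written as a generator that takes the boto3 Kinesis client as a parameter so it can be tested with a stub. The function name and the poll_seconds/max_polls parameters are hypothetical, and the sketch ignores list_shards pagination (NextToken), so it suits small streams; for production consumers, the Kinesis Client Library or an AWS Lambda trigger is the more usual choice.

```python
import json
import time


def read_stream(kinesis_client, stream_name, iterator_type="TRIM_HORIZON",
                poll_seconds=1.0, max_polls=None):
    """Yield decoded JSON events from every shard of a Kinesis stream.

    Polls all shards round-robin from a single process; max_polls=None polls forever.
    Shards are listed once at startup, so child shards created later are not picked up.
    """
    # Assumption: the stream's shards fit in a single list_shards page.
    shards = kinesis_client.list_shards(StreamName=stream_name)["Shards"]
    iterators = {
        shard["ShardId"]: kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType=iterator_type,
        )["ShardIterator"]
        for shard in shards
    }
    polls = 0
    while iterators and (max_polls is None or polls < max_polls):
        for shard_id, iterator in list(iterators.items()):
            response = kinesis_client.get_records(ShardIterator=iterator, Limit=100)
            for record in response["Records"]:
                yield json.loads(record["Data"])
            next_iterator = response.get("NextShardIterator")
            if next_iterator:
                iterators[shard_id] = next_iterator
            else:
                # No next iterator means the shard is closed (e.g. after resharding).
                del iterators[shard_id]
        polls += 1
        time.sleep(poll_seconds)
```

For example, for event in read_stream(boto3.client("kinesis", region_name="ap-south-1"), "debezium-tutorial.TestDB.Orders"): print(event["payload"]["op"]) prints the operation type of every change as it arrives.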
Here is the sample JSON that we get for each event:
Insert record payload. before is null, and op is 'c' (create):
Insert into Orders values (6,'Test',CurDate());
{ "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "field": "transaction" } ], "optional": false, "name": "debezium_tutorial.TestDB.Orders.Envelope" },
"payload": { "before": null, "after": { "Id": 6, "ProductName": "Test", "OrderDate": 1614124800000 }, "source": { "version": "1.5.0.Alpha1", "connector": "mysql", "name": "debezium-tutorial", "ts_ms": 1614149085000, "snapshot": "false", "db": "TestDB", "table": "Orders", "server_id": 1, "gtid": null, "file": "binlog.000003", "pos": 384, "row": 0, "thread": 9, "query": null }, "op": "c", "ts_ms": 1614149085256, "transaction": null } }
Update record payload. Here we can see both the before and after values:
update Orders set Productname='UpdatedProctuctName' where Id=6;
{ "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "field": "transaction" } ], "optional": false, "name": "debezium_tutorial.TestDB.Orders.Envelope" },
"payload": { "before": { "Id": 6, "ProductName": "Test", "OrderDate": 1614124800000 }, "after": { "Id": 6, "ProductName": "UpdatedProctuctName", "OrderDate": 1614124800000 }, "source": { "version": "1.5.0.Alpha1", "connector": "mysql", "name": "debezium-tutorial", "ts_ms": 1614149449000, "snapshot": "false", "db": "TestDB", "table": "Orders", "server_id": 1, "gtid": null, "file": "binlog.000003", "pos": 695, "row": 0, "thread": 9, "query": null }, "op": "u", "ts_ms": 1614149449810, "transaction": null } }
Delete record payload. The operation type (op) is 'd', and after is null:
delete from Orders where Id=6;
{ "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "Id" }, { "type": "string", "optional": true, "field": "ProductName" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "OrderDate" } ], "optional": true, "name": "debezium_tutorial.TestDB.Orders.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "field": "transaction" } ], "optional": false, "name": "debezium_tutorial.TestDB.Orders.Envelope" },
"payload": { "before": { "Id": 6, "ProductName": "UpdatedProctuctName", "OrderDate": 1614124800000 }, "after": null, "source": { "version": "1.5.0.Alpha1", "connector": "mysql", "name": "debezium-tutorial", "ts_ms": 1614149647000, "snapshot": "false", "db": "TestDB", "table": "Orders", "server_id": 1, "gtid": null, "file": "binlog.000003", "pos": 1029, "row": 0, "thread": 9, "query": null }, "op": "d", "ts_ms": 1614149647433, "transaction": null } }
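All three events share the same envelope: payload.op says what happened ('c' for create, 'u' for update, 'd' for delete, and 'r' for rows read during Debezium's initial snapshot), while payload.before and payload.after hold the row state. The Python sketch below turns one such event into a compact summary and converts io.debezium.time.Timestamp values (milliseconds since the epoch, without time zone information) into datetime objects. The function and constant names are hypothetical; it assumes the JSON-with-schema format shown above and also accepts events without the schema wrapper.

```python
import json
from datetime import datetime, timedelta

# Debezium operation codes; "r" marks rows read during the initial snapshot.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}
EPOCH = datetime(1970, 1, 1)


def from_debezium_timestamp(ms):
    """Convert an io.debezium.time.Timestamp (ms since epoch, no time zone) to a naive datetime."""
    return None if ms is None else EPOCH + timedelta(milliseconds=ms)


def summarize(event):
    """Return the operation, table, and before/after row images of one change event.

    `event` may be a dict or the raw JSON (str or bytes) taken from a Kinesis record.
    """
    if isinstance(event, (str, bytes)):
        event = json.loads(event)
    # With schemas enabled the change sits under "payload"; otherwise the event is the payload.
    payload = event.get("payload", event)
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": payload["source"]["table"],
        "before": payload["before"],
        "after": payload["after"],
    }
```

With the insert event above, from_debezium_timestamp(summarize(event)["after"]["OrderDate"]) gives datetime(2021, 2, 24, 0, 0), which matches the CurDate() value used in the insert statement.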