Debezium is an open-source CDC (Change Data Capture) tool that captures database changes and streams them to Kafka in real time. From Kafka, the data can be further processed or synchronized through sink connectors into various databases such as MySQL, Oracle, PostgreSQL, and StarRocks.
Using a MySQL -> Kafka -> StarRocks pipeline as an example, this guide demonstrates how Debezium enables efficient and stable data synchronization across systems.
Debezium Environment Setup
The package debezium-test.tar.gz deploys all related resources with one click via Docker:
- Kafka Cluster + Kafka UI (Middleware)
- Debezium (Synchronization Tool)
- MySQL (Source)
- StarRocks (Target)
tar -xzvf debezium-test.tar.gz
sh install.sh
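Debezium's MySQL connector reads changes from the binary log, so the source server needs binary logging enabled with `binlog_format=ROW` and `binlog_row_image=FULL`. If you want to verify this after installation, the sketch below is one way to do it. `check_binlog_settings` is a hypothetical helper (not part of debezium-test.tar.gz) that reads the tab-separated output of `SHOW VARIABLES` on stdin; it assumes the `mysql` command-line client is available to produce that output.

```shell
# Hypothetical helper: verify the MySQL settings Debezium's MySQL connector relies on.
# Input: "name<TAB>value" lines, e.g. from `mysql -N -B -e "SHOW VARIABLES ..."`.
# Prints each missing or wrong setting; returns 0 only if all of them match.
check_binlog_settings() {
  awk -F '\t' '
    BEGIN {
      want["log_bin"] = "ON"
      want["binlog_format"] = "ROW"
      want["binlog_row_image"] = "FULL"
    }
    # Remember the (upper-cased) value of every setting we care about.
    ($1 in want) { seen[$1] = toupper($2) }
    END {
      bad = 0
      for (k in want) {
        if (!(k in seen)) {
          printf "missing: %s\n", k
          bad = 1
        } else if (seen[k] != want[k]) {
          printf "wrong: %s=%s (expected %s)\n", k, seen[k], want[k]
          bad = 1
        }
      }
      exit bad
    }'
}

# Example (host and port taken from the source connector config below):
# mysql -h 112.124.38.87 -P 25000 -u root -p -N -B \
#   -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')" \
#   | check_binlog_settings
```

The check is done in `awk` so it only depends on the client output format, not on a particular MySQL version of `SHOW VARIABLES`.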
Create MySQL Source Connector
The source is MySQL; create the source database and table as follows:
CREATE DATABASE `inventory`;
CREATE TABLE `inventory`.`customer` (
  `c_int` int NOT NULL,
  `c_bigint` bigint NOT NULL,
  `c_decimal` decimal(10,3) NOT NULL,
  `c_date` date NOT NULL,
  `c_datetime` datetime NOT NULL,
  `c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `c_year` int NOT NULL,
  `c_varchar` varchar(10) NOT NULL,
  `c_text` text NOT NULL,
  PRIMARY KEY (`c_int`)
);
Create a source connector through the Debezium (Kafka Connect) REST API to subscribe to MySQL change events.
curl -i -X POST http://127.0.0.1:7750/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "connector-test-mx",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "112.124.38.87",
    "database.port": "25000",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "1",
    "database.server.name": "mx",
    "database.include.list": "inventory",
    "decimal.handling.mode": "string",
    "binary.handling.mode": "hex",
    "topic.prefix": "mx",
    "table.include.list": "inventory.customer",
    "snapshot.mode": "never",
    "database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
    "schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
    "schema.history.internal.kafka.topic": "mx.schemahistory.customer",
    "database.history.kafka.topic": "mx.mx_history_schema",
    "include.schema.changes": "false",
    "converters": "mysqltime",
    "mysqltime.type": "io.debezium.converter.MySQLTimeConverter",
    "mysqltime.format.date": "yyyy-MM-dd",
    "mysqltime.format.time": "HH:mm:ss",
    "mysqltime.format.datetime": "yyyy-MM-dd HH:mm:ss",
    "mysqltime.format.timestamp": "yyyy-MM-dd HH:mm:ss",
    "mysqltime.format.timestamp.zone": "UTC+8"
  }
}'
After creation, check the status of the connector:
curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status
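The status endpoint returns JSON with one `state` for the connector itself and one per task; a task can be `FAILED` even while the connector reports `RUNNING`. The sketch below shows one way to turn that into a pass/fail check in a script. `all_running` is a hypothetical helper based on `grep`/`sed` rather than a real JSON parser, so `jq` would be the more robust choice where it is installed.

```shell
# Hypothetical helper: succeed only if the connector and every task report RUNNING.
# Reads the JSON returned by GET /connectors/<name>/status on stdin.
# This is a grep/sed sketch, not a JSON parser.
all_running() {
  # Extract the value of every "state" field (connector state plus one per task).
  states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/' || true)
  if [ -z "$states" ]; then
    echo "no state fields found" >&2
    return 1
  fi
  # Anything other than RUNNING (e.g. FAILED, PAUSED, UNASSIGNED) counts as unhealthy.
  not_running=$(printf '%s\n' "$states" | grep -v '^RUNNING$' || true)
  if [ -n "$not_running" ]; then
    echo "not running: $(printf '%s' "$not_running" | tr '\n' ' ')" >&2
    return 1
  fi
  echo "all $(printf '%s\n' "$states" | wc -l | tr -d ' ') state(s) RUNNING"
}

# Example:
# curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status | all_running
```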
Create Sink Connector for StarRocks
The target is StarRocks; create the target database and table as follows:
CREATE DATABASE `inventory`;
CREATE TABLE `inventory`.`customer` (
  `c_int` int NOT NULL,
  `c_bigint` bigint NOT NULL,
  `c_decimal` decimal(10,3) NOT NULL,
  `c_date` date NOT NULL,
  `c_datetime` datetime NOT NULL,
  `c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `c_year` int NOT NULL,
  `c_varchar` varchar(10) NOT NULL,
  `c_text` text NOT NULL
) ENGINE=OLAP
PRIMARY KEY(`c_int`)
DISTRIBUTED BY HASH(`c_int`) BUCKETS 4
PROPERTIES (
  "replication_num" = "1",
  "in_memory" = "false",
  "storage_format" = "DEFAULT",
  "enable_persistent_index" = "false",
  "replicated_storage" = "true",
  "compression" = "LZ4"
);
Create a sink connector through the same REST API to write the change data from Kafka into StarRocks.
curl -i -X POST http://127.0.0.1:7750/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "jdbc-sink-starrocks",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://112.124.38.87:19030/inventory",
    "connection.username": "root",
    "connection.password": "123456",
    "topics": "mx.inventory.customer",
    "auto.create": "false",
    "insert.mode": "insert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "c_int",
    "table.name.format": "inventory.customer"
  }
}'
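The sink subscribes to `mx.inventory.customer` because Debezium names a MySQL table's change topic `<topic.prefix>.<database>.<table>`, and the source connector sets `topic.prefix` to `mx`. When more tables are added, it can help to derive the topic name and render the sink config from variables instead of hand-editing JSON. Both functions below are hypothetical helpers, not part of Debezium; `render_sink_config` simply reproduces the properties used above and performs no escaping.

```shell
# Debezium names a MySQL table's change topic <topic.prefix>.<database>.<table>,
# so topic.prefix "mx" + inventory.customer gives "mx.inventory.customer".
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Hypothetical helper: render the JDBC sink config shown above for any table.
# Args: name, JDBC URL, user, password, topic, target table, primary key field(s).
# Values are substituted verbatim, so they must not contain " or \ characters.
render_sink_config() {
  cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "$2",
    "connection.username": "$3",
    "connection.password": "$4",
    "topics": "$5",
    "auto.create": "false",
    "insert.mode": "insert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "$7",
    "table.name.format": "$6"
  }
}
EOF
}
```

The rendered JSON can be posted directly, since `curl -d @-` reads the request body from stdin: `render_sink_config jdbc-sink-starrocks "jdbc:mysql://112.124.38.87:19030/inventory" root 123456 "$(debezium_topic mx inventory customer)" inventory.customer c_int | curl -i -X POST http://127.0.0.1:7750/connectors -H 'Content-Type: application/json' -d @-`.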
Data Synchronization Test
- Perform random CRUD operations on MySQL. Debezium captures the resulting change events from the MySQL binlog and writes them to Kafka; the sink connector then writes the data from Kafka into StarRocks.
- Compare the data on both sides to verify that it is consistent.
- To pause the sink connector (and its tasks):
curl -i -X PUT http://127.0.0.1:7750/connectors/jdbc-sink-starrocks/pause
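For the consistency check in the list above, one low-tech approach is to dump the table from both systems with the `mysql` command-line client (StarRocks speaks the MySQL protocol, which is also why the sink uses a `jdbc:mysql://` URL) and compare the dumps while ignoring row order. `compare_dumps` is a hypothetical helper, not part of Debezium, and it assumes tab-separated dumps such as those produced by `mysql -N -B`. Some types (for example DATETIME or DECIMAL) may be rendered differently by the two systems, so spurious mismatches may call for explicit casts in the query.

```shell
# Hypothetical helper: compare two tab-separated table dumps regardless of row order.
# Prints the rows that appear on only one side and returns 1 if any differ.
compare_dumps() {
  a_sorted=$(mktemp) || return 2
  b_sorted=$(mktemp) || { rm -f "$a_sorted"; return 2; }
  # comm needs both inputs sorted with the same collation.
  LC_ALL=C sort "$1" > "$a_sorted"
  LC_ALL=C sort "$2" > "$b_sorted"
  echo "rows: $(wc -l < "$a_sorted") vs $(wc -l < "$b_sorted")"
  # comm -3 drops lines common to both; column 1 = only in $1, tab-indented = only in $2.
  diff_rows=$(LC_ALL=C comm -3 "$a_sorted" "$b_sorted")
  rm -f "$a_sorted" "$b_sorted"
  if [ -n "$diff_rows" ]; then
    printf 'mismatched rows:\n%s\n' "$diff_rows"
    return 1
  fi
  echo "dumps match"
}

# Example (hosts and ports taken from the connector configs above):
# q="SELECT * FROM inventory.customer"
# mysql -h 112.124.38.87 -P 25000 -u root -p -N -B -e "$q" > mysql.tsv
# mysql -h 112.124.38.87 -P 19030 -u root -p -N -B -e "$q" > starrocks.tsv
# compare_dumps mysql.tsv starrocks.tsv
```

A paused connector can be resumed through the matching Kafka Connect endpoint: `curl -i -X PUT http://127.0.0.1:7750/connectors/jdbc-sink-starrocks/resume`.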
Conclusion
Overall, synchronizing data with Debezium has been relatively smooth, and it benefits from a rich ecosystem of plugins.
I personally feel it’s more suitable for development and can be integrated into internal business systems based