A Tutorial Guide for Debezium Data Synchronization

Barry Logen

Debezium is an open-source CDC (Change Data Capture) tool that streams database changes to Kafka in real time. From Kafka, the data can be processed further or synchronized to other databases such as MySQL, Oracle, PostgreSQL, and StarRocks through Sink Connectors.

Using a MySQL -> Kafka -> StarRocks pipeline as an example, this guide demonstrates how Debezium enables efficient and stable data synchronization across systems.
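To make the pipeline more concrete, here is a minimal sketch of what a change event value might look like once it reaches Kafka, and how a consumer could interpret it. The `before`, `after`, and `op` fields belong to Debezium's event envelope; the sample values and the `describe_change` helper are made up for illustration. Depending on converter settings, the envelope may arrive bare or wrapped in a `payload` field, so the sketch accepts both.

```python
import json

# Hypothetical change event value for an UPDATE on inventory.customer,
# trimmed to the envelope fields used here (before, after, op, ts_ms).
SAMPLE_EVENT = json.dumps({
    "before": {"c_int": 1, "c_varchar": "old"},
    "after": {"c_int": 1, "c_varchar": "new"},
    "op": "u",
    "ts_ms": 1700000000000,
})

# Debezium operation codes: c = create, u = update, d = delete, r = snapshot read.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_value):
    """Summarize one change event (illustrative helper, not a Debezium API)."""
    event = json.loads(raw_value)
    # With schemas enabled in the JSON converter, the envelope sits under "payload".
    envelope = event.get("payload", event)
    op = OP_NAMES.get(envelope["op"], "unknown")
    # Deletes carry the row image in "before"; the other operations in "after".
    row = envelope["before"] if envelope["op"] == "d" else envelope["after"]
    return f"{op} on row with c_int={row['c_int']}"


print(describe_change(SAMPLE_EVENT))
```

The Sink Connector performs the equivalent interpretation internally; the sketch is only meant to show what travels through Kafka between the two connectors.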


Debezium Environment Setup

  • Deploy all related resources with Docker in one step using debezium-test.tar.gz, which bundles:
    • Kafka Cluster + Kafka UI (Middleware)
    • Debezium (Synchronization Tool)
    • MySQL (Source)
    • StarRocks (Target)

  • Extract the archive and run the bundled install script.

    tar -xzvf debezium-test.tar.gz
    sh install.sh

Create MySQL Source Connector

  • The source is MySQL. Create the source database and table with the statements below.

    CREATE DATABASE `inventory`;

    CREATE TABLE `inventory`.`customer` (
      `c_int` int NOT NULL,
      `c_bigint` bigint NOT NULL,
      `c_decimal` decimal(10,3) NOT NULL,
      `c_date` date NOT NULL,
      `c_datetime` datetime NOT NULL,
      `c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `c_year` int NOT NULL,
      `c_varchar` varchar(10) NOT NULL,
      `c_text` text NOT NULL,
      PRIMARY KEY (`c_int`)
    );

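  • Create a source Connector that subscribes to MySQL change events. The request goes to the Kafka Connect REST API that the Debezium service exposes. It sets both the Debezium 1.x property names (`database.server.name`, `database.history.*`) and their 2.x replacements (`topic.prefix`, `schema.history.internal.*`).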

    curl -i -X POST http://127.0.0.1:7750/connectors \
      -H 'Content-Type: application/json' \
      -d '{
        "name": "connector-test-mx",
        "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "database.hostname": "112.124.38.87",
          "database.port": "25000",
          "database.user": "root",
          "database.password": "123456",
          "database.server.id": "1",
          "database.server.name": "mx",
          "database.include.list": "inventory",
          "decimal.handling.mode": "string",
          "binary.handling.mode": "hex",
          "topic.prefix": "mx",
          "table.include.list": "inventory.customer",
          "snapshot.mode": "never",
          "database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
          "schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092",
          "schema.history.internal.kafka.topic": "mx.schemahistory.customer",
          "database.history.kafka.topic": "mx.mx_history_schema",
          "include.schema.changes": "false",
          "converters": "mysqltime",
          "mysqltime.type": "io.debezium.converter.MySQLTimeConverter",
          "mysqltime.format.date": "yyyy-MM-dd",
          "mysqltime.format.time": "HH:mm:ss",
          "mysqltime.format.datetime": "yyyy-MM-dd HH:mm:ss",
          "mysqltime.format.timestamp": "yyyy-MM-dd HH:mm:ss",
          "mysqltime.format.timestamp.zone": "UTC+8"
        }
      }'

  • After creation, check the status of the Connector.

    curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status
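The status endpoint returns JSON describing the connector and each of its tasks, so the check can be automated. Below is a minimal sketch that assumes the usual Kafka Connect response shape (`connector.state` plus a `tasks` list with per-task `state`). The sample response, the `worker_id` values, and the `unhealthy_parts` helper are illustrative. In practice the JSON would come from the `curl` call above.

```python
import json

# Sample response in the shape returned by GET /connectors/<name>/status.
# The values are made up; worker_id in particular is a placeholder.
SAMPLE_STATUS = json.dumps({
    "name": "connector-test-mx",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "source",
})


def unhealthy_parts(status_json):
    """Return a list of the connector parts that are not in the RUNNING state."""
    status = json.loads(status_json)
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector: {status['connector']['state']}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


print(unhealthy_parts(SAMPLE_STATUS) or "all running")
```

An empty list means the connector and all of its tasks report `RUNNING`. Any other entry names the part to investigate.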

Create Sink Connector for StarRocks

  • The target is StarRocks. Create the target database and table with the statements below.

    CREATE DATABASE `inventory`;

    CREATE TABLE `inventory`.`customer` (
      `c_int` int NOT NULL,
      `c_bigint` bigint NOT NULL,
      `c_decimal` decimal(10,3) NOT NULL,
      `c_date` date NOT NULL,
      `c_datetime` datetime NOT NULL,
      `c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `c_year` int NOT NULL,
      `c_varchar` varchar(10) NOT NULL,
      `c_text` text NOT NULL
    ) ENGINE=OLAP
    PRIMARY KEY(`c_int`)
    DISTRIBUTED BY HASH(`c_int`) BUCKETS 4
    PROPERTIES (
      "replication_num" = "1",
      "in_memory" = "false",
      "storage_format" = "DEFAULT",
      "enable_persistent_index" = "false",
      "replicated_storage" = "true",
      "compression" = "LZ4"
    );

  • Create a Sink Connector via Debezium’s API to write Kafka change data into StarRocks.

    curl -i -X POST http://127.0.0.1:7750/connectors \
      -H 'Content-Type: application/json' \
      -d '{
        "name": "jdbc-sink-starrocks",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "connection.url": "jdbc:mysql://112.124.38.87:19030/inventory",
          "connection.username": "root",
          "connection.password": "123456",
          "topics": "mx.inventory.customer",
          "auto.create": "false",
          "insert.mode": "insert",
          "delete.enabled": "true",
          "primary.key.mode": "record_key",
          "primary.key.fields": "c_int",
          "table.name.format": "inventory.customer"
        }
      }'
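The `topics` value in the sink configuration has to match the topic the source connector writes to. By default, Debezium's MySQL connector names each table's topic `<topic.prefix>.<database>.<table>`. The sketch below derives the expected names from the source settings and checks them against the sink. The `expected_topics` helper is hypothetical, and it assumes `table.include.list` holds literal table names rather than regular expressions.

```python
def expected_topics(source_config):
    """Derive default Debezium topic names from a source connector config.

    Assumes table.include.list contains literal `database.table` names.
    """
    prefix = source_config["topic.prefix"]
    tables = source_config["table.include.list"].split(",")
    return [f"{prefix}.{table.strip()}" for table in tables]


# Values copied from the two connector configurations in this guide.
source_config = {"topic.prefix": "mx", "table.include.list": "inventory.customer"}
sink_topics = "mx.inventory.customer".split(",")

# Topics the source produces that the sink does not subscribe to.
missing = [t for t in expected_topics(source_config) if t not in sink_topics]
print(missing)
```

An empty list indicates that the sink subscribes to every topic the source is expected to produce.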

Data Synchronization Test

  • Run a mix of insert, update, and delete statements against MySQL. Debezium captures the changes and writes them to Kafka, and the Sink Connector writes them from Kafka into StarRocks.
  • Compare the data in MySQL and StarRocks to verify that both sides are consistent.
  • Pause the Sink Connector (and its tasks) through the same API.

    curl -i -X PUT http://127.0.0.1:7750/connectors/jdbc-sink-starrocks/pause
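The comparison step could be sketched as follows, assuming the same `SELECT` over `inventory.customer` has been run on both MySQL and StarRocks and the rows fetched as tuples. Fetching is left out, the sample rows are made up, and `diff_tables` is a hypothetical helper. Values are compared as strings, which is a simplification to sidestep client-side type differences.

```python
def diff_tables(source_rows, target_rows, key_index=0):
    """Compare two row sets by primary key.

    Returns three sorted key lists: (missing in target, extra in target, changed).
    """
    # Normalize every value to str so that, for example, Decimal and str
    # representations of the same number compare equal.
    source = {row[key_index]: tuple(str(v) for v in row) for row in source_rows}
    target = {row[key_index]: tuple(str(v) for v in row) for row in target_rows}
    missing = sorted(k for k in source if k not in target)
    extra = sorted(k for k in target if k not in source)
    changed = sorted(k for k in source if k in target and source[k] != target[k])
    return missing, extra, changed


# Made-up rows: (c_int, c_decimal, c_varchar)
mysql_rows = [(1, "1.500", "alice"), (2, "2.250", "bob"), (3, "3.000", "carol")]
starrocks_rows = [(1, "1.500", "alice"), (2, "9.999", "bob"), (4, "4.000", "dave")]

print(diff_tables(mysql_rows, starrocks_rows))
```

Three empty lists mean the two tables agree. For large tables, comparing row counts and per-range checksums first would likely be cheaper than pulling every row.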

Conclusion

Overall, data synchronization with Debezium went relatively smoothly, and it benefits from a rich ecosystem of plugins.

I personally feel it’s more suitable for development and can be integrated into internal business systems based