Kafka Connect and Debezium CDC Streaming
Debezium, running on Kafka Connect, enables real-time Change Data Capture (CDC) streaming from databases to Apache Kafka. Instead of building custom data synchronization code, Debezium monitors database transaction logs and streams every insert, update, and delete as an event to Kafka topics. This powers real-time analytics, search index updates, cache invalidation, and cross-service data replication without modifying application code.
This guide covers building production CDC pipelines with Kafka Connect and Debezium, including connector configuration for PostgreSQL and MySQL, Single Message Transforms (SMTs), schema evolution handling, and monitoring strategies.
CDC Architecture Overview
Change Data Capture Pipeline
┌──────────────┐   Transaction    ┌──────────────┐
│  PostgreSQL  │───── Log ───────▶│   Debezium   │
│ (Source DB)  │  (WAL/Binlog)    │  Connector   │
└──────────────┘                  └──────┬───────┘
                                         │
                                  ┌──────▼───────┐
                                  │    Kafka     │
                                  │    Topics    │
                                  └──────┬───────┘
                                         │
               ┌─────────────────────────┼────────────────────┐
               │                         │                    │
        ┌──────▼──────┐           ┌──────▼──────┐      ┌──────▼──────┐
        │Elasticsearch│           │    Data     │      │  Analytics  │
        │  (Search)   │           │  Warehouse  │      │   Service   │
        └─────────────┘           └─────────────┘      └─────────────┘
Setting Up Kafka Connect
# docker-compose.yml — Kafka Connect with Debezium
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # single-node cluster
      CLUSTER_ID: 'q1Sh-9_ISia_zwGINzRvyQ'

  connect:
    image: debezium/connect:2.6
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: connect-cluster
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    depends_on:
      - kafka

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: myapp
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: secret
    command:
      - postgres
      - -c
      - wal_level=logical  # Required for CDC!
Configuring the Debezium Connector
The connector configuration specifies which database and tables to monitor. Moreover, Debezium supports fine-grained control over which changes to capture:
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets/db-password}",
    "database.dbname": "myapp",
    "topic.prefix": "myapp.cdc",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",
    "table.include.list": "public.orders,public.customers,public.products",
    "column.exclude.list": "public.customers.ssn,public.customers.password_hash",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "transforms": "route,unwrap,timestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "myapp\\.cdc\\.public\\.(.*)",
    "transforms.route.replacement": "cdc.$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp.target.type": "string",
    "transforms.timestamp.field": "__source_ts_ms",
    "transforms.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
    "heartbeat.interval.ms": 10000,
    "snapshot.mode": "initial",
    "tombstones.on.delete": true,
    "decimal.handling.mode": "string",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "cdc.dlq",
    "errors.deadletterqueue.context.headers.enable": true
  }
}
# Deploy the connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-cdc-connector.json
# Check connector status
curl http://localhost:8083/connectors/postgres-cdc-connector/status | jq
# List topics created
kafka-topics --bootstrap-server kafka:9092 --list | grep cdc
Consuming CDC Events
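The `cdc.*` topic names come from the `route` SMT in the connector configuration. A minimal sketch of the rename it applies, using plain `java.util.regex` rather than Connect's own implementation:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicRouting {
    // Same regex and replacement as the "route" transform in the connector config
    private static final Pattern ROUTE = Pattern.compile("myapp\\.cdc\\.public\\.(.*)");
    private static final String REPLACEMENT = "cdc.$1";

    public static String route(String topic) {
        Matcher m = ROUTE.matcher(topic);
        // RegexRouter leaves non-matching topic names unchanged
        return m.matches() ? m.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        System.out.println(route("myapp.cdc.public.orders"));    // cdc.orders
        System.out.println(route("myapp.cdc.public.customers")); // cdc.customers
        System.out.println(route("cdc.dlq"));                    // cdc.dlq (unchanged)
    }
}
```

The `$1` back-reference captures the bare table name, so downstream consumers subscribe to short topics like `cdc.orders` instead of the full `myapp.cdc.public.orders`.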
CDC events contain the full before and after state of each record. Therefore, consumers can implement any downstream processing logic:
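With the `unwrap` SMT and `delete.handling.mode=rewrite` configured earlier, each record value arrives as the flattened row state plus metadata fields. A sketch of the field layout a consumer sees (values are illustrative; the row columns depend on your schema):

```java
import java.util.Map;

public class CdcEventShape {
    // An update to public.orders, after ExtractNewRecordState flattening
    public static final Map<String, Object> UPDATE_EVENT = Map.of(
            "id", "42",                                    // row columns at the top level
            "status", "SHIPPED",
            "__op", "u",                                   // from transforms.unwrap.add.fields
            "__source_ts_ms", "2024-05-01T12:00:00.000Z",  // after the timestamp SMT
            "__deleted", "false"                           // from delete.handling.mode=rewrite
    );

    // A delete arrives as a rewritten record, not just a tombstone
    public static final Map<String, Object> DELETE_EVENT = Map.of(
            "id", "42",
            "__op", "d",
            "__deleted", "true"
    );

    public static void main(String[] args) {
        System.out.println(UPDATE_EVENT.get("__op"));      // u
        System.out.println(DELETE_EVENT.get("__deleted")); // true
    }
}
```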
import java.io.IOException;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderCdcConsumer {

    private final ObjectMapper mapper = new ObjectMapper();
    private final ElasticsearchClient esClient;
    private final CacheManager cacheManager;

    public OrderCdcConsumer(ElasticsearchClient esClient,
                            CacheManager cacheManager) {
        this.esClient = esClient;
        this.cacheManager = cacheManager;
    }

    @KafkaListener(topics = "cdc.orders", groupId = "search-indexer")
    public void handleOrderChange(ConsumerRecord<String, String> record)
            throws IOException {
        // Tombstone records (null value) follow each delete when
        // tombstones.on.delete=true — skip them here
        if (record.value() == null) {
            return;
        }
        JsonNode event = mapper.readTree(record.value());
        String operation = event.get("__op").asText();
        String orderId = event.get("id").asText();
        switch (operation) {
            case "c", "u", "r" -> {
                // Create, update, or snapshot read: index in Elasticsearch
                OrderDocument doc = mapper.treeToValue(event, OrderDocument.class);
                esClient.index(i -> i
                        .index("orders")
                        .id(orderId)
                        .document(doc));
                evictOrder(orderId);
            }
            case "d" -> {
                // Delete: remove from the search index
                esClient.delete(d -> d
                        .index("orders")
                        .id(orderId));
                evictOrder(orderId);
            }
        }
    }

    private void evictOrder(String orderId) {
        // getCache() may return null if the cache is not configured
        Cache cache = cacheManager.getCache("orders");
        if (cache != null) {
            cache.evict(orderId);
        }
    }
}
Monitoring and Operations
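The alert thresholds listed at the end of this section translate directly into code. A minimal sketch (class and method names are illustrative, not a Connect API) that classifies the `MilliSecondsBehindSource` JMX metric:

```java
public class ReplicationLagCheck {
    public enum Severity { OK, WARNING, CRITICAL }

    // Thresholds matching the alerting rules in this section
    public static final long WARNING_MS = 30_000;
    public static final long CRITICAL_MS = 300_000;

    // Classify the MilliSecondsBehindSource metric value
    public static Severity classify(long millisBehindSource) {
        if (millisBehindSource > CRITICAL_MS) return Severity.CRITICAL;
        if (millisBehindSource > WARNING_MS) return Severity.WARNING;
        return Severity.OK;
    }

    public static void main(String[] args) {
        System.out.println(classify(5_000));   // OK
        System.out.println(classify(60_000));  // WARNING
        System.out.println(classify(600_000)); // CRITICAL
    }
}
```

In practice the metric would be read over JMX (or scraped into Prometheus) and the resulting severity fed into your alerting system.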
# Key JMX metrics to monitor
# Source connector metrics:
# - MilliSecondsBehindSource: replication lag
# - TotalNumberOfEventsSeen: throughput
# - NumberOfEventsFiltered: filtered events
# - SnapshotCompleted: initial snapshot status
# Kafka Connect metrics:
# - connector-status: running/paused/failed
# - task-count: number of active tasks
# - offset-commit-success-rate: offset reliability
# Alert thresholds:
# MilliSecondsBehindSource > 30000 → WARNING
# MilliSecondsBehindSource > 300000 → CRITICAL
# connector-status != RUNNING → CRITICAL
When NOT to Use Kafka Connect with Debezium
CDC adds operational complexity with replication slots, connector management, and schema evolution handling. If your data synchronization needs are simple and infrequent, batch ETL jobs may be more appropriate. Furthermore, databases with extremely high write throughput (100,000+ transactions/second) can generate more CDC events than Kafka can handle without significant infrastructure investment. Additionally, if you only need to replicate data between two PostgreSQL instances, native logical replication is simpler than the Debezium pipeline. Evaluate whether the real-time requirement justifies the operational overhead.
Key Takeaways
- Debezium on Kafka Connect captures database changes from transaction logs without modifying application code
- Single Message Transforms (SMTs) reshape events — route topics, extract new state, convert timestamps
- Dead letter queues prevent poison messages from blocking the entire pipeline
- Monitor MilliSecondsBehindSource to detect replication lag before it impacts downstream consumers
- Use CDC for real-time search indexing, cache invalidation, analytics, and cross-service data replication
Related Reading
- Redis Streams Event-Driven Architecture
- PostgreSQL 18 New Features and Performance
- ClickHouse vs DuckDB Analytical Databases