Apache Kafka Connect with Debezium CDC: Real-Time Data Streaming Pipeline

Kafka Connect Debezium enables real-time Change Data Capture (CDC) streaming from databases to Apache Kafka. Instead of building custom data synchronization code, Debezium monitors database transaction logs and streams every insert, update, and delete as an event to Kafka topics. This powers real-time analytics, search index updates, cache invalidation, and cross-service data replication without modifying application code.

This guide covers building production CDC pipelines with Kafka Connect and Debezium, including connector configuration for PostgreSQL and MySQL, Single Message Transforms (SMTs), schema evolution handling, and monitoring strategies.

CDC Architecture Overview

Change Data Capture Pipeline

┌──────────────┐    Transaction    ┌──────────────┐
│  PostgreSQL  │───── Log ────────▶│  Debezium    │
│  (Source DB) │    (WAL/Binlog)   │  Connector   │
└──────────────┘                   └──────┬───────┘
                                          │
                                   ┌──────▼───────┐
                                   │  Kafka       │
                                   │  Topics      │
                                   └──────┬───────┘
                                          │
                    ┌─────────────────────┼─────────────────────┐
                    │                     │                     │
             ┌──────▼───────┐      ┌──────▼──────┐       ┌──────▼──────┐
             │ Elasticsearch│      │    Data     │       │  Analytics  │
             │   (Search)   │      │  Warehouse  │       │   Service   │
             └──────────────┘      └─────────────┘       └─────────────┘
Kafka Connect Debezium CDC streaming architecture
Debezium captures database changes from transaction logs without impacting application performance

Setting Up Kafka Connect

# docker-compose.yml — Kafka Connect with Debezium
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # Single-broker dev setup
      CLUSTER_ID: 'q1Sh-9_ISia_zwGINzRvyQ'

  connect:
    image: debezium/connect:2.6
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: connect-cluster
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    depends_on:
      - kafka

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: myapp
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: secret
    command:
      - postgres
      - -c
      - wal_level=logical  # Required for CDC!
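Before deploying the connector, the source database also needs a user with replication privileges and, ideally, an explicit publication scoped to the captured tables. A minimal setup sketch — the role name, password, and publication name here are illustrative and match the connector configuration in the next section:

```sql
-- Run as a superuser on the source database.
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'secret';
GRANT SELECT ON public.orders, public.customers, public.products TO debezium;

-- Debezium can auto-create the publication, but creating it explicitly
-- limits it to the captured tables:
CREATE PUBLICATION debezium_pub
    FOR TABLE public.orders, public.customers, public.products;
```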

Configuring the Debezium Connector

The connector configuration specifies which database and tables to monitor, and gives fine-grained control over which tables and columns are captured:

{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets/db-password:password}",
    "database.dbname": "myapp",
    "topic.prefix": "myapp.cdc",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",

    "table.include.list": "public.orders,public.customers,public.products",
    "column.exclude.list": "public.customers.ssn,public.customers.password_hash",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "transforms": "route,unwrap,timestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "myapp\\.cdc\\.public\\.(.*)",
    "transforms.route.replacement": "cdc.$1",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",

    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp.target.type": "string",
    "transforms.timestamp.field": "__source_ts_ms",
    "transforms.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",

    "heartbeat.interval.ms": 10000,
    "snapshot.mode": "initial",
    "tombstones.on.delete": true,
    "decimal.handling.mode": "string",

    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
# Deploy the connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-cdc-connector.json

# Check connector status
curl http://localhost:8083/connectors/postgres-cdc-connector/status | jq

# List topics created
docker compose exec kafka kafka-topics --bootstrap-server kafka:9092 --list | grep cdc
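The RegexRouter SMT above renames topics using a plain Java regex, so the pattern can be sanity-checked outside Connect before deploying. A small standalone sketch (class name is hypothetical):

```java
import java.util.regex.Pattern;

public class RouteCheck {
    public static void main(String[] args) {
        // Same regex and replacement as the transforms.route.* settings.
        Pattern route = Pattern.compile("myapp\\.cdc\\.public\\.(.*)");
        String routed = route.matcher("myapp.cdc.public.orders")
                             .replaceAll("cdc.$1");
        System.out.println(routed); // prints cdc.orders
    }
}
```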

Consuming CDC Events

Raw Debezium events carry the full before and after state of each row. With the ExtractNewRecordState SMT configured above, consumers instead receive the flattened new row state plus __op and __source_ts_ms metadata fields, and can implement any downstream processing logic:

import java.io.IOException;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.cache.CacheManager;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderCdcConsumer {

    // Reuse one mapper; ignore the __op/__deleted/__source_ts_ms
    // metadata fields when binding to OrderDocument.
    private static final ObjectMapper MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private final ElasticsearchClient esClient;
    private final CacheManager cacheManager;

    public OrderCdcConsumer(ElasticsearchClient esClient,
                            CacheManager cacheManager) {
        this.esClient = esClient;
        this.cacheManager = cacheManager;
    }

    @KafkaListener(topics = "cdc.orders",
                   groupId = "search-indexer")
    public void handleOrderChange(ConsumerRecord<String, String> record)
            throws IOException {
        // tombstones.on.delete=true emits a null-value tombstone
        // after each delete event; skip those here.
        if (record.value() == null) {
            return;
        }
        JsonNode event = MAPPER.readTree(record.value());

        String operation = event.get("__op").asText();
        String orderId = event.get("id").asText();

        switch (operation) {
            case "r", "c", "u" -> {
                // Snapshot read, create, or update: index in Elasticsearch
                OrderDocument doc = MAPPER.treeToValue(
                    event, OrderDocument.class);
                esClient.index(i -> i
                    .index("orders")
                    .id(orderId)
                    .document(doc));
                // Invalidate the stale cache entry
                cacheManager.getCache("orders")
                    .evict(orderId);
            }
            case "d" -> {
                // Delete: remove from search index
                esClient.delete(d -> d
                    .index("orders")
                    .id(orderId));
                cacheManager.getCache("orders")
                    .evict(orderId);
            }
        }
    }
}
CDC event consumption and downstream processing
CDC events drive real-time updates to search indexes, caches, and analytics systems
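For reference, an unwrapped update event on cdc.orders looks roughly like this — field names and values are illustrative; __op and __source_ts_ms come from the add.fields and timestamp SMT settings, __deleted from the rewrite delete mode, and total is a string because of decimal.handling.mode=string:

```json
{
  "id": "1042",
  "customer_id": "77",
  "status": "SHIPPED",
  "total": "129.90",
  "__op": "u",
  "__source_ts_ms": "2024-05-01T12:34:56.789Z",
  "__deleted": "false"
}
```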

Monitoring and Operations

# Key JMX metrics to monitor
# Source connector metrics:
# - MilliSecondsBehindSource: replication lag
# - TotalNumberOfEventsSeen: throughput
# - NumberOfEventsFiltered: filtered events
# - SnapshotCompleted: initial snapshot status

# Kafka Connect metrics:
# - connector-status: running/paused/failed
# - task-count: number of active tasks
# - offset-commit-success-rate: offset reliability

# Alert thresholds:
# MilliSecondsBehindSource > 30000 → WARNING
# MilliSecondsBehindSource > 300000 → CRITICAL
# connector-status != RUNNING → CRITICAL
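The thresholds above can be codified in whatever alerting shim polls the JMX metric; a minimal sketch (class and method names are hypothetical):

```java
// LagAlert.java — hypothetical helper mapping the
// MilliSecondsBehindSource metric to the alert levels above.
public final class LagAlert {

    public enum Level { OK, WARNING, CRITICAL }

    // Thresholds from the runbook: 30 s → WARNING, 300 s → CRITICAL.
    public static Level classify(long millisBehindSource) {
        if (millisBehindSource > 300_000) return Level.CRITICAL;
        if (millisBehindSource > 30_000) return Level.WARNING;
        return Level.OK;
    }

    public static void main(String[] args) {
        System.out.println(classify(5_000));    // OK
        System.out.println(classify(60_000));   // WARNING
        System.out.println(classify(600_000));  // CRITICAL
    }
}
```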

When NOT to Use Kafka Connect with Debezium

CDC adds operational complexity with replication slots, connector management, and schema evolution handling. If your data synchronization needs are simple and infrequent, batch ETL jobs may be more appropriate. Furthermore, databases with extremely high write throughput (100,000+ transactions/second) can generate more CDC events than Kafka can handle without significant infrastructure investment. Additionally, if you only need to replicate data between two PostgreSQL instances, native logical replication is simpler than the Debezium pipeline. Evaluate whether the real-time requirement justifies the operational overhead.
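For the Postgres-to-Postgres case, the native alternative is just a publication and a subscription — a sketch with placeholder names and connection details:

```sql
-- On the source database:
CREATE PUBLICATION orders_pub FOR TABLE public.orders;

-- On the target database (connection string is a placeholder):
CREATE SUBSCRIPTION orders_sub
    CONNECTION 'host=source-db dbname=myapp user=replicator password=secret'
    PUBLICATION orders_pub;
```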

Data streaming pipeline decision framework
Choose CDC when real-time data synchronization is a genuine business requirement

Key Takeaways

  • Kafka Connect Debezium captures database changes from transaction logs without modifying application code
  • Single Message Transforms (SMTs) reshape events — route topics, extract new state, convert timestamps
  • Dead letter queues (a sink-connector feature) keep poison messages from blocking the pipeline; on the source side, use errors.tolerance with error logging
  • Monitor MilliSecondsBehindSource to detect replication lag before it impacts downstream consumers
  • Use CDC for real-time search indexing, cache invalidation, analytics, and cross-service data replication
