Change Data Capture (CDC)¶
"Polling tells you what changed. Logs tell you what happened."
Change Data Capture (CDC) is the process of tracking and propagating changes (INSERT, UPDATE, DELETE) in a source database to downstream systems — data warehouses, caches, search indexes, or event streams — in near real-time.
This notebook walks you through six progressive labs, each exposing a limitation that motivates the next approach:
| Lab | Approach | Core Idea | Captures Deletes? | DB Load |
|---|---|---|---|---|
| 1 | Timestamp Polling | Query last_updated column | ❌ | High |
| 2 | Cron Batch CDC | Schedule the poller | ❌ | High |
| 3 | Redis Pub/Sub | Application publishes events | ✅ | None |
| 4 | Redis Queue Hybrid | Poller pushes into queue | ❌ (manual only) | Medium |
| 5 | Log-Based CDC (Debezium) | Read DB transaction log | ✅ | None |
| 6 | Warehouse Simulation | Apply events to target table | ✅ | None |
Prerequisites¶
Python packages¶
pip install --user redis
External services¶
# Redis (required for Labs 3 & 4)
# If Docker is available without sudo:
docker run -d --name redis-cdc -p 6379:6379 redis:7
# OR install Redis locally (no sudo needed on many systems):
# Download: https://redis.io/download then run: ./src/redis-server
SQLite¶
SQLite ships with Python's standard library — no installation needed.
Lab 1: Baseline — Query-Based CDC (Polling)¶
Goal¶
Understand timestamp-based polling as the simplest CDC approach — and understand why it is fundamentally flawed.
Idea¶
Every row carries a last_updated timestamp. The poller remembers the last timestamp it saw and asks:
"Give me everything newer than last_seen."
Architecture¶
┌─────────────┐   writes    ┌───────────────────────────┐
│ Application │ ──────────▶ │  SQLite DB (users table)  │
└─────────────┘             └────────────┬──────────────┘
                                         │ SELECT WHERE last_updated > ?
                                         │ (every N seconds)
                                         ▼
                            ┌───────────────────────────┐
                            │     Polling Consumer      │
                            └───────────────────────────┘
import sqlite3
import os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
# ── Create (or reset) the database ───────────────────────────────────
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.executescript("""
DROP TABLE IF EXISTS users;
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO users (name) VALUES ('Alice'), ('Bob'), ('Charlie');
""")
conn.commit()
rows = cursor.execute("SELECT * FROM users").fetchall()
print("Initial rows in users table:")
print(f" {'id':<5} {'name':<12} {'last_updated'}")
for r in rows:
    print(f" {r[0]:<5} {r[1]:<12} {r[2]}")
Initial rows in users table:
 id    name         last_updated
 1     Alice        2026-04-07 09:44:52
 2     Bob          2026-04-07 09:44:52
 3     Charlie      2026-04-07 09:44:52
import sqlite3
import time
import os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
def polling_cdc(db_path, poll_interval=2, max_polls=6):
    """Poll the users table for rows newer than last_seen."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    last_seen = "1970-01-01 00:00:00"  # epoch: capture everything on first run
    for poll_num in range(1, max_polls + 1):
        # ORDER BY ensures the last row processed carries the newest timestamp
        rows = cursor.execute(
            "SELECT * FROM users WHERE last_updated > ? ORDER BY last_updated",
            (last_seen,)
        ).fetchall()
        if rows:
            for row in rows:
                print(f" [Poll #{poll_num}] CHANGE → id={row[0]}, name='{row[1]}', updated={row[2]}")
                last_seen = row[2]  # advance cursor to latest timestamp
        else:
            print(f" [Poll #{poll_num}] No new changes.")
        time.sleep(poll_interval)
    conn.close()
print("▶ Starting poller (6 polls × 2 s)...")
print(" Open a separate terminal and run the SQL changes in the next cell!")
polling_cdc(DB_PATH)
▶ Starting poller (6 polls × 2 s)...
 Open a separate terminal and run the SQL changes in the next cell!
 [Poll #1] CHANGE → id=1, name='Alice', updated=2026-04-07 09:44:52
 [Poll #1] CHANGE → id=2, name='Bob', updated=2026-04-07 09:44:52
 [Poll #1] CHANGE → id=3, name='Charlie', updated=2026-04-07 09:44:52
 [Poll #2] No new changes.
 [Poll #3] No new changes.
 [Poll #4] No new changes.
 [Poll #5] No new changes.
 [Poll #6] No new changes.
# ── 🔥 Exercise: make changes while the poller is running ─────────────
# Run this cell WHILE the cell above is executing.
import sqlite3, time, os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
time.sleep(3) # wait for 1-2 polls to fire first
# UPDATE — poller WILL catch this
cursor.execute("UPDATE users SET name='Alice_updated', last_updated=CURRENT_TIMESTAMP WHERE id=1")
conn.commit()
print("✅ Updated Alice (id=1)")
time.sleep(2)
# DELETE — poller will NEVER see this
cursor.execute("DELETE FROM users WHERE id=2")
conn.commit()
print("🗑 Deleted Bob (id=2) — the poller won't notice!")
conn.close()
✅ Updated Alice (id=1)
🗑 Deleted Bob (id=2) — the poller won't notice!
Key Observations¶
| Problem | Why it happens |
|---|---|
| Misses hard DELETEs | Deleted rows vanish — no last_updated to query |
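| Adds load to source DB | Every poll runs a query against the table even when nothing has changed; without an index on last_updated, each query is a full table scan |
| Race conditions | CURRENT_TIMESTAMP has one-second resolution, so a row written in the same second as the last row seen is not strictly newer than last_seen and is skipped |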
| No intermediate states | If a row is updated twice between polls, only the final state is seen |
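The same-second race from the table can be reproduced deterministically. The sketch below is an illustration under assumptions: it uses an in-memory SQLite database and hand-written timestamps (instead of CURRENT_TIMESTAMP) so the collision is guaranteed, and the helper name `poll` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, last_updated TEXT)")

def poll(conn, last_seen):
    """Return rows strictly newer than last_seen, plus the advanced cursor."""
    rows = conn.execute(
        "SELECT id, name, last_updated FROM users "
        "WHERE last_updated > ? ORDER BY last_updated",
        (last_seen,),
    ).fetchall()
    new_cursor = rows[-1][2] if rows else last_seen
    return rows, new_cursor

# Row 1 is written at 10:00:00 and the poller runs right afterwards.
conn.execute("INSERT INTO users VALUES (1, 'Alice', '2026-01-01 10:00:00')")
first, cursor_ts = poll(conn, "1970-01-01 00:00:00")

# Row 2 is written later, but within the SAME second.
conn.execute("INSERT INTO users VALUES (2, 'Bob', '2026-01-01 10:00:00')")
second, cursor_ts = poll(conn, cursor_ts)

print("first poll :", first)   # sees Alice
print("second poll:", second)  # empty: Bob's timestamp is not > last_seen
```

Switching the comparison to `>=` would avoid the skip but re-deliver Alice on every poll, so the consumer would then need to deduplicate.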
Note¶
Polling is state-based detection, not event-based truth. You see what is, not what happened.
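To make "what is" versus "what happened" concrete, here is a minimal sketch, assuming an in-memory SQLite table and fixed timestamps. Three mutations happen between two polls, yet the poller observes a single row. The helper name `changes_since` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, last_updated TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "Alice", "2026-01-01 10:00:00"), (2, "Bob", "2026-01-01 10:00:00")],
)

def changes_since(conn, last_seen):
    """Timestamp poll: rows whose last_updated is newer than last_seen."""
    return conn.execute(
        "SELECT id, name, last_updated FROM users "
        "WHERE last_updated > ? ORDER BY last_updated",
        (last_seen,),
    ).fetchall()

# What actually happened between two polls (three mutations):
conn.execute("UPDATE users SET name='Alice_v1', last_updated='2026-01-01 10:00:05' WHERE id=1")
conn.execute("UPDATE users SET name='Alice_v2', last_updated='2026-01-01 10:00:07' WHERE id=1")
conn.execute("DELETE FROM users WHERE id=2")

# What the poller observes: the final state of one row, and no trace of Bob.
observed = changes_since(conn, "2026-01-01 10:00:00")
print(observed)  # [(1, 'Alice_v2', '2026-01-01 10:00:07')]
```

The intermediate value Alice_v1 and the deletion of Bob never reach the consumer, which matches the first and last rows of the table above.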
Lab 2: Cron-Based Batch CDC¶
Goal¶
Understand how scheduled batch ingestion differs from continuous polling, and its latency/cost tradeoff.
Idea¶
Instead of polling continuously inside a long-running process, we schedule the poller as a cron job that runs at fixed intervals (e.g., every minute).
Time ────────────────────────────────────────────────────────▶
T=0           T=60s         T=120s        T=180s
│             │             │             │
▼             ▼             ▼             ▼
[cron]        [cron]        [cron]        [cron]
runs poller   runs poller   runs poller   runs poller
The poller.py Script¶
Save the script below, then wire it to cron.
# ── Write the standalone poller script ────────────────────────────────
import os
script = '''
#!/usr/bin/env python3
"""
poller.py: standalone CDC poller meant to be invoked by cron.
Persists the last-seen timestamp to a state file so each cron
invocation picks up where the previous one left off.
"""
import sqlite3, json, os, datetime

DB_PATH = os.path.expanduser("~/cdc_demo.db")
STATE_FILE = os.path.expanduser("~/cdc_state.json")
LOG_FILE = os.path.expanduser("~/cdc_changes.log")

def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f).get("last_seen", "1970-01-01 00:00:00")
    return "1970-01-01 00:00:00"

def save_state(last_seen):
    with open(STATE_FILE, "w") as f:
        json.dump({"last_seen": last_seen}, f)

def main():
    last_seen = load_state()
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    rows = cursor.execute(
        "SELECT * FROM users WHERE last_updated > ? ORDER BY last_updated",
        (last_seen,)
    ).fetchall()
    conn.close()
    now = datetime.datetime.now().isoformat()
    with open(LOG_FILE, "a") as log:
        log.write(f"[{now}] Poll found {len(rows)} change(s).\\n")
        for row in rows:
            log.write(f" CHANGE: {row}\\n")
            last_seen = row[2]
    save_state(last_seen)
    print(f"[{now}] Processed {len(rows)} change(s). last_seen={last_seen}")

if __name__ == "__main__":
    main()
'''
script_path = os.path.expanduser("~/poller.py")
with open(script_path, "w") as f:
    f.write(script.strip())
os.chmod(script_path, 0o755)
print(f"✅ Written: {script_path}")
✅ Written: /home/ankush/poller.py
# ── Simulate three cron invocations programmatically ─────────────────
import subprocess, time, sqlite3, os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
# Reset state file
state_file = os.path.expanduser("~/cdc_state.json")
if os.path.exists(state_file):
    os.remove(state_file)
# --- Cron tick 1: baseline -----------------------------------------------
print("=== Cron Tick 1 ===")
subprocess.run(["python3", os.path.expanduser("~/poller.py")])
# Make a change BETWEEN ticks — simulates real app activity
conn = sqlite3.connect(DB_PATH)
conn.execute("UPDATE users SET name='Alice_v2', last_updated=CURRENT_TIMESTAMP WHERE id=1")
conn.commit()
conn.close()
print(" (change made between ticks: Alice → Alice_v2)")
time.sleep(1) # ensure a different timestamp
# --- Cron tick 2: picks up the UPDATE ------------------------------------
print("\n=== Cron Tick 2 ===")
subprocess.run(["python3", os.path.expanduser("~/poller.py")])
# --- Cron tick 3: nothing new ─────────────────────────────────────────
print("\n=== Cron Tick 3 ===")
subprocess.run(["python3", os.path.expanduser("~/poller.py")])
print("\n📄 Change log:")
log_path = os.path.expanduser("~/cdc_changes.log")
if os.path.exists(log_path):
    with open(log_path) as f:
        print(f.read())
=== Cron Tick 1 ===
[2026-04-07T15:15:09.154802] Processed 2 change(s). last_seen=2026-04-07 09:45:07
 (change made between ticks: Alice → Alice_v2)
=== Cron Tick 2 ===
[2026-04-07T15:15:10.213475] Processed 1 change(s). last_seen=2026-04-07 09:45:09
=== Cron Tick 3 ===
[2026-04-07T15:15:10.233828] Processed 0 change(s). last_seen=2026-04-07 09:45:09
📄 Change log:
[2026-04-07T15:15:09.154802] Poll found 2 change(s).
 CHANGE: (3, 'Charlie', '2026-04-07 09:44:52')
 CHANGE: (1, 'Alice_updated', '2026-04-07 09:45:07')
[2026-04-07T15:15:10.213475] Poll found 1 change(s).
 CHANGE: (1, 'Alice_v2', '2026-04-07 09:45:09')
[2026-04-07T15:15:10.233828] Poll found 0 change(s).
How to Wire to Real Cron¶
# Open crontab editor:
crontab -e
# Run the poller every minute:
*/1 * * * * /usr/bin/python3 ~/poller.py >> ~/cdc_changes.log 2>&1
Problems Remain¶
| Problem | Status |
|---|---|
| Misses DELETEs | ❌ Still broken |
| DB load | ❌ Still polling |
| Latency | Up to one full cron interval (cron's finest granularity is 1 minute) |
| Scalability | ❌ The cursor lives in one local state file, so multiple poller instances cannot safely share it |
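Because every cron run depends on the state file, a crash halfway through writing it could leave a truncated JSON file behind. A common mitigation, sketched here as an assumption and not as part of the lab's poller.py, is to write to a temporary file and rename it into place with `os.replace`, which is atomic when both paths are on the same filesystem. The function name `save_state_atomic` and the temporary demo directory are hypothetical.

```python
import json
import os
import tempfile

def save_state_atomic(state_file, last_seen):
    """Write the cursor to a temp file, then atomically swap it into place."""
    directory = os.path.dirname(os.path.abspath(state_file))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"last_seen": last_seen}, f)
            f.flush()
            os.fsync(f.fileno())  # push bytes to disk before the rename
        os.replace(tmp_path, state_file)  # atomic on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

def load_state(state_file, default="1970-01-01 00:00:00"):
    """Read the cursor back, falling back to the epoch on the first run."""
    if not os.path.exists(state_file):
        return default
    with open(state_file) as f:
        return json.load(f).get("last_seen", default)

# Demo in a throwaway directory so the real ~/cdc_state.json is untouched.
demo_dir = tempfile.mkdtemp()
demo_state = os.path.join(demo_dir, "cdc_state.json")
first_run = load_state(demo_state)
save_state_atomic(demo_state, "2026-04-07 09:45:09")
second_run = load_state(demo_state)
print(first_run, "->", second_run)
```

This hardens the single-poller case only; it does not make the file safe to share between several poller instances.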
Tradeoff¶
Cron batch CDC trades latency for simplicity. It is the most common approach in legacy ETL pipelines precisely because it requires no infrastructure changes — but it is fundamentally limited.
Key question: How do we get real-time change notification without continuously hammering the DB?
Lab 3: Event-Based CDC using Redis Pub/Sub¶
Goal¶
Shift from polling to event-driven thinking — the application publishes a CDC event every time it mutates the database.
Idea¶
Instead of the consumer asking "anything new?", the producer (application) shouts "something happened!"
┌─────────────┐  INSERT/UPDATE/DELETE   ┌─────────────────┐
│ Application │ ──────────────────────▶ │    SQLite DB    │
│             │                         └─────────────────┘
│             │  publish(cdc_channel)   ┌─────────────────┐
│             │ ──────────────────────▶ │  Redis Broker   │
└─────────────┘                         └────────┬────────┘
                                                 │ push (subscribe)
                                                 ▼
                                        ┌─────────────────┐
                                        │  CDC Consumer   │
                                        │  (Warehouse /   │
                                        │  Search Index)  │
                                        └─────────────────┘
Prerequisite¶
Redis must be running on localhost:6379. See prerequisites at the top of the notebook.
# ── Consumer: listens on the CDC channel in a background thread ───────
import redis
import json
import threading
received_events = []
def cdc_consumer(channel="cdc_channel", max_events=5):
    """Subscribe to Redis CDC channel and print every event."""
    r = redis.Redis(decode_responses=True)
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    print(f"📡 Consumer subscribed to '{channel}'")
    count = 0
    for message in pubsub.listen():
        # listen() also yields subscribe confirmations; keep only real messages
        if message["type"] == "message":
            event = json.loads(message["data"])
            count += 1
            received_events.append(event)
            op_emoji = {"INSERT": "➕", "UPDATE": "✏️", "DELETE": "🗑"}.get(event["op"], "❓")
            print(f" [CDC Event #{count}] {op_emoji} op={event['op']}, "
                  f"table={event['table']}, data={event['data']}")
            if count >= max_events:
                break
    pubsub.unsubscribe()
    print(f"\n✅ Consumer done. Received {count} events.")
# Start consumer in background
consumer_thread = threading.Thread(target=cdc_consumer, daemon=True)
consumer_thread.start()
print("Consumer thread started.")
Consumer thread started.
📡 Consumer subscribed to 'cdc_channel'
[CDC Event #1] ➕ op=INSERT, table=users, data={'id': 4, 'name': 'Diana'}
[CDC Event #2] ✏️ op=UPDATE, table=users, data={'id': 1, 'name': 'Alice_final'}
[CDC Event #3] 🗑 op=DELETE, table=users, data={'id': 3}
[CDC Event #4] ✏️ op=UPDATE, table=users, data={'id': 1, 'name': 'Alice_v3'}
[CDC Event #5] ➕ op=INSERT, table=users, data={'id': 5, 'name': 'Eve'}
✅ Consumer done. Received 5 events.
# ── Producer: application that writes to DB AND publishes CDC events ──
import redis
import sqlite3
import json
import time
import os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
r = redis.Redis(decode_responses=True)
def publish_event(op: str, table: str, data: dict):
    """Publish a CDC event to Redis after committing to the DB."""
    event = {"op": op, "table": table, "data": data}
    r.publish("cdc_channel", json.dumps(event))

def app_insert(conn, name: str):
    cursor = conn.execute(
        "INSERT INTO users (name, last_updated) VALUES (?, CURRENT_TIMESTAMP)", (name,)
    )
    conn.commit()
    publish_event("INSERT", "users", {"id": cursor.lastrowid, "name": name})
    print(f" → DB INSERT: name='{name}' (id={cursor.lastrowid})")

def app_update(conn, user_id: int, new_name: str):
    conn.execute(
        "UPDATE users SET name=?, last_updated=CURRENT_TIMESTAMP WHERE id=?",
        (new_name, user_id)
    )
    conn.commit()
    publish_event("UPDATE", "users", {"id": user_id, "name": new_name})
    print(f" → DB UPDATE: id={user_id} → name='{new_name}'")

def app_delete(conn, user_id: int):
    conn.execute("DELETE FROM users WHERE id=?", (user_id,))
    conn.commit()
    publish_event("DELETE", "users", {"id": user_id})
    print(f" → DB DELETE: id={user_id}")
conn = sqlite3.connect(DB_PATH)
time.sleep(0.5) # ensure consumer is subscribed
print("▶ Running application operations:")
app_insert(conn, "Diana")
time.sleep(0.2)
app_update(conn, 1, "Alice_final")
time.sleep(0.2)
app_delete(conn, 3) # Charlie — DELETE is now captured!
time.sleep(0.2)
app_update(conn, 1, "Alice_v3")
time.sleep(0.2)
app_insert(conn, "Eve")
conn.close()
consumer_thread.join(timeout=5)
print("\n📋 All received events:")
for e in received_events:
print(f" {e}")
▶ Running application operations:
→ DB INSERT: name='Diana' (id=4)
→ DB UPDATE: id=1 → name='Alice_final'
→ DB DELETE: id=3
→ DB UPDATE: id=1 → name='Alice_v3'
→ DB INSERT: name='Eve' (id=5)
📋 All received events:
{'op': 'INSERT', 'table': 'users', 'data': {'id': 4, 'name': 'Diana'}}
{'op': 'UPDATE', 'table': 'users', 'data': {'id': 1, 'name': 'Alice_final'}}
{'op': 'DELETE', 'table': 'users', 'data': {'id': 3}}
{'op': 'UPDATE', 'table': 'users', 'data': {'id': 1, 'name': 'Alice_v3'}}
{'op': 'INSERT', 'table': 'users', 'data': {'id': 5, 'name': 'Eve'}}
What We Gained¶
- Captures INSERT, UPDATE, DELETE — including hard deletes!
- Zero load on the DB from polling
- Near real-time: events reach subscribers as soon as they are published, typically within milliseconds on a local Redis
New Problems¶
| Problem | Explanation |
|---|---|
| App code must be modified | Every DB mutation needs a matching r.publish() call |
| Dual-write risk | DB commit succeeds but app crashes before publish() → event lost |
| Redis Pub/Sub is fire-and-forget | If consumer is down, events vanish |
| Not atomic | No guarantee DB change and event are always in sync |
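The dual-write risk in the table can be simulated without a running Redis. The sketch below is an illustration under assumptions: `FlakyBroker` is a hypothetical in-memory stand-in for the Redis client that raises on a chosen call, mimicking a crash between the DB commit and the publish.

```python
import sqlite3

class FlakyBroker:
    """Hypothetical in-memory stand-in for Redis; fails on one chosen call."""
    def __init__(self, fail_on_call):
        self.fail_on_call = fail_on_call
        self.calls = 0
        self.delivered = []

    def publish(self, channel, message):
        self.calls += 1
        if self.calls == self.fail_on_call:
            raise ConnectionError("simulated crash between commit and publish")
        self.delivered.append((channel, message))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
broker = FlakyBroker(fail_on_call=2)

def app_insert(name):
    """Dual write: commit to the DB first, then publish the event."""
    conn.execute("INSERT INTO users (name) VALUES (?)", (name,))
    conn.commit()                        # step 1: durable in the DB
    broker.publish("cdc_channel", name)  # step 2: may never happen

for name in ["Diana", "Eve", "Frank"]:
    try:
        app_insert(name)
    except ConnectionError as exc:
        print(f"publish failed for {name!r}: {exc}")

db_names = [r[0] for r in conn.execute("SELECT name FROM users ORDER BY id")]
event_names = [msg for _, msg in broker.delivered]
print("in DB    :", db_names)     # all three rows committed
print("published:", event_names)  # 'Eve' is missing downstream
```

The row for Eve is committed but its event is never published, so the source and the downstream view silently diverge.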
Insight¶
We now capture what happened, but we're relying on the application to faithfully report it. Applications lie — they crash, have bugs, skip edge cases.
Lab 4: Redis as CDC Buffer (Hybrid Model)¶
Goal¶
Combine polling with a durable queue — decouple detection from processing and survive consumer downtime.
Idea¶
The poller still queries the DB, but instead of processing events inline it pushes them into a Redis list used as a queue. Events wait in the list until a consumer removes them with BRPOP, so consumers can process at their own pace.
┌──────────────┐  SELECT WHERE   ┌──────────────────┐
│    Poller    │  last_updated>? │    SQLite DB     │
│  (producer)  │ ──────────────▶ │                  │
└──────┬───────┘                 └──────────────────┘
       │ LPUSH cdc_queue
       ▼
┌──────────────┐
│  Redis List  │ ← acts as a durable, ordered buffer
│  cdc_queue   │
└──────┬───────┘
       │ BRPOP (blocking pop)
       ▼
┌──────────────┐
│   Consumer   │ ← can be offline, messages wait
└──────────────┘
# ── Step 1: Poller that pushes changes into a Redis list ──────────────
import redis
import sqlite3
import json
import time
import os
DB_PATH = os.path.expanduser("~/cdc_demo.db")
QUEUE = "cdc_queue"
r = redis.Redis(decode_responses=True)
# Clear queue from previous runs
r.delete(QUEUE)
def poller_to_queue(db_path, poll_interval=1, max_polls=5):
    """Poll DB for changes and push each row into a Redis queue."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    last_seen = "1970-01-01 00:00:00"
    for poll_num in range(1, max_polls + 1):
        rows = cursor.execute(
            "SELECT * FROM users WHERE last_updated > ? ORDER BY last_updated",
            (last_seen,)
        ).fetchall()
        pushed = 0
        for row in rows:
            event = {"op": "UPSERT", "table": "users",
                     "data": {"id": row[0], "name": row[1], "last_updated": row[2]}}
            r.lpush(QUEUE, json.dumps(event))  # push to LEFT (newest end)
            last_seen = row[2]
            pushed += 1
        queue_len = r.llen(QUEUE)
        print(f" [Poller #{poll_num}] Pushed {pushed} event(s) → Queue depth: {queue_len}")
        time.sleep(poll_interval)
    conn.close()
    print(f"\n✅ Poller done. Final queue depth: {r.llen(QUEUE)}")
# Make a fresh change so the poller has something to detect
conn = sqlite3.connect(DB_PATH)
conn.execute("UPDATE users SET name='Alice_hybrid', last_updated=CURRENT_TIMESTAMP WHERE id=1")
conn.commit()
conn.close()
poller_to_queue(DB_PATH)
 [Poller #1] Pushed 3 event(s) → Queue depth: 3
 [Poller #2] Pushed 0 event(s) → Queue depth: 3
 [Poller #3] Pushed 0 event(s) → Queue depth: 3
 [Poller #4] Pushed 0 event(s) → Queue depth: 3
 [Poller #5] Pushed 0 event(s) → Queue depth: 3
✅ Poller done. Final queue depth: 3
# ── Step 2: Consumer — drains the queue (use BRPOP for real pipeline) ─
import redis, json
QUEUE = "cdc_queue"
r = redis.Redis(decode_responses=True)
def drain_queue(queue_name, timeout=2):
    """Process all events currently in the Redis queue."""
    processed = 0
    while True:
        # BRPOP blocks for `timeout` seconds; returns None if empty
        result = r.brpop(queue_name, timeout=timeout)
        if result is None:
            break
        _, raw = result
        event = json.loads(raw)
        processed += 1
        print(f" [Consumer] Processing event #{processed}: {event}")
    print(f"\n✅ Queue drained. Processed {processed} event(s).")
print(f"Queue depth before draining: {r.llen(QUEUE)}")
drain_queue(QUEUE)
Queue depth before draining: 3
[Consumer] Processing event #1: {'op': 'UPSERT', 'table': 'users', 'data': {'id': 4, 'name': 'Diana', 'last_updated': '2026-04-07 09:45:10'}}
[Consumer] Processing event #2: {'op': 'UPSERT', 'table': 'users', 'data': {'id': 1, 'name': 'Alice_hybrid', 'last_updated': '2026-04-07 09:45:11'}}
[Consumer] Processing event #3: {'op': 'UPSERT', 'table': 'users', 'data': {'id': 5, 'name': 'Eve', 'last_updated': '2026-04-07 09:45:11'}}
✅ Queue drained. Processed 3 event(s).
What the Queue Brings¶
| Feature | Redis Pub/Sub | Redis Queue (LPUSH/BRPOP) |
|---|---|---|
| Consumer can be offline | ❌ Events lost | ✅ Events wait |
| Delivery guarantee | ❌ None | ✅ Held until popped (lost if the consumer crashes after BRPOP) |
| Fan-out | ✅ | ❌ Single consumer |
| Ordering | ✅ | ✅ FIFO (LPUSH/BRPOP) |
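The FIFO behaviour of pushing on the left and popping on the right can be checked without Redis. This is a minimal sketch, assuming a `collections.deque` as a stand-in for the Redis list; the helpers `lpush` and `rpop` are hypothetical local functions that only mimic the semantics of the Redis commands.

```python
from collections import deque

queue = deque()  # stand-in for the Redis list "cdc_queue"

def lpush(q, item):
    """Mimic LPUSH: insert at the left (head) of the list."""
    q.appendleft(item)

def rpop(q):
    """Mimic RPOP (a non-blocking BRPOP): remove from the right (tail)."""
    return q.pop() if q else None

# Producer pushes three events in order.
for event in ["event-1", "event-2", "event-3"]:
    lpush(queue, event)

# Consumer drains the queue from the opposite end.
drained = []
while (item := rpop(queue)) is not None:
    drained.append(item)

print(drained)  # ['event-1', 'event-2', 'event-3']
```

Pushing and popping on the same end would turn the list into a stack and reverse the event order, which matters when later events overwrite earlier ones.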
Still Fundamentally Polling¶
- Misses DELETEs: the queue only contains what the SELECT WHERE returns.
- Not truly real-time: latency is bounded by the poll interval.
Insight¶
The hybrid improves reliability (messages survive consumer crashes) but the detection mechanism is still the same flawed timestamp poll. We need to go deeper — into the database itself.
Lab 5: True CDC — Log-Based with Debezium¶
Goal¶
Understand how production-grade CDC works by reading the database's own transaction log: no polling queries, minimal added DB load, and every committed change captured.
Core Concept¶
Every major database keeps a write-ahead log (WAL) or a similar change log for crash recovery, replication, or both:
| Database | Log Name | Purpose |
|---|---|---|
| MySQL | Binlog | Replication + point-in-time recovery |
| PostgreSQL | WAL | Crash recovery + replication |
| MongoDB | Oplog | Replication |
| SQL Server | Transaction Log | ACID guarantees |
Debezium taps into these logs as a log reader, not a query poller:
┌─────────────┐  SQL Mutation   ┌────────────────────────┐
│ Application │ ──────────────▶ │       Source DB        │
└─────────────┘                 │  ┌──────────────────┐  │
                                │  │ Transaction Log  │  │
                                │  │ (Binlog / WAL)   │  │
                                │  └────────┬─────────┘  │
                                └───────────│────────────┘
                                            │ tail (no query!)
                                            ▼
                                ┌────────────────────────┐
                                │        Debezium        │
                                │     (Kafka Connect     │
                                │    Source Connector)   │
                                └───────────┬────────────┘
                                            │ publish
                                            ▼
                                ┌────────────────────────┐
                                │      Apache Kafka      │
                                │     (Event Stream)     │
                                └───────────┬────────────┘
                                            │ consume
                                            ▼
                                ┌────────────────────────┐
                                │       Consumers        │
                                │ (Warehouse / Search /  │
                                │  Cache / Microservice) │
                                └────────────────────────┘
Debezium Event Format¶
Every Debezium event carries a before and after snapshot of the row, plus operation metadata:
{
  "op": "u",
  "ts_ms": 1712345678901,
  "source": {
    "db": "testdb",
    "table": "users",
    "file": "mysql-bin.000003",
    "pos": 4872
  },
  "before": { "id": 1, "name": "Alice", "last_updated": "..." },
  "after": { "id": 1, "name": "Alice_updated", "last_updated": "..." }
}
| op value | Meaning |
|---|---|
| c | CREATE (INSERT) |
| u | UPDATE |
| d | DELETE |
| r | READ (initial snapshot) |
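One practical use of the before and after images is computing which columns an update actually touched. The sketch below is an illustration, assuming the simplified event shape shown above (real Debezium messages may wrap these fields in a larger envelope); the helper `changed_columns` is hypothetical.

```python
def changed_columns(event):
    """Return {column: (old, new)} for an update event, else an empty dict."""
    before, after = event.get("before"), event.get("after")
    if event.get("op") != "u" or before is None or after is None:
        return {}
    return {
        col: (before.get(col), after.get(col))
        for col in after
        if before.get(col) != after.get(col)
    }

update_event = {
    "op": "u",
    "before": {"id": 1, "name": "Alice", "last_updated": "2024-04-07T08:00"},
    "after": {"id": 1, "name": "Alice_updated", "last_updated": "2024-04-07T09:00"},
}
delete_event = {"op": "d", "before": {"id": 2, "name": "Bob"}, "after": None}

diff = changed_columns(update_event)
print(diff)                           # name and last_updated changed, id did not
print(changed_columns(delete_event))  # {}
```

A timestamp poller cannot produce this diff, because it only ever sees the current row and never the previous image.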
Docker Setup (Minimal — MySQL + Debezium)¶
# Full stack: MySQL + Zookeeper + Kafka + Debezium Connect
# Use the official Debezium tutorial docker-compose:
# https://github.com/debezium/debezium-examples/tree/main/tutorial
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/tutorial
# Start all services
export DEBEZIUM_VERSION=2.5
docker-compose -f docker-compose-mysql.yaml up -d
# Register the MySQL connector (property names as used by Debezium 2.x)
curl -i -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "topic.prefix": "dbserver1",
      "database.include.list": "inventory",
      "table.include.list": "inventory.customers",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
  }'
# Watch the Kafka topic for CDC events:
docker-compose -f docker-compose-mysql.yaml exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic dbserver1.inventory.customers \
--from-beginning
# ── Simulate Debezium events in Python (no Docker required) ───────────
# In a real setup these events arrive from a Kafka topic.
# Here we construct them manually to study the format.
import json
from datetime import datetime, timezone
def simulate_debezium_event(op: str, table: str, before=None, after=None):
    """Construct a Debezium-style CDC event."""
    return {
        "op": op,
        "ts_ms": int(datetime.now(timezone.utc).timestamp() * 1000),
        "source": {
            "db": "testdb",
            "table": table,
            "file": "mysql-bin.000003",
        },
        "before": before,
        "after": after,
    }

events = [
    simulate_debezium_event("c", "users",
        before=None,
        after={"id": 10, "name": "Frank", "last_updated": "2024-04-07T10:00:00"}),
    simulate_debezium_event("u", "users",
        before={"id": 10, "name": "Frank", "last_updated": "2024-04-07T10:00:00"},
        after= {"id": 10, "name": "Frank_updated", "last_updated": "2024-04-07T10:05:00"}),
    simulate_debezium_event("d", "users",
        before={"id": 10, "name": "Frank_updated", "last_updated": "2024-04-07T10:05:00"},
        after=None),
]

LABELS = {"c": "CREATE", "u": "UPDATE", "d": "DELETE", "r": "READ"}
EMOJIS = {"c": "➕", "u": "✏️", "d": "🗑", "r": "📖"}
print("Simulated Debezium CDC events:")
print("═" * 60)
for i, ev in enumerate(events, 1):
    emoji = EMOJIS.get(ev["op"], "❓")
    label = LABELS.get(ev["op"], ev["op"])
    print(f"\nEvent #{i}: {emoji} {label}")
    print(f" table : {ev['source']['table']}")
    print(f" before : {ev['before']}")
    print(f" after : {ev['after']}")
    print(f" log pos: {ev['source']['file']}")
Simulated Debezium CDC events:
════════════════════════════════════════════════════════════
Event #1: ➕ CREATE
table : users
before : None
after : {'id': 10, 'name': 'Frank', 'last_updated': '2024-04-07T10:00:00'}
log pos: mysql-bin.000003
Event #2: ✏️ UPDATE
table : users
before : {'id': 10, 'name': 'Frank', 'last_updated': '2024-04-07T10:00:00'}
after : {'id': 10, 'name': 'Frank_updated', 'last_updated': '2024-04-07T10:05:00'}
log pos: mysql-bin.000003
Event #3: 🗑 DELETE
table : users
before : {'id': 10, 'name': 'Frank_updated', 'last_updated': '2024-04-07T10:05:00'}
after : None
log pos: mysql-bin.000003
Why Debezium Wins¶
| Property | Polling | Redis Pub/Sub | Debezium |
|---|---|---|---|
| Captures INSERT | ✅ | ✅ | ✅ |
| Captures UPDATE | ✅ | ✅ | ✅ |
| Captures DELETE | ❌ | ✅ (if app publishes) | ✅ |
| Before-image | ❌ | ❌ | ✅ |
| Zero DB query load | ❌ | ✅ | ✅ |
| App code changes | Not needed | Needed | Not needed |
| Exact order preserved | ❌ | Partial | ✅ (log position) |
| Intermediary states | ❌ | Partial | ✅ |
Key Insight¶
Debezium doesn't ask the database "what changed?". It reads the log the database was already writing for its own crash recovery and replication. This is called tailing the log.
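Tailing can be pictured with a toy model. The sketch below is an illustration under assumptions, not how Debezium is implemented: `ToyLog` is a hypothetical append-only list standing in for a binlog or WAL, and `LogTailer` is a hypothetical reader that remembers its offset between reads.

```python
class ToyLog:
    """Hypothetical append-only log standing in for a binlog/WAL."""
    def __init__(self):
        self._entries = []

    def append(self, entry):
        self._entries.append(entry)

    def read_from(self, offset):
        """Return entries at positions >= offset; history is never rewritten."""
        return self._entries[offset:]

class LogTailer:
    """Remembers its position in the log, like a connector storing an offset."""
    def __init__(self, log):
        self.log = log
        self.offset = 0

    def poll(self):
        batch = self.log.read_from(self.offset)
        self.offset += len(batch)
        return batch

log = ToyLog()
tailer = LogTailer(log)

# Four mutations on the same row, including two updates and a delete.
log.append({"op": "c", "id": 1, "name": "Alice"})
log.append({"op": "u", "id": 1, "name": "Alice_v1"})
log.append({"op": "u", "id": 1, "name": "Alice_v2"})
log.append({"op": "d", "id": 1})

first_batch = tailer.poll()   # every mutation, in commit order
log.append({"op": "c", "id": 2, "name": "Bob"})
second_batch = tailer.poll()  # only what was appended since the last offset

print([e["op"] for e in first_batch])
print([e["op"] for e in second_batch])
```

Unlike the timestamp poller, the reader sees both intermediate updates and the delete, and it resumes from a position in the log instead of comparing timestamps.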
Lab 6: Warehouse Simulation¶
import sqlite3
import json
import os
from datetime import datetime, timezone
# ── Target warehouse (separate DB) ───────────────────────────────────
WH_PATH = os.path.expanduser("~/warehouse.db")
wh_conn = sqlite3.connect(WH_PATH)
wh_conn.executescript("""
DROP TABLE IF EXISTS users;
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT,
last_updated TEXT
);
""")
wh_conn.commit()
print("✅ Warehouse DB initialised.")
# ── Event applicator ─────────────────────────────────────────────────
def apply_event(conn, event: dict):
    """
    Apply a single Debezium-style CDC event to the warehouse table.
    op codes: 'c' = insert, 'u' = update, 'd' = delete, 'r' = read/snapshot
    """
    op = event["op"]
    if op in ("c", "r"):  # CREATE or snapshot READ
        row = event["after"]
        conn.execute(
            "INSERT OR REPLACE INTO users (id, name, last_updated) VALUES (?,?,?)",
            (row["id"], row["name"], row.get("last_updated"))
        )
        return f"INSERT id={row['id']} name='{row['name']}'"
    elif op == "u":  # UPDATE
        row = event["after"]
        conn.execute(
            "UPDATE users SET name=?, last_updated=? WHERE id=?",
            (row["name"], row.get("last_updated"), row["id"])
        )
        return f"UPDATE id={row['id']} → name='{row['name']}'"
    elif op == "d":  # DELETE
        row = event["before"]
        conn.execute("DELETE FROM users WHERE id=?", (row["id"],))
        return f"DELETE id={row['id']}"
    else:
        return f"UNKNOWN op={op}"

# ── Replay a stream of events ─────────────────────────────────────────
def simulate_debezium_event(op, before=None, after=None):
    return {"op": op,
            "ts_ms": int(datetime.now(timezone.utc).timestamp() * 1000),
            "before": before, "after": after}
event_stream = [
simulate_debezium_event("c", after={"id": 1, "name": "Alice", "last_updated": "2024-04-07T08:00"}),
simulate_debezium_event("c", after={"id": 2, "name": "Bob", "last_updated": "2024-04-07T08:01"}),
simulate_debezium_event("c", after={"id": 3, "name": "Charlie", "last_updated": "2024-04-07T08:02"}),
simulate_debezium_event("u",
before={"id": 1, "name": "Alice", "last_updated": "2024-04-07T08:00"},
after= {"id": 1, "name": "Alice_updated", "last_updated": "2024-04-07T09:00"}),
simulate_debezium_event("d",
before={"id": 2, "name": "Bob", "last_updated": "2024-04-07T08:01"}),
simulate_debezium_event("c", after={"id": 4, "name": "Diana", "last_updated": "2024-04-07T10:00"}),
simulate_debezium_event("u",
before={"id": 1, "name": "Alice_updated", "last_updated": "2024-04-07T09:00"},
after= {"id": 1, "name": "Alice_final", "last_updated": "2024-04-07T11:00"}),
]
EMOJIS = {"c": "➕", "u": "✏️", "d": "🗑", "r": "📖"}
print("▶ Applying CDC event stream to warehouse:")
print("─" * 55)
for i, event in enumerate(event_stream, 1):
    emoji = EMOJIS.get(event["op"], "❓")
    result = apply_event(wh_conn, event)
    wh_conn.commit()
    print(f" [{i:>2}] {emoji} {result}")
# ── Final warehouse state ─────────────────────────────────────────────
print("\n📊 Final warehouse state:")
print(f" {'id':<5} {'name':<20} {'last_updated'}")
print(" " + "─" * 50)
for row in wh_conn.execute("SELECT * FROM users ORDER BY id").fetchall():
    print(f" {row[0]:<5} {row[1]:<20} {row[2]}")
wh_conn.close()
✅ Warehouse DB initialised.
▶ Applying CDC event stream to warehouse:
───────────────────────────────────────────────────────
 [ 1] ➕ INSERT id=1 name='Alice'
 [ 2] ➕ INSERT id=2 name='Bob'
 [ 3] ➕ INSERT id=3 name='Charlie'
 [ 4] ✏️ UPDATE id=1 → name='Alice_updated'
 [ 5] 🗑 DELETE id=2
 [ 6] ➕ INSERT id=4 name='Diana'
 [ 7] ✏️ UPDATE id=1 → name='Alice_final'
📊 Final warehouse state:
 id    name                 last_updated
 ──────────────────────────────────────────────────
 1     Alice_final          2024-04-07T11:00
 3     Charlie              2024-04-07T08:02
 4     Diana                2024-04-07T10:00
CDC = State Reconstruction from Events¶
Notice what just happened:
- We started with an empty warehouse.
- We replayed a sequence of CDC events.
- The warehouse now holds the exact final state of the source database.
This is the foundation of event sourcing — the log is the truth; the database state is just a materialized view of it.
If you retain the full event log in Kafka (for example, by configuring the topic to keep messages indefinitely), you can replay it up to any point to reconstruct the state of the database at that moment.
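Replaying up to a chosen point can be sketched as a fold over the event list. This is a minimal illustration, assuming the simplified Debezium-style events used in this notebook and an in-memory dict in place of a warehouse table; the function `replay` and its `upto` parameter are hypothetical.

```python
def replay(events, upto=None):
    """Fold events[:upto] into a {id: row} snapshot of the table."""
    state = {}
    for event in events[:upto]:
        if event["op"] in ("c", "r", "u"):
            row = event["after"]
            state[row["id"]] = row          # insert or overwrite
        elif event["op"] == "d":
            state.pop(event["before"]["id"], None)
    return state

events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "Alice"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Bob"}},
    {"op": "u", "before": {"id": 1, "name": "Alice"},
     "after": {"id": 1, "name": "Alice_updated"}},
    {"op": "d", "before": {"id": 2, "name": "Bob"}, "after": None},
]

after_two = replay(events, upto=2)  # state once the first two events are applied
final = replay(events)              # state after the whole log

print(sorted(after_two))  # [1, 2]
print(final)              # {1: {'id': 1, 'name': 'Alice_updated'}}
```

The same log yields different snapshots depending on where the replay stops, which is what makes historical reconstruction possible.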
🧭 Final Conceptual Comparison¶
| Approach | Captures Deletes | Before-Image | Load on DB | Real-time | App Changes Needed | Complexity |
|---|---|---|---|---|---|---|
| Polling | ❌ | ❌ | High | Medium | ❌ | Low |
| Cron Batch | ❌ | ❌ | High | Low | ❌ | Low |
| Redis Pub/Sub | ✅ | ❌ | None | High | ✅ | Medium |
| Redis Queue | ❌ (manual only) | ❌ | Medium | Medium | ✅ | Medium |
| Debezium (Log) | ✅ | ✅ | None | High | ❌ | High |
The Progressive Story¶
Lab 1 → Polling: Simple but blind to deletes and slow
↓
Lab 2 → Cron: Batch tradeoff — latency vs cost
↓
Lab 3 → Redis Pub/Sub: Real-time events but app must publish
↓
Lab 4 → Redis Queue: Survives consumer downtime, still polling
↓
Lab 5 → Debezium: Database log as the single source of truth
↓
Lab 6 → Warehouse: State reconstruction from events
Closing Insight¶
"Polling tells you what changed. Logs tell you what happened."
The transaction log is the only place where every mutation — including deletes, intermediate states, and exact ordering — is faithfully recorded. Debezium simply makes that log a first-class streaming data source.