Change data capture Pipeline using Debezium

December 11, 2024

Real-time data replication and indexing are critical for building responsive and scalable systems. Change Data Capture (CDC) is a technique for tracking changes in a database and propagating them to downstream systems in near-real time.

In this blog, we’ll walk through setting up a CDC pipeline using Docker Compose. Our pipeline will sync changes from a PostgreSQL database to Elasticsearch and visualize them in Kibana.

Architecture Overview

flowchart TD
    id_postgres[(PostgreSQL)] -->|Change Events| id_debezium_postgres_connector[Debezium postgres connector]
    
    id_debezium_postgres_connector -->|CDC Events| id_kafka[Kafka]

    id_kafka[Kafka] --> id_debezium_snowflake_sink_connector
    id_kafka[Kafka] --> id_debezium_elasticsearch_sink_connector
    id_kafka[Kafka] --> id_debezium_s3_sink_connector
    
    id_debezium_snowflake_sink_connector[Debezium Snowflake Sink] -->|Propagates Data| id_snowflake[Snowflake]
    id_debezium_elasticsearch_sink_connector[Debezium Elasticsearch Sink] -->|Propagates Data| id_elasticsearch[Elasticsearch]
    id_elasticsearch -->|Data Indexed| id_kibana[Kibana]
    id_debezium_s3_sink_connector[Debezium S3 Sink] -->|Propagates Data| S3[S3] --> id_continue_from_s3[...]
    id_snowflake --> id_continue_from_snowflake[...]
    
    
Debezium source connector
A Debezium source connector extracts changes from a database and publishes them to Kafka topics through the Kafka Connect framework.
Debezium sink connector
A Debezium sink connector is a tool that receives events from sources like Apache Kafka topics, standardizes the data format, and then saves the event data to a configured sink repository.

We will implement the middle part of the architecture, i.e., build a pipeline that takes changes from postgres, pushes them to elasticsearch, and finally lets us verify the data in Kibana.

Setting up the environment

Standing up the components using docker compose

We will primarily rely on docker compose to bring up the entire stack mentioned above.

docker-compose.yml
services:
  postgres:
    image: postgres:15
    container_name: postgres
    environment:
      POSTGRES_USER: cdc_user
      POSTGRES_PASSWORD: cdc_pass
      POSTGRES_DB: cdc_db
    command: postgres -c wal_level=logical -c max_replication_slots=5 -c max_wal_senders=5      
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_BROKER_ID: 1
    ports:
      - "9092:9092"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  debezium:
    image: debezium/connect:2.3
    container_name: debezium
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium-connect-configs
      OFFSET_STORAGE_TOPIC: debezium-connect-offsets
      STATUS_STORAGE_TOPIC: debezium-connect-statuses
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/plugins"      
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/etc/kafka-connect/plugins

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.16.1
    container_name: elasticsearch
    environment:
      discovery.type: single-node
      xpack.security.enabled: false      
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.16.1
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"

volumes:
  postgres_data:

In the above docker-compose file, a few things are critical:

  • The POSTGRES_USER, POSTGRES_PASSWORD and POSTGRES_DB environment variables on the postgres service define the database and credentials that the Debezium source connector will use
  • The command override on the postgres service enables logical replication (wal_level=logical), which the Debezium Postgres connector requires
  • CONNECT_PLUGIN_PATH on the debezium service specifies the path where Debezium looks for plugins in order to connect to external systems. These external systems could be source systems (postgres, mariadb, etc.) or sink systems (snowflake, s3, elasticsearch, etc.)
  • The ./plugins volume mount makes those plugins available to Debezium inside the container

We can download the required connector plugins, extract them, and place them under the plugins directory. Once we bring up the setup using compose, the below command should show that both the postgres and elasticsearch connectors are available:

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl -X GET http://localhost:8083/connector-plugins -s | python -m json.tool

[
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "14.1.2"
    },
    {
        "class": "io.debezium.connector.postgresql.PostgresConnector",
        "type": "source",
        "version": "2.5.4.Final"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.4.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.4.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.4.0"
    }
]

Setting up the pipeline

We will also use the below script to bootstrap a few things as part of the setup. This includes:

Creating tables

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ python app.py create-table
Table 'employees' created successfully.

Populating data (optional)

We can insert data after setting up source and sink connectors as well.

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ python app.py insert-data                          
Inserted: 1, Alice, Engineer
Inserted: 2, Bob, Analyst
Inserted: 3, Charlie, Manager

Setting up source connector

Once we set up the connector, Debezium starts to track any changes and pushes them to Kafka via Kafka Connect. For Postgres this happens through logical replication slots; the mechanism differs for other source systems.

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ python app.py register-connector              
Connector registered successfully.

Verify that the connector has been registered

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl http://localhost:8083/connectors/postgres-connector/status -s | python -m json.tool
{
    "name": "postgres-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.19.0.7:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "172.19.0.7:8083"
        }
    ],
    "type": "source"
}
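
Under the hood, the connector created a logical replication slot in Postgres (named debezium_slot in the connector config shown later in app.py). As a quick sanity check - a minimal sketch that is not part of app.py, reusing the same connection settings - we can list the slots directly:

import psycopg2

# List logical replication slots; Debezium's slot should show up once the
# connector is RUNNING.
conn = psycopg2.connect(dbname="cdc_db", user="cdc_user", password="cdc_pass",
                        host="localhost", port=5432)
with conn.cursor() as cur:
    cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
    for slot_name, plugin, active in cur.fetchall():
        print(f"{slot_name} ({plugin}) active={active}")
        # expected: debezium_slot (pgoutput) active=True
conn.close()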

Setting up sink connector

Finally, we set up the sink connector. This tells the Connect worker to pull records from Kafka and push them to Elasticsearch. It is essentially a Kafka consumer.
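
Since the sink is just a Kafka consumer, we can also peek at the raw change events on the topic ourselves. Below is a rough sketch using the kafka-python package (an assumption - any Kafka client would do). Note that the broker advertises itself as kafka:9092, so run this inside the compose network or map kafka to 127.0.0.1 in /etc/hosts.

import json
from kafka import KafkaConsumer

# Read the raw change events that the sink connector will consume.
consumer = KafkaConsumer(
    "cdc.public.employees",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",   # start from the beginning of the topic
    consumer_timeout_ms=10_000,     # stop iterating after 10s of inactivity
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for msg in consumer:
    if msg.value is None:           # skip tombstones
        continue
    # With JsonConverter and schemas enabled, the envelope sits under "payload"
    event = msg.value.get("payload", msg.value)
    print(event["op"], event.get("after"))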

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ python app.py register-sink-connector
Elasticsearch Sink Connector registered successfully.
app.py
import psycopg2
import typer
import requests
import time
import copy

from psycopg2 import sql

# Generate a random number

import random
random.seed(random.randint(1, 100))

app = typer.Typer()

DEBEZIUM_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "postgres-connector"

@app.command()
def create_table():
    """Create the employees table if it does not exist."""
    try:
        config = {
            "dbname": "cdc_db",
            "user": "cdc_user",
            "password": "cdc_pass",
            "host": "localhost",
            "port": 5432,
        }        
        conn = psycopg2.connect(**config)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS employees (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                role VARCHAR(50),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        cursor.execute("ALTER TABLE public.employees REPLICA IDENTITY FULL;")
        conn.commit()
        typer.echo("Table 'employees' created successfully.")
    except Exception as e:
        typer.echo(f"Error creating table: {e}")
    finally:
        cursor.close()
        conn.close()


@app.command()
def insert_data():
    """Insert sample data into the employees table."""
    try:
        config = {
            "dbname": "cdc_db",
            "user": "cdc_user",
            "password": "cdc_pass",
            "host": "localhost",
            "port": 5432,
        }     
        conn = psycopg2.connect(**config)
        cursor = conn.cursor()
        employees = [(1, "Alice", "Engineer"), (2, "Bob", "Analyst"), (3, "Charlie", "Manager")]
        for id, name, role in employees:
            cursor.execute("INSERT INTO employees (id, name, role) VALUES (%s, %s, %s)", (id, name, role))
            conn.commit()
            typer.echo(f"Inserted: {id}, {name}, {role}")
            time.sleep(2)
    except Exception as e:
        typer.echo(f"Error inserting data: {e}")
    finally:
        cursor.close()
        conn.close()

@app.command()
def update_data():
    """
    Updates the roles of Alice, Bob, and Charlie (ids 1-3) in the 'employees' table,
    appending a random number so repeated runs produce visible changes.
    """
    config = {
        "dbname": "cdc_db",
        "user": "cdc_user",
        "password": "cdc_pass",
        "host": "localhost",
        "port": 5432,
    }

    # Define the new role for each employee id
    updates = {
        1: f"Admin {random.randint(1, 100)}",
        2: f"Manager {random.randint(1, 100)}",
        3: f"Developer {random.randint(1, 100)}",
    }

    conn = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(**config)
        cursor = conn.cursor()

        # Loop through the updates and apply each one
        for emp_id, new_role in updates.items():
            update_query = sql.SQL("UPDATE employees SET role = %s WHERE id = %s;")
            cursor.execute(update_query, (new_role, emp_id))
            print(f"Updated role for {emp_id} to {new_role}.")

        # Commit the changes to the database
        conn.commit()

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close the cursor and connection
        if conn:
            cursor.close()
            conn.close()

@app.command()
def drop_connector(connector_name: str = CONNECTOR_NAME):
    """
    Drop (delete) the Debezium connector by its name.
    Args:
        connector_name (str): Name of the connector to drop (default: postgres-connector).
    """
    try:
        response = requests.delete(f"{DEBEZIUM_URL}/{connector_name}")
        if response.status_code == 204:
            typer.echo(f"Connector '{connector_name}' deleted successfully.")
        elif response.status_code == 404:
            typer.echo(f"Connector '{connector_name}' not found.")
        else:
            typer.echo(f"Failed to delete connector: {response.json()}")
    except Exception as e:
        typer.echo(f"Error deleting connector: {e}")

@app.command()
def register_connector():
    """Register the Debezium PostgreSQL connector."""
    # Database settings as seen from the Debezium container (hence host "postgres").
    db_config = {
        "dbname": "cdc_db",
        "user": "cdc_user",
        "password": "cdc_pass",
        "host": "postgres",
        "port": 5432,
    }
    connector_config = {
        "name": CONNECTOR_NAME,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": db_config["host"],
            "database.port": db_config["port"],
            "database.user": db_config["user"],
            "database.password": db_config["password"],
            "database.dbname": db_config["dbname"],
            "database.server.name": "postgres_server",
            "table.include.list": "public.employees",
            "slot.name": "debezium_slot",
            "topic.prefix": "cdc",
            "plugin.name": "pgoutput",
            "slot.drop.on.stop": "true",
        }
    }
    try:
        response = requests.post(DEBEZIUM_URL, json=connector_config)
        if response.status_code == 201:
            typer.echo("Connector registered successfully.")
        else:
            typer.echo(f"Failed to register connector: {response.json()}")
    except Exception as e:
        typer.echo(f"Error registering connector: {e}")

@app.command()
def register_sink_connector():
    """Register the Kafka Elasticsearch Sink Connector."""
    connector_config = {
        "name": "elasticsearch-sink",  # Unique name for the connector
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "tasks.max": "1",
            "topics": "cdc.public.employees",  # Kafka topic to consume from
            "connection.url": "http://elasticsearch:9200",  # Elasticsearch endpoint
            "type.name": "_doc",
            "key.ignore": "true",  # Ignore Kafka record key
            "schema.ignore": "true",  # Ignore schemas in messages
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",  # Simplify JSON message parsing
            "topic.index.map": "cdc.public.employees:cdc.public.employees"  # Topic to index mapping
        }
    }
    response = requests.post(f"{DEBEZIUM_URL}", json=connector_config)
    if response.status_code == 201:
        print("Elasticsearch Sink Connector registered successfully.")
    else:
        print(f"Failed to register connector: {response.json()}")

@app.command()
def drop_sink_connector(connector_name="elasticsearch-sink", connect_rest_url="http://localhost:8083"):
    """
    Drop (delete) a sink connector from Kafka Connect.

    :param connector_name: Name of the sink connector to be deleted.
    :param connect_rest_url: URL of the Kafka Connect REST API (default is http://localhost:8083).
    :return: Response object or error message.
    """
    # Construct the URL to delete the connector
    url = f"{connect_rest_url}/connectors/{connector_name}"

    # Send the DELETE request
    try:
        response = requests.delete(url)

        # Check for successful deletion
        if response.status_code == 204:
            print(f"Connector '{connector_name}' successfully deleted.")
        else:
            print(f"Failed to delete connector '{connector_name}'. Status code: {response.status_code}")
            print("Response:", response.text)
    except requests.exceptions.RequestException as e:
        print(f"Error while trying to delete the connector: {e}")

if __name__ == "__main__":
    app()

Here, we need to note a few things

  • create_table sets REPLICA IDENTITY FULL on the employees table right after creating it. This is needed when we want to capture the full before/after diff of each row, irrespective of whether the primary key was changed or not. A quick way to confirm the setting is shown below.
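
A small check (not part of app.py, using the same connection settings) to confirm the replica identity actually changed:

import psycopg2

# pg_class.relreplident is 'f' for FULL, 'd' for the default (primary key only).
conn = psycopg2.connect(dbname="cdc_db", user="cdc_user", password="cdc_pass",
                        host="localhost", port=5432)
with conn.cursor() as cur:
    cur.execute("SELECT relreplident FROM pg_class WHERE relname = 'employees';")
    print(cur.fetchone()[0])  # expected: 'f'
conn.close()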

Testing the pipeline

Now that we have our entire pipeline ready, we can start making changes in the postgres database and watch them flow through to elasticsearch on the other side.

Verify initial data

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_count' -s | python -m json.tool
{
    "count": 3,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    }
}

This shows the index count as 3, matching the initial records that we added during the insert-data stage of the pipeline setup.

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_search' -s | python -m json.tool
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+1",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 2,
                        "name": "Bob",
                        "role": "Analyst",
                        "created_at": 1733947180335591
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "true",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266036,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+2",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 3,
                        "name": "Charlie",
                        "role": "Manager",
                        "created_at": 1733947182347031
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "last",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266036,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+0",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 1,
                        "name": "Alice",
                        "role": "Engineer",
                        "created_at": 1733947178324531
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "first",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266034,
                    "transaction": null
                }
            }
        ]
    }
}

Here, the before field is null, indicating that there was no prior version of the row, and the after field carries the row's id, name, role and created_at values.
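
To make these envelopes easier to read, a small helper (not part of app.py) can diff the before and after fields; Debezium uses op "r" for snapshot reads, "c" for inserts, "u" for updates and "d" for deletes:

def summarize_change(event: dict) -> str:
    """Turn a Debezium change event envelope into a short human-readable line."""
    ops = {"r": "snapshot", "c": "insert", "u": "update", "d": "delete"}
    before, after = event.get("before"), event.get("after")
    changed = {}
    if before and after:
        changed = {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}
    table = event["source"]["table"]
    return f"{ops.get(event['op'], event['op'])} on {table}: {changed or after or before}"

# For the snapshot document above, summarize_change(doc["_source"]) would print
# something like: snapshot on employees: {'id': 2, 'name': 'Bob', ...}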

Send some updates

Here, we generate a random number and update the role of each of the 3 employees

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ python app.py update-data
Updated role for 1 to Admin 82.
Updated role for 2 to Manager 54.
Updated role for 3 to Developer 55.

Verify changes replicated

Finally, we check the elasticsearch index count and see 3 new documents, for a total of 6.

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_count' -s | python -m json.tool
{
    "count": 6,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    }
}
(venv) ➜  cdc-pipeline-using-debezium git:(main)

Below, we can also see that this time the before field is not null for the 3 new entries. It shows how each row changed as part of the update.

(venv) ➜  cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_search' -s | python -m json.tool
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 6,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+1",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 2,
                        "name": "Bob",
                        "role": "Analyst",
                        "created_at": 1733947180335591
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "true",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266036,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+2",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 3,
                        "name": "Charlie",
                        "role": "Manager",
                        "created_at": 1733947182347031
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "last",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266036,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+0",
                "_score": 1.0,
                "_source": {
                    "before": null,
                    "after": {
                        "id": 1,
                        "name": "Alice",
                        "role": "Engineer",
                        "created_at": 1733947178324531
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733947265959,
                        "snapshot": "first",
                        "db": "cdc_db",
                        "sequence": "[null,\"26785888\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 742,
                        "lsn": 26785888,
                        "xmin": null
                    },
                    "op": "r",
                    "ts_ms": 1733947266034,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+4",
                "_score": 1.0,
                "_source": {
                    "before": {
                        "id": 2,
                        "name": "Bob",
                        "role": "Analyst",
                        "created_at": 1733947180335591
                    },
                    "after": {
                        "id": 2,
                        "name": "Bob",
                        "role": "Manager 54",
                        "created_at": 1733947180335591
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733948856640,
                        "snapshot": "false",
                        "db": "cdc_db",
                        "sequence": "[null,\"26786616\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 743,
                        "lsn": 26786616,
                        "xmin": null
                    },
                    "op": "u",
                    "ts_ms": 1733948856896,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+5",
                "_score": 1.0,
                "_source": {
                    "before": {
                        "id": 3,
                        "name": "Charlie",
                        "role": "Manager",
                        "created_at": 1733947182347031
                    },
                    "after": {
                        "id": 3,
                        "name": "Charlie",
                        "role": "Developer 55",
                        "created_at": 1733947182347031
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733948856640,
                        "snapshot": "false",
                        "db": "cdc_db",
                        "sequence": "[null,\"26786744\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 743,
                        "lsn": 26786744,
                        "xmin": null
                    },
                    "op": "u",
                    "ts_ms": 1733948856896,
                    "transaction": null
                }
            },
            {
                "_index": "cdc.public.employees",
                "_id": "cdc.public.employees+0+3",
                "_score": 1.0,
                "_source": {
                    "before": {
                        "id": 1,
                        "name": "Alice",
                        "role": "Engineer",
                        "created_at": 1733947178324531
                    },
                    "after": {
                        "id": 1,
                        "name": "Alice",
                        "role": "Admin 82",
                        "created_at": 1733947178324531
                    },
                    "source": {
                        "version": "2.5.4.Final",
                        "connector": "postgresql",
                        "name": "cdc",
                        "ts_ms": 1733948856640,
                        "snapshot": "false",
                        "db": "cdc_db",
                        "sequence": "[null,\"26786216\"]",
                        "schema": "public",
                        "table": "employees",
                        "txId": 743,
                        "lsn": 26786216,
                        "xmin": null
                    },
                    "op": "u",
                    "ts_ms": 1733948856895,
                    "transaction": null
                }
            }
        ]
    }
}
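
The insert, update and verify loop above can also be automated. Below is a small end-to-end check (not part of app.py; it reuses the connection settings and index name from this post) that inserts one more row and polls Elasticsearch until the change shows up:

import time
import psycopg2
import requests

ES_COUNT_URL = "http://localhost:9200/cdc.public.employees/_count"

def es_count() -> int:
    return requests.get(ES_COUNT_URL).json().get("count", 0)

before = es_count()

# Insert one more row (the id is arbitrary for this test)
conn = psycopg2.connect(dbname="cdc_db", user="cdc_user", password="cdc_pass",
                        host="localhost", port=5432)
with conn, conn.cursor() as cur:
    cur.execute("INSERT INTO employees (id, name, role) VALUES (%s, %s, %s)",
                (100, "Dana", "Tester"))
conn.close()

# Poll for up to ~30 seconds while the event travels
# Postgres -> Debezium -> Kafka -> Elasticsearch
for _ in range(30):
    if es_count() > before:
        print("Change replicated to Elasticsearch")
        break
    time.sleep(1)
else:
    print("Timed out waiting for the change to be indexed")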

Lookup data in Kibana (optional)

Finally, we can open up Kibana, look up the index named cdc.public.employees, and find our data there.

Conclusion

Key Takeaways

  • CDC pipelines provide real-time synchronization between databases and downstream systems.
  • Debezium and Kafka make it easy to implement CDC pipelines.
  • Elasticsearch and Kibana are powerful tools for indexing and visualization.