Standing up Debezium Server for Postgres CDC

Lomanu4

In this tutorial, you'll set up a complete CDC pipeline using Debezium Server (version 3.1) to capture changes from a PostgreSQL database and stream them directly to a webhook endpoint. Unlike the Kafka Connect runtime, Debezium Server is a lightweight standalone application that sends change events directly to various destinations without requiring a Kafka cluster. Keep in mind that you're trading away Kafka's overhead at the cost of reduced scale, performance, and delivery guarantees.

What you'll learn:

  • How to configure PostgreSQL for logical replication
  • How to set up Debezium Server to capture database changes
  • How to stream these changes directly to a webhook endpoint
  • How to observe and work with CDC events from your database

The Architecture


Here's what you're building:

  1. A PostgreSQL database with logical replication enabled
  2. Debezium Server running as a standalone application
  3. A webhook endpoint (using webhook.site) that receives the change events
  4. A simple "customers" table that you'll monitor for changes

When you're done, any change to the customers table will be captured by Debezium Server and sent as a JSON event to your webhook endpoint in real time. This setup provides a foundation for building event-driven architectures without the complexity of managing a Kafka cluster.

Step 1: Configure PostgreSQL for logical replication


Ensure logical replication is enabled on your Postgres database:


psql -U postgres -c "SHOW wal_level;"

You should see logical in the output. If not, run the following commands to enable logical replication:


# Connect to PostgreSQL as the postgres user to modify system settings
sudo -u postgres psql -c "ALTER SYSTEM SET wal_level = logical;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_replication_slots = 10;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_wal_senders = 10;"

# Restart PostgreSQL to apply changes
# For Linux (systemd):
sudo systemctl restart postgresql

# For macOS (Homebrew):
brew services restart postgresql

These commands:

  • Set the Write-Ahead Log (WAL) level to "logical", so the WAL carries enough detail to decode row-level changes
  • Allow up to 10 replication slots; Debezium uses one slot to track its position in the WAL
  • Allow up to 10 WAL sender processes to run simultaneously; each streaming connection uses one
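
If you want to check all three settings in one place, a small helper can do it. The sketch below is plain Python and assumes you've already read the values back (for example with SHOW) into a dict of strings; the function name check_replication_settings and the minimum of 1 for the two counters are illustrative assumptions, not values taken from Debezium.

```python
def check_replication_settings(settings: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the settings look usable."""
    problems = []
    # Logical decoding only works when wal_level is exactly "logical".
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    # At least one slot and one sender are needed for a single CDC reader.
    for name in ("max_replication_slots", "max_wal_senders"):
        value = int(settings.get(name, "0"))
        if value < 1:
            problems.append(f"{name} must be at least 1 (got {value})")
    return problems


current = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}
for problem in check_replication_settings(current):
    print(problem)
```

An empty result only tells you the values are plausible; PostgreSQL still needs the restart shown above before a changed wal_level takes effect.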

Step 2: Create a database user and sample data


Debezium requires a PostgreSQL user with replication privileges and a table to monitor.


# Create a dedicated user for Debezium
psql -U postgres -c "CREATE ROLE dbz WITH LOGIN PASSWORD 'dbz' REPLICATION;"

# Create a sample database
createdb -U postgres inventory

# Create a sample table
psql -d inventory -U postgres <<'SQL'
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Set REPLICA IDENTITY to FULL to capture old values on updates and deletes
ALTER TABLE customers REPLICA IDENTITY FULL;

-- Grant necessary permissions to the dbz user
GRANT ALL PRIVILEGES ON DATABASE inventory TO dbz;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO dbz;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO dbz;
SQL

The REPLICA IDENTITY FULL setting ensures that both old and new values are captured for updates and deletes, which is crucial for comprehensive change tracking.
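
One practical payoff of having the full old row is that a consumer can work out exactly which columns changed. The following Python sketch assumes an update event payload with before and after objects shaped like the customers table; the helper name changed_fields is hypothetical. Without REPLICA IDENTITY FULL, before is typically missing or limited to key columns, and this comparison would report every column as changed.

```python
def changed_fields(payload: dict) -> dict:
    """Map each changed column to an (old, new) pair for an update event payload."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), new_value)
        for column, new_value in after.items()
        if before.get(column) != new_value
    }


update = {
    "op": "u",
    "before": {"id": 1, "name": "Alice Johnson", "email": "alice@example.com"},
    "after": {"id": 1, "name": "Alice Johnson", "email": "alice.new@example.com"},
}
print(changed_fields(update))
```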

Step 3: Download and set up Debezium Server


Debezium Server is a standalone application that connects to PostgreSQL and forwards change events to various sinks. Let's download and extract it:


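# Download Debezium Server
# (the download link is hidden in the source post; substitute the URL of the
# Debezium Server distribution archive for the placeholder below)
curl -L -o debezium-server.zip YOUR_DEBEZIUM_SERVER_DOWNLOAD_URL_HERE
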
# Extract the archive
unzip debezium-server.zip

# Navigate to the Debezium Server directory
cd debezium-server

You'll now have a directory structure containing:

  • run.sh - The script to start Debezium Server
  • lib/ - JAR files for Debezium and its dependencies
  • config/ - Configuration directory

Step 4: Create a webhook endpoint


To make it easy to stand up and test Debezium Server, use webhook.site to quickly test webhook delivery:

  1. Open webhook.site in your browser
  2. A unique URL will be automatically generated for you
  3. Copy this URL - you'll use it in the Debezium configuration
  4. Keep this browser tab open to see events as they arrive
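
If you'd rather not send data to a public service, a local receiver can play the same role. This is a minimal sketch using only the Python standard library, not part of the original setup: the names ChangeEventHandler, start_receiver, and post_json are hypothetical, and it assumes the sender POSTs one JSON document per request.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # parsed JSON bodies, in arrival order


class ChangeEventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly the number of bytes the sender announced.
        length = int(self.headers.get("Content-Length", 0))
        received.append(json.loads(self.rfile.read(length)))
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence the default per-request logging


def start_receiver(port: int = 0) -> HTTPServer:
    """Start the receiver on a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), ChangeEventHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def post_json(url: str, event: dict) -> int:
    """Send one JSON event the way a webhook sender would; return the HTTP status."""
    request = urllib.request.Request(
        url,
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling start_receiver(8080) and pointing the sink URL at http://localhost:8080/ would let you inspect events through the received list instead of a browser tab. The post_json helper is only there to exercise the receiver by hand.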

Step 5: Configure Debezium Server


Create or modify the application.properties file in the config/ directory to tell Debezium where to connect and where to send events:


# PostgreSQL source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=dbz
debezium.source.database.password=dbz
debezium.source.database.dbname=inventory
debezium.source.database.server.name=inventory-server
debezium.source.schema.include.list=public
debezium.source.table.include.list=public.customers

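# Set a unique replication slot name to avoid conflicts
debezium.source.slot.name=debezium_tutorial

# Use PostgreSQL's built-in pgoutput logical decoding plugin
# (the connector's default, decoderbufs, has to be installed separately)
debezium.source.plugin.name=pgoutput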

# Topic prefix configuration
debezium.source.topic.prefix=inventory-server

# Don't emit an extra tombstone (null-value) event after each delete event
debezium.source.tombstones.on.delete=false

# Initial snapshot configuration
debezium.source.snapshot.mode=initial

# HTTP sink (webhook) configuration
debezium.sink.type=http
debezium.sink.http.url=YOUR_WEBHOOK.SITE_URL_HERE
debezium.sink.http.timeout.ms=10000

# JSON formatter
debezium.format.value=json
debezium.format.key=json

Replace YOUR_WEBHOOK.SITE_URL_HERE with the URL you copied from webhook.site.
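
Forgetting to replace the placeholder is an easy mistake, so it can be worth checking the file before starting the server. The sketch below is a simplified parser for key=value lines, not a full Java properties implementation; the REQUIRED_KEYS tuple is my own selection for this tutorial rather than an official list, and the function names are hypothetical.

```python
REQUIRED_KEYS = (
    "debezium.source.connector.class",
    "debezium.source.topic.prefix",
    "debezium.sink.type",
    "debezium.sink.http.url",
)


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        properties[key.strip()] = value.strip()
    return properties


def missing_or_placeholder(properties: dict[str, str]) -> list[str]:
    """List required keys that are absent, empty, or still hold a *_HERE placeholder."""
    return [
        key
        for key in REQUIRED_KEYS
        if not properties.get(key) or properties[key].endswith("_HERE")
    ]
```

To use it, read config/application.properties into a string, pass it through parse_properties, and print whatever missing_or_placeholder returns.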

Step 6: Start Debezium Server


Now that everything is configured, start Debezium Server:


# Make the run script executable
chmod +x run.sh

# Start Debezium Server
./run.sh

Prepare for a wall of Java logs. You should see output indicating that Debezium Server is starting, with messages about Quarkus, the connector, and eventually reaching a "started" state.

Common startup messages include:

  • Quarkus initialization
  • PostgreSQL connector configuration
  • Connection to the database
  • Snapshot process (if this is the first run)
  • HTTP sink initialization

Step 7: Test the setup with database changes


Open a new terminal (keep Debezium Server running in the first one) and execute some changes to the customers table:


# Insert some test data
psql -d inventory -U postgres <<'SQL'
INSERT INTO customers (name, email) VALUES
('Alice Johnson', 'alice@example.com'),
('Bob Smith', 'bob@example.com');
SQL

# Update a record
psql -d inventory -U postgres -c "UPDATE customers SET email = 'alice.new@example.com' WHERE name = 'Alice Johnson';"

# Delete a record
psql -d inventory -U postgres -c "DELETE FROM customers WHERE name = 'Bob Smith';"

Step 8: Observe the results


Switch back to your webhook.site browser tab. You should see several POST requests that correspond to the database operations you just performed:

  1. Two insert events (one for each new customer)
  2. An update event (when you changed Alice's email)
  3. A delete event (when you removed Bob's record)

Each event contains a JSON payload with details about the operation and the data. For example, an insert event might look like this:


{
  "schema": { /* schema information */ },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice Johnson",
      "email": "alice@example.com",
      "created_at": "2023-05-06T10:15:30.123456Z"
    },
    "source": {
      /* metadata about the event source */
      "db": "inventory",
      "schema": "public",
      "table": "customers",
      "ts_ms": 1683367230123
    },
    "op": "c",
    "ts_ms": 1683367230456
  }
}

The op field indicates the operation type:

  • c for create (insert)
  • u for update
  • d for delete
  • r for read (during initial snapshot)

For updates, both before and after fields are populated, showing the previous and new values. For deletes, after is null and before holds the removed row.
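
A consumer usually starts by turning the op code into something readable. Here's a small Python sketch of that idea; the describe_event name and the wording of the labels are my own, and it assumes the event may arrive either with or without the outer schema/payload wrapper.

```python
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_event(event: dict) -> str:
    """Summarize a change event as '<operation> on id=<primary key>'."""
    # Tolerate events with or without the schema/payload wrapper.
    payload = event.get("payload", event)
    operation = OPERATIONS.get(payload["op"], f"unknown ({payload['op']})")
    # Deletes have no 'after' state, so fall back to 'before'.
    row = payload.get("after") or payload.get("before") or {}
    return f"{operation} on id={row.get('id')}"


insert_event = {"payload": {"op": "c", "before": None, "after": {"id": 1, "name": "Alice Johnson"}}}
print(describe_event(insert_event))
```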

How it works


Let's understand what's happening under the hood:


  1. PostgreSQL logical replication: Setting wal_level to logical makes PostgreSQL record enough detail in the WAL for external processes to decode row-level changes, and the replication slot keeps track of how far a reader has consumed.


  2. Debezium Server: Acts as a standalone change data capture service that:
    • Connects to PostgreSQL using the configured credentials
    • Reads the WAL stream to detect changes
    • Converts database changes to structured JSON events
    • Forwards these events to the configured sink (webhook in our case)

  3. Webhook endpoint: Receives HTTP POST requests containing the change events as JSON payloads.
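
To see why this stream is enough to reconstruct the table on the receiving side, here's a minimal Python sketch that replays event payloads into an in-memory copy keyed by id. The apply_event function is hypothetical and assumes payloads shaped like the example above. Because inserts and updates are applied as upserts, receiving the same event twice leaves the copy unchanged.

```python
def apply_event(replica: dict, payload: dict) -> None:
    """Apply one change event payload to an in-memory copy of the table."""
    op = payload["op"]
    if op in ("c", "r", "u"):
        # Inserts, snapshot reads, and updates all carry the new row in 'after'.
        row = payload["after"]
        replica[row["id"]] = row
    elif op == "d":
        # Deletes carry the removed row in 'before'.
        replica.pop(payload["before"]["id"], None)


events = [
    {"op": "c", "before": None, "after": {"id": 1, "email": "alice@example.com"}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "bob@example.com"}},
    {"op": "u", "before": {"id": 1, "email": "alice@example.com"},
     "after": {"id": 1, "email": "alice.new@example.com"}},
    {"op": "d", "before": {"id": 2, "email": "bob@example.com"}, "after": None},
]

replica = {}
for payload in events:
    apply_event(replica, payload)
print(replica)
```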

Next steps and variations


Now that you have a working Debezium setup, here are some ways to expand and customize it:

Monitor multiple tables


To track changes from additional tables, adjust the debezium.source.table.include.list property in application.properties:


debezium.source.table.include.list=public.customers,public.orders,public.products
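
With several tables flowing into one endpoint, the receiver needs to tell events apart. One way, sketched below in Python, is to dispatch on the source block. This assumes the source metadata carries separate schema and table fields, as the PostgreSQL connector emits them; route_event and the handlers mapping are hypothetical names.

```python
def route_event(payload: dict, handlers: dict) -> bool:
    """Call the handler registered for the event's schema.table; return False if none."""
    source = payload["source"]
    key = f"{source['schema']}.{source['table']}"
    handler = handlers.get(key)
    if handler is None:
        return False
    handler(payload)
    return True


customer_events = []
handlers = {"public.customers": customer_events.append}

event = {"op": "c", "source": {"schema": "public", "table": "customers"}, "after": {"id": 1}}
route_event(event, handlers)
```

Returning False for unknown tables keeps the decision about unexpected events (log, drop, or fail) with the caller.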

Transform events before sending


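Debezium supports Single Message Transforms (SMTs) to modify events before they're sent. For example, to rename a field in the row data, first flatten the event with ExtractNewRecordState, because ReplaceField only sees top-level fields and email is nested inside before and after:


# Add this to application.properties
debezium.transforms=unwrap,rename
# Flatten the change event envelope down to the row's "after" state
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Rename the now top-level email field
debezium.transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
debezium.transforms.rename.renames=email:contact_email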
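
To make the intended result concrete, this plain-Python sketch mimics what renaming email to contact_email does to an event's row data. It is not Debezium code and doesn't reproduce how SMTs are implemented; rename_row_fields is a hypothetical helper that only looks at the after state.

```python
from typing import Optional


def rename_row_fields(payload: dict, renames: dict[str, str]) -> Optional[dict]:
    """Return the event's 'after' row with fields renamed, or None if there is no row."""
    row = payload.get("after")
    if row is None:
        return None  # for example a delete, which has no 'after' state
    return {renames.get(name, name): value for name, value in row.items()}


payload = {"op": "c", "before": None, "after": {"id": 1, "email": "alice@example.com"}}
print(rename_row_fields(payload, {"email": "contact_email"}))
```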

Conclusion


You've successfully set up Debezium Server to capture and stream PostgreSQL changes to a webhook endpoint. This foundation can be extended to build robust event-driven architectures, real-time data pipelines, and more.
