• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Connecting RDBs and Search Engines — Chapter 4 Part 1

Lomanu4 Оффлайн

Lomanu4

Команда форума
Администратор
Регистрация
1 Мар 2015
Сообщения
1,481
Баллы
155
Chapter 4 (Part 1): Outputting Kafka CDC Data to Console with Flink


In this chapter, we will process the CDC data delivered to Kafka using Flink SQL and print the output to the console. Before persisting to OpenSearch, we visually verify that Flink is correctly consuming and processing the data from Kafka.

1. Prerequisites


Ensure the following components are already running:

  • PostgreSQL
  • Apache Kafka
  • Kafka Connect
  • ZooKeeper
  • Flink

Refer to

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

for details on setting up the Debezium → Kafka pipeline.

2. Architecture Overview


graph TD
subgraph source
PG[PostgreSQL]
end
subgraph Change Data Capture
DBZ[Debezium Connect]
end
subgraph stream platform
TOPIC[Kafka Topic]
end
subgraph stream processing
FLINK[Flink SQL Kafka Source]
PRINT[Flink Print Sink Console]
end

PG --> DBZ --> TOPIC --> FLINK --> PRINT

We verify that CDC events flow from Kafka to Flink and appear in the standard output in a format like +I[...].

3. Add Kafka Connector to Flink


Add the Kafka SQL connector JAR to Flink:

  • flink-sql-connector-kafka-3.3.0-1.19.jar
⚠ To avoid interference with Flink core libraries, place the JAR in flink/lib/ext/ and copy it into /opt/flink/lib/ at startup.
docker-compose.yaml Excerpt


flink-jobmanager:
image: flink:1.19
command: ["/bin/bash", "/jobmanager-entrypoint.sh"]
volumes:
- ./flink/sql:/opt/flink/sql
- ./flink/lib/ext:/opt/flink/lib/ext
- ./flink/jobmanager-entrypoint.sh:/jobmanager-entrypoint.sh
jobmanager-entrypoint.sh Example


#!/bin/bash
set -e
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh jobmanager
Apply a similar setup to TaskManager to copy the JAR.
4. Write Flink SQL Script


Save the following SQL in flink/sql/cdc_to_console.sql:


CREATE TABLE cdc_source (
id INT,
message STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.testtable',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE print_sink (
id INT,
message STRING
) WITH (
'connector' = 'print'
);

INSERT INTO print_sink
SELECT * FROM cdc_source;
Explanation of Flink SQL Tables

Kafka Source Table: cdc_source

PropertyDescription
connector = 'kafka'Reads data from a Kafka topic
format = 'debezium-json'Handles JSON messages in Debezium format
scan.startup.mode = 'earliest-offset'Reads from the earliest offset
PRIMARY KEY (...) NOT ENFORCEDDefines a primary key without enforcement
Console Sink Table: print_sink

  • Uses Flink's internal print connector to write output to stdout.
5. Run the Flink SQL Job


docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_console.sql
Verify Running Jobs


docker compose exec flink-jobmanager bash
flink list

Expected output:


------------------ Running/Restarting Jobs -------------------
<job-id> : insert-into_default_catalog.default_database.print_sink (RUNNING)
6. Check the Output


docker compose logs flink-taskmanager

Expected output:


flink-taskmanager-1 | +I[1, CDC test row]
Why TaskManager?

  • The print sink outputs to the logs of the TaskManager running the job.
  • Use docker compose logs flink-taskmanager to view the output.
7. Troubleshooting

SQL Client Doesn’t Start

  • Check if JobManager is running:

docker compose logs flink-jobmanager
No CDC Output Appears

  • Is the Kafka topic name correct?
  • Is there data in the Kafka topic?
  • Is scan.startup.mode set to earliest-offset?

Check topic content:


kafka-console-consumer --bootstrap-server localhost:9092 \
--topic dbserver1.public.testtable \
--from-beginning
Bonus: Observing Parallelism


If you scale Flink to use multiple TaskManagers, you’ll see output distributed across their logs.

This allows you to observe parallel execution, slot allocation, and subtask distribution.

In this chapter, we confirmed the flow from Kafka → Flink → console output. Next, we will write the results to OpenSearch for persistence.

(Coming soon: Chapter 4 Part 2 — Integrating Kafka CDC Data with OpenSearch Using Flink)

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу