• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Connecting RDBs and Search Engines — Chapter 4 Part 2

Lomanu4 Оффлайн

Lomanu4

Команда форума
Администратор
Регистрация
1 Мар 2015
Сообщения
1,481
Баллы
155
Chapter 4 (Part 2): Integrating Kafka CDC Data with OpenSearch Using Flink


In this chapter, we use Flink SQL to process CDC data in Debezium JSON format from Kafka and write it to OpenSearch.

1. Architecture Overview


graph TD
subgraph Source
PG[PostgreSQL]
end
subgraph CDC
DBZ[Debezium Connect]
end
subgraph Stream
TOPIC[Kafka Topic]
end
subgraph Flink
FLINK[Flink SQL Kafka Source]
OS_SINK[Flink OpenSearch Sink]
end
subgraph Search
OS[OpenSearch]
end

PG --> DBZ --> TOPIC --> FLINK --> OS_SINK --> OS

Flink consumes CDC events from Kafka, transforms them, and upserts the data into OpenSearch.

2. Prerequisites


Ensure the following OSS components are running (e.g., via Docker Compose):

  • PostgreSQL
  • Apache Kafka
  • Kafka Connect (Debezium)
  • ZooKeeper
  • Flink (1.19)
  • OpenSearch (2.13)
? See docker-compose.yaml for details.
3. Flink JAR Setup


Place the following JARs in flink/lib/ext, and copy them to flink/lib/ at container startup.

Required Connector JARs

  • flink-sql-connector-kafka-3.3.0-1.19.jar
  • flink-sql-connector-opensearch2-2.0.0-1.19.jar
  • flink-connector-opensearch2-2.0.0-1.19.jar
  • flink-connector-opensearch-base-2.0.0-1.19.jar
Additional OpenSearch Dependencies


Use the following to automatically fetch dependencies:


bash scripts/00-flink-setup.sh
cd flink
mvn dependency:copy-dependencies -DoutputDirectory=./lib/ext
rm lib/ext/log4j-*
Example Entry Point Script


#!/bin/bash
set -e
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh jobmanager

Apply the same logic for flink-taskmanager as well.

4. Create OpenSearch Index


Create opensearch/test-index-mapping.json:


{
"mappings": {
"properties": {
"doc_id": { "type": "keyword" },
"id": { "type": "integer" },
"message": { "type": "text" }
}
}
}

Run:


curl -X PUT "

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

" \
-H "Content-Type: application/json" \
-d @opensearch/test-index-mapping.json
✅ Defining mappings helps avoid automatic keyword field generation.
5. Create Flink SQL Script


Save the following as flink/sql/cdc_to_opensearch.sql:


CREATE TABLE cdc_source (
id INT,
message STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.testtable',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE opensearch_sink (
doc_id STRING,
id INT,
message STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'opensearch-2',
'hosts' = '

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

',
'allow-insecure' = 'true',
'index' = 'test-index',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.max-size' = '1mb'
);

INSERT INTO opensearch_sink
SELECT
MD5(CAST(id AS STRING)) AS doc_id,
id,
message
FROM cdc_source;
6. Run the Flink Job


docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_opensearch.sql
7. Verify OpenSearch Output


curl -X GET "

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

"

Sample output:


"hits" : [
{
"_index" : "test-index",
"_id" : "c4ca4238a0b923820dcc509a6f75849b",
"_source" : {
"doc_id" : "c4ca4238a0b923820dcc509a6f75849b",
"id" : 1,
"message" : "CDC test row"
}
}
]

In this chapter, we built and verified a pipeline using Flink SQL to upsert CDC data into OpenSearch.

(Coming soon: Chapter 5 — Implementing a CDC Join Pipeline with Flink SQL)

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу