Chapter 4 (Part 2): Integrating Kafka CDC Data with OpenSearch Using Flink
In this chapter, we use Flink SQL to process CDC data in Debezium JSON format from Kafka and write it to OpenSearch.
1. Architecture Overview
graph TD
subgraph Source
PG[PostgreSQL]
end
subgraph CDC
DBZ[Debezium Connect]
end
subgraph Stream
TOPIC[Kafka Topic]
end
subgraph Flink
FLINK[Flink SQL Kafka Source]
OS_SINK[Flink OpenSearch Sink]
end
subgraph Search
OS[OpenSearch]
end
PG --> DBZ --> TOPIC --> FLINK --> OS_SINK --> OS
Flink consumes CDC events from Kafka, transforms them, and upserts the data into OpenSearch.
2. Prerequisites
Ensure the following OSS components are running (e.g., via Docker Compose); a quick health check is sketched after the list:
- PostgreSQL
- Apache Kafka
- Kafka Connect (Debezium)
- ZooKeeper
- Flink (1.19)
- OpenSearch (2.13)
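Before moving on, it is worth confirming the stack is actually up. A minimal check, assuming typical Compose service names (kafka, opensearch) and default ports; adjust to your docker-compose.yaml:
# List service status
docker compose ps
# OpenSearch REST API should respond on its default port
curl -s http://localhost:9200 | head -n 5
# Kafka should answer; topics created by Debezium will show up here later
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list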
3. Flink JAR Setup (see docker-compose.yaml for details)
Place the following JARs in flink/lib/ext, and copy them to flink/lib/ at container startup.
Required Connector JARs
- flink-sql-connector-kafka-3.3.0-1.19.jar
- flink-sql-connector-opensearch2-2.0.0-1.19.jar
- flink-connector-opensearch2-2.0.0-1.19.jar
- flink-connector-opensearch-base-2.0.0-1.19.jar
Use the following to fetch the dependencies automatically (the trailing rm drops the Log4j JARs, which would clash with the ones Flink already ships):
bash scripts/00-flink-setup.sh
The script boils down to:
cd flink
mvn dependency:copy-dependencies -DoutputDirectory=./lib/ext
rm lib/ext/log4j-*
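After the fetch, the connector JARs listed above (plus their transitive dependencies) should be in flink/lib/ext; a quick sanity check:
# Expect the kafka and opensearch connector JARs from the list above
ls flink/lib/ext | grep -E 'kafka|opensearch'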
Example Entry Point Script
#!/bin/bash
set -e
# Make the connector JARs visible on Flink's classpath before startup
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
# Hand off to the stock Flink image entrypoint in the jobmanager role
exec /docker-entrypoint.sh jobmanager
Apply the same logic for flink-taskmanager as well.
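For completeness, the taskmanager entrypoint is identical except for the role passed to the stock entrypoint:
#!/bin/bash
set -e
# Same JAR copy step as the jobmanager
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh taskmanager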
4. Create OpenSearch Index
Create opensearch/test-index-mapping.json:
{
"mappings": {
"properties": {
"doc_id": { "type": "keyword" },
"id": { "type": "integer" },
"message": { "type": "text" }
}
}
}
Run the following; the URL assumes OpenSearch on its default local port (adjust the host if yours differs):
curl -X PUT "http://localhost:9200/test-index" \
  -H "Content-Type: application/json" \
  -d @opensearch/test-index-mapping.json
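To confirm the index and mapping were created as intended (same endpoint assumption):
curl -s "http://localhost:9200/test-index/_mapping?pretty"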
Defining the mapping up front avoids OpenSearch's automatic keyword field generation.
5. Create Flink SQL Script
Save the following as flink/sql/cdc_to_opensearch.sql:
CREATE TABLE cdc_source (
id INT,
message STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.testtable',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
);
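For orientation, the debezium-json format decodes the standard Debezium change-event envelope. Below is a trimmed example of an insert event for the row used later in this chapter; the source and ts_ms metadata are omitted, since Flink only reads before, after, and op:
{
  "before": null,
  "after": { "id": 1, "message": "CDC test row" },
  "op": "c"
}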
CREATE TABLE opensearch_sink (
  doc_id STRING,
  id INT,
  message STRING,
  -- The sink derives the OpenSearch document _id from the primary key;
  -- keying on doc_id matches the hashed _id in the sample output below.
  PRIMARY KEY (doc_id) NOT ENFORCED
) WITH (
  'connector' = 'opensearch-2',
  -- Endpoint is an assumption: the OpenSearch service name on the Compose network.
  'hosts' = 'http://opensearch:9200',
  'allow-insecure' = 'true',
  'index' = 'test-index',
  -- Only relevant for composite keys; joins key fields into one _id.
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '1mb'
);
INSERT INTO opensearch_sink
SELECT
MD5(CAST(id AS STRING)) AS doc_id,
id,
message
FROM cdc_source;
6. Run the Flink Job
docker compose exec flink-jobmanager bash
# inside the container (assumes flink/sql is mounted at /opt/flink/sql):
sql-client.sh -f /opt/flink/sql/cdc_to_opensearch.sql
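With the job running (or even before, since the source reads from the earliest offset), generate a change event on the PostgreSQL side. The database, user, and table names here are assumptions derived from the topic name dbserver1.public.testtable; adjust to your setup:
docker compose exec postgres psql -U postgres -d postgres \
  -c "INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');"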
7. Verify OpenSearch Output
curl -X GET "http://localhost:9200/test-index/_search?pretty"
Sample output:
"hits" : [
{
"_index" : "test-index",
"_id" : "c4ca4238a0b923820dcc509a6f75849b",
"_source" : {
"doc_id" : "c4ca4238a0b923820dcc509a6f75849b",
"id" : 1,
"message" : "CDC test row"
}
}
]
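Because the source is a changelog, deletes propagate as well: removing the row upstream should delete the document (same assumed names and endpoint as above):
docker compose exec postgres psql -U postgres -d postgres \
  -c "DELETE FROM testtable WHERE id = 1;"
# after the sink's next bulk flush, the search above should return no hits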
In this chapter, we built and verified a pipeline using Flink SQL to upsert CDC data into OpenSearch.
(Coming soon: Chapter 5 — Implementing a CDC Join Pipeline with Flink SQL)