• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Sync API Data to Your Data Lake with SeaTunnel+ DolphinScheduler

Lomanu4 Оффлайн

Lomanu4

Команда форума
Администратор
Регистрация
1 Мар 2015
Сообщения
1,481
Баллы
155
Previously, we evaluated using SeaTunnel for CDC (Change Data Capture) data lake ingestion: SeaTunnel-CDC Data Lake Ingestion Practice. Those scenarios were all based on direct database connections. However, in many business cases, we can't directly access databases to perform CDC. In these scenarios, we need to integrate via APIs and schedule data synchronization using Apache DolphinScheduler.

Here’s a real-world example:

  • Synchronizing inventory data from ERP (SAP) into a data lake for inventory analysis.

At the same time, our goal is to enable colleagues to replicate the process and independently complete future API-to-lake integrations—rather than having to write custom code for each use case.

Prerequisites

  • SeaTunnel 2.3.10

First, add the connector names in the ${SEATUNNEL_HOME}/config/plugin_config file, and then install the connectors by executing the command. After installation, the connectors should appear under ${SEATUNNEL_HOME}/connectors/.

In this example, we’ll use: connector-jdbc, connector-paimon.

To write to StarRocks, you can also use connector-starrocks, but for this particular case, connector-jdbc is more appropriate, so we'll use that.


# Configure connector names
--connectors-v2--
connector-jdbc
connector-starrocks
connector-paimon
--end--

# Install connectors
sh bin/install-plugin.sh 2.3.10
SeaTunnel Job


Let’s first ensure the SeaTunnel job runs successfully locally, before integrating it with Apache DolphinScheduler.

  • http to StarRocks

Example path: example/http2starrocks


env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
plugin_output = "stock"
url = "

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

"
method = "POST"
headers {
Authorization = "Basic XXX"
Content-Type = "application/json"
}
body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
format = "json"
content_field = "$.ET_RETURN.*"
schema {
fields {
MATNR = "string"
MAKTX = "string"
WERKS = "string"
NAME1 = "string"
LGORT = "string"
LGOBE = "string"
CHARG = "string"
MEINS = "string"
LABST = "double"
UMLME = "double"
INSME = "double"
EINME = "double"
SPEME = "double"
RETME = "double"
}
}
}
}

# This transform is mainly for field renaming and readability
transform {
Sql {
plugin_input = "stock"
plugin_output = "stock-tf-out"
query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}

# Write to StarRocks using JDBC with partition overwrite
sink {
jdbc {
plugin_input = "stock-tf-out"
url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "lab"
password = "XXX"
compatible_mode="starrocks"
query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
}
}
  • http to Paimon

Example path: example/http2paimon


env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
plugin_output = "stock"
url = "

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

"
method = "POST"
headers {
Authorization = "Basic XXX"
Content-Type = "application/json"
}
body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
format = "json"
content_field = "$.ET_RETURN.*"
schema {
fields {
MATNR = "string"
MAKTX = "string"
WERKS = "string"
NAME1 = "string"
LGORT = "string"
LGOBE = "string"
CHARG = "string"
MEINS = "string"
LABST = "double"
UMLME = "double"
INSME = "double"
EINME = "double"
SPEME = "double"
RETME = "double"
}
}
}
}

# This transform is mainly for field renaming and readability
transform {
Sql {
plugin_input = "stock"
plugin_output = "stock-tf-out"
query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}

# Sync to Paimon (currently Paimon does not support insert overwrite on partitions; this is for reference only)
sink {
Paimon {
warehouse = "s3a://test/"
database = "sap"
table = "ods_sap_stock"
paimon.hadoop.conf = {
fs.s3a.access-key=XXX
fs.s3a.secret-key=XXX
fs.s3a.endpoint="

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

"
fs.s3a.path.style.access=true
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
Integrating SeaTunnel with DolphinScheduler

  • Build a DolphinScheduler Worker image with SeaTunnel pre-installed

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
RUN mkdir /opt/seatunnel
RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10
COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/

Build and push the Docker image:


docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
  • Deploy a new DolphinScheduler Worker using this image. Modify docker-compose.yaml to add a new dolphinscheduler-worker-seatunnel node:

dolphinscheduler-worker-seatunnel:
image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
profiles: ["all"]
env_file: .env
healthcheck:
test: [ "CMD", "curl", "

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

" ]
interval: 30s
timeout: 5s
retries: 3
depends_on:
dolphinscheduler-zookeeper:
condition: service_healthy
volumes:
- ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
- ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
- ./dolphinscheduler-shared-local:/opt/soft
- ./dolphinscheduler-resource-local:/dolphinscheduler
networks:
dolphinscheduler:
ipv4_address: 172.15.0.18
  • Configure Worker Group and Environment in DolphinScheduler

  1. Worker Group Management:
    In the Security Center > Worker Group, create a group for the node IP used in the SeaTunnel Worker.


  2. Environment Configuration:
    In Environment Management, create a new environment for SeaTunnel execution and bind it to the Worker group created above.


  3. Create Workflow Definition:
    Fill in the SeaTunnel job configuration and define the workflow.


  4. Run the Task:
    During execution, select the correct SeaTunnel Worker Group and Environment to ensure the job runs in the integrated SeaTunnel environment.


Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу