[VELODB.IO]
DATANOMIX.PRO // БЛОГ // DATA ENGINEERING

Airflow + Doris: пайплайн оркестрациясы

MySQL Protocol арқылы Lakehouse пайплайндарын қалай оркестрациялау — арнайы провайдерсіз

Дайындаған:
Datanomix.pro
Оқу уақыты:
~10 мин
МАЗМҰНЫ:
01 / Job Scheduler бар болса, Airflow не үшін керек
02 / MySQL Protocol арқылы қосылу
03 / Үш DAG паттерні
04 / HTTP арқылы Stream Load
05 / Нені қашан қолдану
FAQ

1. Doris-те Job Scheduler бар болса, Airflow не үшін керек

2.1 нұсқасынан бастап Apache Doris-те кірістірілген Job Scheduler бар — SQL-тапсырмаларды тікелей қозғалтқыш ішінде жоспарлауға болады. Қарапайым сценарийлер үшін (тұрақты INSERT INTO SELECT, материализованы көріністерді жаңарту) бұл жеткілікті.

Бірақ enterprise-шынайылықта пайплайн — бұл бір SQL-сұраныс емес. Бұл тізбек:

  • API / S3 / Kafka-дан деректерді алу
  • Трансформациялау (dbt, Spark, Python)
  • Doris-ке жүктеу (Stream Load / INSERT)
  • Деректер сапасын тексеру (Great Expectations, Soda)
  • Витриналар мен материализованы көріністерді жаңарту
  • Қате болған жағдайда Slack / Telegram-ға хабарлау

Job Scheduler мұны біле алмайды. Airflow — біледі.

Ереже: Doris ішіндегі тапсырмалар үшін Job Scheduler, бір SQL-сұраныстан шығатын барлық нәрсе үшін Airflow қолданыңыз.

2. Қосылу: MySQL Protocol = Zero Custom Code

Басты жаңалық: Apache Doris үшін ресми Airflow Provider жоқ. Және ол қажет емес.

Doris/VeloDB MySQL Protocol-мен толық үйлесімді. Стандартты apache-airflow-providers-mysql дайын жұмыс істейді. Doris-ке кәдімгі MySQL сияқты қосыласыз.

// AIRFLOW CONNECTION SETUP
# pip install apache-airflow-providers-mysql # Airflow UI → Admin → Connections → Add Connection Id: doris_default Connection Type: MySQL Host: your-doris-fe-host Schema: your_database Login: root Port: 9030 # Doris MySQL port (не 3306!)
Порт 9030, 3306 емес. Doris FE MySQL Protocol-ды әдепкі бойынша 9030 портында тыңдайды.

3. Doris үшін үш DAG паттерні

Паттерн 1: MySqlOperator арқылы SQL-сұраныстар

Ең қарапайым нұсқа — SQL-ды тікелей орындау. Кестелер жасау, витриналарды жаңарту, INSERT INTO SELECT үшін қолайлы.

// DAG: SQL VIA MYSQL OPERATOR
from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from datetime import datetime with DAG("doris_refresh_marts", schedule="0 6 * * *", # каждый день в 6:00 start_date=datetime(2026, 1, 1), catchup=False): refresh_sales = MySqlOperator( task_id="refresh_sales_mart", mysql_conn_id="doris_default", sql=""" INSERT INTO sales_mart SELECT date, region, SUM(amount) FROM raw_transactions WHERE date = CURRENT_DATE() - 1 GROUP BY date, region; """ ) refresh_users = MySqlOperator( task_id="refresh_user_profiles", mysql_conn_id="doris_default", sql="REFRESH MATERIALIZED VIEW mv_user_profiles;" ) refresh_sales >> refresh_users

Паттерн 2: Толық ETL-пайплайн

Деректерді алдымен сыртқы дереккөзден алып, трансформациялап, жүктеу қажет болғанда. Классикалық Extract → Transform → Load.

// DAG: ETL PIPELINE (API → TRANSFORM → DORIS)
from airflow.decorators import dag, task from airflow.providers.mysql.operators.mysql import MySqlOperator @dag(schedule="@hourly", catchup=False) def api_to_doris(): @task() def extract(): # Забираем данные из REST API import httpx resp = httpx.get("https://api.example.com/events") return resp.json() @task() def transform(raw_data): # Чистим, маппим поля return [ (e["id"], e["type"], e["amount"]) for e in raw_data if e["status"] == "completed" ] @task() def load(rows): # Stream Load через HTTP API Doris import httpx, csv, io buf = io.StringIO() writer = csv.writer(buf) writer.writerows(rows) httpx.put( "http://doris-fe:8030/api/db/tbl/_stream_load", content=buf.getvalue(), headers={"format": "csv"}, auth=("root", "") ) data = extract() clean = transform(data) load(clean) api_to_doris()

Паттерн 3: Lakehouse оркестрациясы (Iceberg + Doris)

Doris Data Lake (Iceberg/Hudi) үстінде query engine ретінде жұмыс істегенде. Airflow үйлестіреді: Spark деректерді Iceberg-ке жазады, Doris каталог арқылы оқиды.

// DAG: LAKEHOUSE ORCHESTRATION
# Spark записывает → Iceberg хранит → Doris читает spark_write = SparkSubmitOperator( task_id="spark_to_iceberg", application="/jobs/etl_to_iceberg.py", conf={"spark.sql.catalog.lakehouse": "org.apache.iceberg.spark.SparkCatalog"} ) refresh_catalog = MySqlOperator( task_id="refresh_doris_catalog", mysql_conn_id="doris_default", sql="REFRESH CATALOG iceberg_catalog;" ) check_freshness = MySqlOperator( task_id="check_data_freshness", mysql_conn_id="doris_default", sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events WHERE dt = CURRENT_DATE();""" ) spark_write >> refresh_catalog >> check_freshness

4. HTTP арқылы Stream Load — үлкен көлемдерді жүктеу

Doris-ке файлдарды (CSV, JSON, Parquet) жүктеу үшін Stream Load API — FE-нодаға HTTP PUT-сұраныс қолданылады. Бұл bulk-жүктеудің ең жылдам тәсілі.

Airflow-дан SimpleHttpOperator немесе PythonOperator арқылы шақырылады.

// STREAM LOAD VIA HTTP
# curl-эквивалент: curl -u root: -H "format: csv" \ -H "column_separator: ," \ -T data.csv \ http://doris-fe:8030/api/mydb/mytable/_stream_load # Airflow PythonOperator: def stream_load_to_doris(**ctx): import httpx with open("/data/export.csv", "rb") as f: resp = httpx.put( "http://doris-fe:8030/api/mydb/mytable/_stream_load", content=f.read(), headers={ "format": "csv", "column_separator": ",", "max_filter_ratio": "0.1" }, auth=("root", ""), timeout=300 ) assert resp.json()["Status"] == "Success"

5. Нені қашан қолдану — шешім матрицасы

Тапсырма Құрал Неге
MV-ді кесте бойынша жаңарту Doris Job Scheduler Бір SQL, тәуелділіктер жоқ
Сыртқы API-дан ETL Airflow Python, қайта әрекеттер, алерттар
Spark → Iceberg → Doris Airflow Жүйелер арасындағы тәуелділіктер
Data Quality тексерулер Airflow + Great Expectations GE/Soda-мен интеграция
CDC синхрондау Flink CDC / Kafka Connect Real-time, batch емес

FAQ

Doris үшін ресми Airflow Provider бар ма?

Жоқ, және ол қажет емес. Doris MySQL Protocol-мен үйлесімді, сондықтан стандартты apache-airflow-providers-mysql дайын жұмыс істейді. 9030 портына қосылу (Doris FE).

Қосылу үшін қай портты пайдалану керек?

9030 — Doris FE MySQL Protocol порты. 8030 — Stream Load API HTTP-порты. 3306-мен (стандартты MySQL) шатастырмаңыз.

Airflow-да таза SQL орнына dbt қолдануға бола ма?

Иә. dbt dbt-doris адаптері арқылы Doris-ті қолдайды. Airflow dbt-ді BashOperator арқылы шақыра алады.

Пайплайндарды қалай бақылау керек?

Airflow DAG визуализациясы, тапсырма логтары, Slack/Email/Telegram алерттары бар Web UI ұсынады.

Airflow + VeloDB-ды сіздің деректеріңізде баптағыңыз келе ме?

./ПИЛОТ_СҰРАУ.sh
© 2026 DATANOMIX.PRO — VELODB-НЫҢ ОРТАЛЫҚ АЗИЯДА ЭКСКЛЮЗИВТІК СЕРІКТЕСІ
VeloDB — Data Engineering БАСТЫ