DATANOMIX.PRO // BLOG // DATA ENGINEERING

Airflow + Doris: pipeline orchestration

How to orchestrate Lakehouse pipelines over the MySQL Protocol without a custom provider

Prepared by:
Datanomix.pro
Reading time:
~10 min
CONTENTS:
01 / Why Airflow when there is a Job Scheduler
02 / Connecting over the MySQL Protocol
03 / Three DAG patterns
04 / Stream Load over HTTP
05 / What to use when
FAQ

1. Why Airflow when Doris has a Job Scheduler

Since version 2.1, Apache Doris ships a built-in Job Scheduler: SQL tasks can be scheduled directly inside the engine. For simple scenarios (a recurring INSERT INTO SELECT, refreshing materialized views) that is enough.
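For reference, a minimal scheduled job looks like this (syntax as of Doris 2.1; the job and table names are illustrative):

```sql
-- Runs inside Doris itself, no external scheduler involved
CREATE JOB refresh_sales_daily
ON SCHEDULE EVERY 1 DAY STARTS '2026-01-01 06:00:00'
DO
INSERT INTO sales_mart
SELECT date, region, SUM(amount)
FROM raw_transactions
WHERE date = CURRENT_DATE() - 1
GROUP BY date, region;
```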

In enterprise reality, though, a pipeline is not a single SQL statement. It is a chain:

  • Pulling data from an API / S3 / Kafka
  • Transformation (dbt, Spark, Python)
  • Loading into Doris (Stream Load / INSERT)
  • Data quality checks (Great Expectations, Soda)
  • Refreshing data marts and materialized views
  • Notifying Slack / Telegram on failure

The Job Scheduler cannot do this. Airflow can.

Rule of thumb: use the Job Scheduler for tasks that live entirely inside Doris, and Airflow for anything that goes beyond a single SQL statement.

2. Connecting: MySQL Protocol = Zero Custom Code

The key point: there is no official Airflow Provider for Apache Doris. And none is needed.

Doris/VeloDB is fully compatible with the MySQL Protocol. The standard apache-airflow-providers-mysql package works out of the box: you connect to Doris as if it were plain MySQL.

// AIRFLOW CONNECTION SETUP
# pip install apache-airflow-providers-mysql

# Airflow UI → Admin → Connections → Add
Connection Id:    doris_default
Connection Type:  MySQL
Host:             your-doris-fe-host
Schema:           your_database
Login:            root
Port:             9030    # Doris MySQL port (not 3306!)
Port 9030, not 3306. By default, Doris FE listens for the MySQL Protocol on port 9030.
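If you configure connections programmatically rather than through the UI, the same settings can be expressed as an Airflow connection URI. The helper below is a hypothetical sketch (not part of any provider) that just encodes the convention above:

```python
def doris_conn_uri(host: str, database: str,
                   user: str = "root", password: str = "",
                   port: int = 9030) -> str:
    """Build an Airflow-style MySQL connection URI for a Doris FE node.

    Doris speaks the MySQL protocol on port 9030 by default,
    so the scheme is plain ``mysql`` -- no custom driver needed.
    """
    cred = f"{user}:{password}" if password else user
    return f"mysql://{cred}@{host}:{port}/{database}"

# Export as AIRFLOW_CONN_DORIS_DEFAULT and Airflow picks it up
# without touching the UI at all.
uri = doris_conn_uri("your-doris-fe-host", "your_database")
```

Setting the `AIRFLOW_CONN_DORIS_DEFAULT` environment variable to this URI is equivalent to creating the `doris_default` connection by hand.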

3. Three DAG patterns for Doris

Pattern 1: SQL statements via MySqlOperator

The simplest option: execute SQL directly. Good for creating tables, refreshing data marts, and INSERT INTO SELECT.

// DAG: SQL VIA MYSQL OPERATOR
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime

with DAG(
    "doris_refresh_marts",
    schedule="0 6 * * *",  # every day at 6:00
    start_date=datetime(2026, 1, 1),
    catchup=False,
):
    refresh_sales = MySqlOperator(
        task_id="refresh_sales_mart",
        mysql_conn_id="doris_default",
        sql="""
            INSERT INTO sales_mart
            SELECT date, region, SUM(amount)
            FROM raw_transactions
            WHERE date = CURRENT_DATE() - 1
            GROUP BY date, region;
        """,
    )

    refresh_users = MySqlOperator(
        task_id="refresh_user_profiles",
        mysql_conn_id="doris_default",
        sql="REFRESH MATERIALIZED VIEW mv_user_profiles;",
    )

    refresh_sales >> refresh_users

Pattern 2: A full ETL pipeline

For when data must first be pulled from an external source, transformed, and then loaded. Classic Extract → Transform → Load.

// DAG: ETL PIPELINE (API → TRANSFORM → DORIS)
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def api_to_doris():
    @task()
    def extract():
        # Pull events from the REST API
        import httpx
        resp = httpx.get("https://api.example.com/events")
        return resp.json()

    @task()
    def transform(raw_data):
        # Clean up and map fields
        return [
            (e["id"], e["type"], e["amount"])
            for e in raw_data
            if e["status"] == "completed"
        ]

    @task()
    def load(rows):
        # Stream Load via the Doris HTTP API
        import httpx, csv, io
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerows(rows)
        httpx.put(
            "http://doris-fe:8030/api/db/tbl/_stream_load",
            content=buf.getvalue(),
            headers={"format": "csv"},
            auth=("root", ""),
        )

    data = extract()
    clean = transform(data)
    load(clean)

api_to_doris()
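A nice property of this pattern: as long as transform stays a pure function, it can be unit-tested with no Airflow scheduler and no Doris instance. A minimal sketch, with field names matching the example DAG:

```python
def transform(raw_data: list[dict]) -> list[tuple]:
    """Keep only completed events and project the columns we load."""
    return [
        (e["id"], e["type"], e["amount"])
        for e in raw_data
        if e["status"] == "completed"
    ]

# Plain assertion-style check -- runs in any test runner
events = [
    {"id": 1, "type": "buy", "amount": 10, "status": "completed"},
    {"id": 2, "type": "buy", "amount": 5, "status": "pending"},
]
assert transform(events) == [(1, "buy", 10)]
```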

Pattern 3: Lakehouse orchestration (Iceberg + Doris)

For when Doris works as the query engine on top of a Data Lake (Iceberg/Hudi). Airflow coordinates: Spark writes data to Iceberg, Doris reads it through a catalog.

// DAG: LAKEHOUSE ORCHESTRATION
# Spark writes → Iceberg stores → Doris reads
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

spark_write = SparkSubmitOperator(
    task_id="spark_to_iceberg",
    application="/jobs/etl_to_iceberg.py",
    conf={"spark.sql.catalog.lakehouse": "org.apache.iceberg.spark.SparkCatalog"},
)

refresh_catalog = MySqlOperator(
    task_id="refresh_doris_catalog",
    mysql_conn_id="doris_default",
    sql="REFRESH CATALOG iceberg_catalog;",
)

check_freshness = MySqlOperator(
    task_id="check_data_freshness",
    mysql_conn_id="doris_default",
    sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events
           WHERE dt = CURRENT_DATE();""",
)

spark_write >> refresh_catalog >> check_freshness
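One caveat: MySqlOperator executes the freshness query but discards its result, so an empty partition would still let the DAG pass. If you want the run to actually fail on stale data, a small Python task helps. The sketch below is a hypothetical variant (MySqlHook and its `get_first` are standard provider APIs; the validation logic is ours):

```python
def assert_fresh(count_row, min_rows: int = 1) -> int:
    """Fail loudly if today's partition has fewer rows than expected."""
    count = count_row[0] if count_row else 0
    if count < min_rows:
        raise ValueError(f"Data freshness check failed: only {count} rows")
    return count

def check_data_freshness():
    # Reuses the doris_default connection from section 2
    from airflow.providers.mysql.hooks.mysql import MySqlHook
    hook = MySqlHook(mysql_conn_id="doris_default")
    row = hook.get_first(
        "SELECT COUNT(*) FROM iceberg_catalog.db.events "
        "WHERE dt = CURRENT_DATE()"
    )
    return assert_fresh(row)
```

Wrap `check_data_freshness` in a PythonOperator (or an `@task`) in place of the last MySqlOperator.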

4. Stream Load over HTTP: loading large volumes

To load files (CSV, JSON, Parquet) into Doris, use the Stream Load API: an HTTP PUT request to an FE node. It is the fastest way to bulk-load data.

From Airflow, it is invoked via SimpleHttpOperator or PythonOperator.

// STREAM LOAD VIA HTTP
# curl equivalent:
curl -u root: -H "format: csv" \
  -H "column_separator: ," \
  -T data.csv \
  http://doris-fe:8030/api/mydb/mytable/_stream_load

# Airflow PythonOperator:
def stream_load_to_doris(**ctx):
    import httpx
    with open("/data/export.csv", "rb") as f:
        resp = httpx.put(
            "http://doris-fe:8030/api/mydb/mytable/_stream_load",
            content=f.read(),
            headers={
                "format": "csv",
                "column_separator": ",",
                "max_filter_ratio": "0.1",
            },
            auth=("root", ""),
            timeout=300,
        )
    assert resp.json()["Status"] == "Success"
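Note that Stream Load replies 200 OK even when the load fails; success lives in the JSON body. A defensive parse, sketched under the assumption of the standard response fields (`Status`, `Message`, `NumberLoadedRows`, `ErrorURL`):

```python
def check_stream_load_response(body: dict) -> int:
    """Raise with a useful message unless the load really succeeded."""
    status = body.get("Status")
    if status != "Success":
        # ErrorURL points at a BE-hosted page listing the rejected rows
        raise RuntimeError(
            f"Stream Load failed: status={status}, "
            f"message={body.get('Message')}, error_url={body.get('ErrorURL')}"
        )
    return body.get("NumberLoadedRows", 0)

# Example response shape (abridged)
ok = {"Status": "Success", "NumberLoadedRows": 1000}
assert check_stream_load_response(ok) == 1000
```

Calling this on `resp.json()` instead of a bare `assert` gives the task log an actionable failure message.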

5. What to use when: a decision matrix

Task                           Tool                           Why
Refresh an MV on a schedule    Doris Job Scheduler            Single SQL statement, no dependencies
ETL from an external API       Airflow                        Python, retries, alerts
Spark → Iceberg → Doris        Airflow                        Cross-system dependencies
Data quality checks            Airflow + Great Expectations   Integration with GE/Soda
CDC synchronization            Flink CDC / Kafka Connect      Real-time, not batch

FAQ

Is there an official Airflow Provider for Doris?

No, and none is needed. Doris is compatible with the MySQL Protocol, so the standard apache-airflow-providers-mysql works out of the box. Connect to port 9030 (Doris FE).

Which port should I use for the connection?

9030 is the Doris FE MySQL Protocol port. 8030 is the HTTP port for the Stream Load API. Do not confuse them with 3306 (standard MySQL).

Can I use dbt instead of raw SQL in Airflow?

Yes. dbt supports Doris through the dbt-doris adapter.

How should pipelines be monitored?

Airflow provides a Web UI with DAG visualization, task logs, and Slack/Email/Telegram alerts.
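Alerts are typically wired up through Airflow's `on_failure_callback`. A minimal sketch; the message format and webhook target are ours, while the callback context keys (`task_instance`, `exception`) follow Airflow's standard callback contract:

```python
def format_failure_message(dag_id: str, task_id: str, error: str) -> str:
    """Build a one-line alert for Slack/Telegram."""
    return f"[FAILED] DAG '{dag_id}', task '{task_id}': {error}"

def notify_on_failure(context):
    # Airflow passes the failing task instance and exception in context
    ti = context["task_instance"]
    msg = format_failure_message(ti.dag_id, ti.task_id,
                                 str(context.get("exception")))
    # Ship msg to your webhook of choice, e.g. httpx.post(slack_url, json=...)
    print(msg)

# Attach to every task in a DAG via:
# DAG(..., default_args={"on_failure_callback": notify_on_failure})
```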

Want to set up Airflow + VeloDB on your own data?

./REQUEST_PILOT.sh
© 2026 DATANOMIX.PRO — EXCLUSIVE VeloDB PARTNER IN CENTRAL ASIA