Airflow + Doris: pipeline orchestration
How to orchestrate Lakehouse pipelines over the MySQL protocol, without a custom provider
Datanomix.pro
~10 min
1. Why use Airflow when Doris has a Job Scheduler
Since version 2.1, Apache Doris ships with a built-in Job Scheduler, so SQL tasks can be scheduled directly inside the engine. For simple scenarios (a periodic INSERT INTO SELECT, refreshing materialized views) that is enough.
In enterprise reality, though, a pipeline is not a single SQL query. It is a chain:
- Pull data from an API / S3 / Kafka
- Transform it (dbt, Spark, Python)
- Load it into Doris (Stream Load / INSERT)
- Check data quality (Great Expectations, Soda)
- Refresh data marts and materialized views
- Notify Slack / Telegram on failure
Job Scheduler cannot do this. Airflow can.
Rule of thumb: use Job Scheduler for tasks that live entirely inside Doris, and Airflow for everything that goes beyond a single SQL query.
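The chain in this section is a dependency graph: each step can start only after the steps it relies on have finished. A minimal sketch of that idea using Python's standard graphlib module. The step names are hypothetical and only mirror the list; this is not Airflow code, just an illustration of the ordering an orchestrator resolves:

```python
from graphlib import TopologicalSorter

# Each key depends on the steps in its value set (names are illustrative).
pipeline = {
    "transform": {"extract"},
    "load_to_doris": {"transform"},
    "quality_check": {"load_to_doris"},
    "refresh_marts": {"quality_check"},
}

# Resolve an execution order that respects every dependency.
order = list(TopologicalSorter(pipeline).static_order())
print(order)
```

Failure alerts are not a step in this graph. In an orchestrator they are usually attached as callbacks that fire when any step fails.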
2. Connection: MySQL Protocol = Zero Custom Code
The key point: there is no official Airflow provider for Apache Doris, and none is needed.
Doris/VeloDB is compatible with the MySQL protocol, so the standard apache-airflow-providers-mysql package works out of the box. You connect to Doris as if it were a regular MySQL server.
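Besides the Airflow UI, a connection can also be supplied through an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. A minimal sketch that builds such a URI with the standard library. The helper name doris_conn_uri is hypothetical, and the host, database, and empty root password are placeholder assumptions:

```python
import os
from urllib.parse import quote


def doris_conn_uri(host, schema, login="root", password="", port=9030):
    """Build an Airflow connection URI that treats Doris as plain MySQL."""
    cred = quote(login, safe="")
    if password:
        # Percent-encode special characters so the URI stays parseable.
        cred += ":" + quote(password, safe="")
    return f"mysql://{cred}@{host}:{port}/{quote(schema, safe='')}"


uri = doris_conn_uri("your-doris-fe-host", "your_database")
# Airflow resolves conn_id "doris_default" from this variable.
os.environ["AIRFLOW_CONN_DORIS_DEFAULT"] = uri
print(uri)
```

This keeps credentials out of the metadata database, which can be convenient in containerized deployments. The variable has to be set in the environment of the scheduler and workers, not just in a one-off script.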

    # pip install apache-airflow-providers-mysql
    # Airflow UI → Admin → Connections → Add
    Connection Id: doris_default
    Connection Type: MySQL
    Host: your-doris-fe-host
    Schema: your_database
    Login: root
    Port: 9030  # Doris MySQL port (not 3306!)

3. Three DAG patterns for Doris
Pattern 1: SQL queries via MySqlOperator
The simplest option is to run SQL directly. It suits creating tables, refreshing data marts, and INSERT INTO SELECT.
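
    from datetime import datetime

    from airflow import DAG
    # Recent MySQL provider releases deprecate MySqlOperator in favor of
    # SQLExecuteQueryOperator (common.sql provider, conn_id="doris_default")
    from airflow.providers.mysql.operators.mysql import MySqlOperator

    with DAG(
        "doris_refresh_marts",
        schedule="0 6 * * *",  # every day at 06:00
        start_date=datetime(2026, 1, 1),
        catchup=False,
    ):
        # Rebuild yesterday's slice of the sales mart
        refresh_sales = MySqlOperator(
            task_id="refresh_sales_mart",
            mysql_conn_id="doris_default",
            # DATE_SUB avoids integer arithmetic on the date value
            sql="""
                INSERT INTO sales_mart
                SELECT date, region, SUM(amount)
                FROM raw_transactions
                WHERE date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
                GROUP BY date, region;
            """,
        )
        refresh_users = MySqlOperator(
            task_id="refresh_user_profiles",
            mysql_conn_id="doris_default",
            # AUTO refresh mode is an assumption; check the syntax for your Doris version
            sql="REFRESH MATERIALIZED VIEW mv_user_profiles AUTO;",
        )
        # The mart is rebuilt first, then the materialized view
        refresh_sales >> refresh_users

Pattern 2: Full ETL pipeline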
For cases where data must first be pulled from an external source, transformed, and only then loaded. The classic Extract → Transform → Load.
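The transform and serialization logic of this pattern is plain Python, so it can be tried without Airflow or Doris. A minimal sketch on made-up sample events. The field names match the ones used in this pattern's DAG, and to_csv is a hypothetical helper name. It writes "\n" line endings explicitly because csv.writer defaults to "\r\n", while Stream Load splits lines on "\n" unless told otherwise:

```python
import csv
import io


def transform(raw_data):
    """Keep completed events only and map them to (id, type, amount) rows."""
    return [
        (e["id"], e["type"], e["amount"])
        for e in raw_data
        if e["status"] == "completed"
    ]


def to_csv(rows):
    """Serialize rows as comma-separated lines terminated by a newline."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    return buf.getvalue()


# Made-up sample data for illustration only.
sample = [
    {"id": 1, "type": "order", "amount": 9.5, "status": "completed"},
    {"id": 2, "type": "order", "amount": 3.0, "status": "pending"},
    {"id": 3, "type": "refund", "amount": 1.25, "status": "completed"},
]
payload = to_csv(transform(sample))
print(payload, end="")
```

Keeping these two functions free of Airflow imports also makes them easy to unit-test before they are wrapped in tasks.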
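
    from datetime import datetime

    from airflow.decorators import dag, task

    @dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
    def api_to_doris():
        @task()
        def extract():
            # Pull data from the REST API
            import httpx
            resp = httpx.get("https://api.example.com/events")
            resp.raise_for_status()
            return resp.json()

        @task()
        def transform(raw_data):
            # Clean the data and map the fields
            return [
                (e["id"], e["type"], e["amount"])
                for e in raw_data
                if e["status"] == "completed"
            ]

        @task()
        def load(rows):
            # Stream Load through the Doris HTTP API
            import httpx, csv, io
            buf = io.StringIO()
            # Doris splits lines on "\n" by default; csv.writer emits "\r\n"
            csv.writer(buf, lineterminator="\n").writerows(rows)
            request = dict(
                content=buf.getvalue().encode(),
                headers={"format": "csv", "column_separator": ",",
                         "Expect": "100-continue"},
                auth=("root", ""),
            )
            resp = httpx.put("http://doris-fe:8030/api/db/tbl/_stream_load", **request)
            if resp.status_code == 307:
                # The FE usually redirects to a BE node; httpx does not follow
                # redirects by default, so repeat the PUT with credentials
                resp = httpx.put(resp.headers["Location"], **request)
            if resp.json()["Status"] != "Success":
                raise RuntimeError(f"Stream Load failed: {resp.text}")

        data = extract()
        clean = transform(data)
        load(clean)

    api_to_doris()

Pattern 3: Lakehouse orchestration (Iceberg + Doris)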
For cases where Doris acts as the query engine on top of a data lake (Iceberg/Hudi). Airflow coordinates the steps: Spark writes data to Iceberg, and Doris reads it through a catalog.
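
    # Spark writes → Iceberg stores → Doris reads
    # Define these tasks inside a DAG, as in Pattern 1
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.providers.mysql.operators.mysql import MySqlOperator

    spark_write = SparkSubmitOperator(
        task_id="spark_to_iceberg",
        application="/jobs/etl_to_iceberg.py",
        conf={"spark.sql.catalog.lakehouse":
              "org.apache.iceberg.spark.SparkCatalog"},
    )
    # Refresh cached catalog metadata so Doris sees the newly written data
    refresh_catalog = MySqlOperator(
        task_id="refresh_doris_catalog",
        mysql_conn_id="doris_default",
        sql="REFRESH CATALOG iceberg_catalog;",
    )
    # Runs the count only; it does not fail the task when the result is 0
    check_freshness = MySqlOperator(
        task_id="check_data_freshness",
        mysql_conn_id="doris_default",
        sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events
               WHERE dt = CURRENT_DATE();""",
    )
    spark_write >> refresh_catalog >> check_freshness

4. Stream Load over HTTP: loading large volumes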
To load files (CSV, JSON, Parquet) into Doris, use the Stream Load API: an HTTP PUT request to an FE node. It is typically the fastest way to bulk-load data.
From Airflow it is called through SimpleHttpOperator (renamed HttpOperator in recent HTTP provider releases) or PythonOperator.
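Stream Load answers with a JSON document that describes the outcome, so the caller should inspect it instead of trusting the HTTP status alone. A minimal sketch of such a check. The helper name check_stream_load is hypothetical, and the field names (Status, Message, NumberLoadedRows) and the treatment of "Publish Timeout" as a completed load are assumptions based on the Doris documentation that should be verified against your version. The sample dictionaries are made up:

```python
def check_stream_load(result: dict) -> int:
    """Return the number of loaded rows, or raise if the load did not succeed."""
    status = result.get("Status")
    # "Publish Timeout" is assumed to mean: written, but visible with a delay.
    if status not in ("Success", "Publish Timeout"):
        raise RuntimeError(
            f"Stream Load {status}: {result.get('Message', 'no message')}"
        )
    return int(result.get("NumberLoadedRows", 0))


# Made-up responses for illustration only.
ok = {"Status": "Success", "Message": "OK", "NumberLoadedRows": 1000}
failed = {"Status": "Fail", "Message": "too many filtered rows"}

print(check_stream_load(ok))
try:
    check_stream_load(failed)
except RuntimeError as exc:
    print(exc)
```

Raising an exception, rather than using assert, makes the Airflow task fail visibly even when Python runs with optimizations enabled.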
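
    # curl equivalent (--location-trusted follows the FE redirect to a BE node):
    curl --location-trusted -u root: \
      -H "Expect: 100-continue" \
      -H "format: csv" \
      -H "column_separator: ," \
      -T data.csv \
      http://doris-fe:8030/api/mydb/mytable/_stream_load

    # Airflow PythonOperator:
    def stream_load_to_doris(**ctx):
        import httpx

        with open("/data/export.csv", "rb") as f:
            request = dict(
                content=f.read(),
                headers={
                    "format": "csv",
                    "column_separator": ",",
                    "max_filter_ratio": "0.1",  # tolerate up to 10% rejected rows
                    "Expect": "100-continue",
                },
                auth=("root", ""),
                timeout=300,
            )
        url = "http://doris-fe:8030/api/mydb/mytable/_stream_load"
        resp = httpx.put(url, **request)
        if resp.status_code == 307:
            # The FE usually redirects to a BE node; repeat the PUT there
            resp = httpx.put(resp.headers["Location"], **request)
        # Stream Load can report a failure in the JSON body even with HTTP 200
        if resp.json()["Status"] != "Success":
            raise RuntimeError(f"Stream Load failed: {resp.text}")

5. What to use when: a decision matrix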
| Task | Tool | Why |
|---|---|---|
| Refresh an MV on a schedule | Doris Job Scheduler | A single SQL statement, no dependencies |
| ETL from an external API | Airflow | Python, retries, alerts |
| Spark → Iceberg → Doris | Airflow | Dependencies across systems |
| Data quality checks | Airflow + Great Expectations | Integration with GE/Soda |
| CDC synchronization | Flink CDC / Kafka Connect | Real-time, not batch |
FAQ
Is there an official Airflow provider for Doris?
No, and none is needed. Doris is compatible with the MySQL protocol, so the standard apache-airflow-providers-mysql works out of the box. Connect to port 9030 (Doris FE).
Which port should I use to connect?
9030 is the Doris FE MySQL protocol port. 8030 is the FE HTTP port used by the Stream Load API. Do not confuse either with 3306 (the MySQL default).
Can I use dbt in Airflow instead of raw SQL?
Yes. dbt supports Doris through the dbt-doris adapter.
How should pipelines be monitored?
Airflow provides a web UI with DAG visualization, task logs, and Slack/Email/Telegram alerts.