Airflow + Doris: пайплайн оркестрациясы
MySQL Protocol арқылы Lakehouse пайплайндарын қалай оркестрациялау — арнайы провайдерсіз
Datanomix.pro
~10 мин
1. Doris-те Job Scheduler бар болса, Airflow не үшін керек
2.1 нұсқасынан бастап Apache Doris-те кірістірілген Job Scheduler бар — SQL-тапсырмаларды тікелей қозғалтқыш ішінде жоспарлауға болады. Қарапайым сценарийлер үшін (тұрақты INSERT INTO SELECT, материализованы көріністерді жаңарту) бұл жеткілікті.
Бірақ enterprise-шынайылықта пайплайн — бұл бір SQL-сұраныс емес. Бұл тізбек:
- API / S3 / Kafka-дан деректерді алу
- Трансформациялау (dbt, Spark, Python)
- Doris-ке жүктеу (Stream Load / INSERT)
- Деректер сапасын тексеру (Great Expectations, Soda)
- Витриналар мен материализованы көріністерді жаңарту
- Қате болған жағдайда Slack / Telegram-ға хабарлау
Job Scheduler мұны біле алмайды. Airflow — біледі.
Ереже: Doris ішіндегі тапсырмалар үшін Job Scheduler, бір SQL-сұраныстан шығатын барлық нәрсе үшін Airflow қолданыңыз.
2. Қосылу: MySQL Protocol = Zero Custom Code
Басты жаңалық: Apache Doris үшін ресми Airflow Provider жоқ. Және ол қажет емес.
Doris/VeloDB MySQL Protocol-мен толық үйлесімді. Стандартты apache-airflow-providers-mysql дайын жұмыс істейді. Doris-ке кәдімгі MySQL сияқты қосыласыз.
# pip install apache-airflow-providers-mysql # Airflow UI → Admin → Connections → Add Connection Id: doris_default
Connection Type: MySQL
Host: your-doris-fe-host Schema: your_database Login: root Port: 9030 # Doris MySQL port (не 3306!) 3. Doris үшін үш DAG паттерні
Паттерн 1: MySqlOperator арқылы SQL-сұраныстар
Ең қарапайым нұсқа — SQL-ды тікелей орындау. Кестелер жасау, витриналарды жаңарту, INSERT INTO SELECT үшін қолайлы.
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime
with DAG("doris_refresh_marts",
schedule="0 6 * * *", # каждый день в 6:00
start_date=datetime(2026, 1, 1),
catchup=False):
refresh_sales = MySqlOperator(
task_id="refresh_sales_mart",
mysql_conn_id="doris_default",
sql="""
INSERT INTO sales_mart
SELECT date, region, SUM(amount)
FROM raw_transactions
WHERE date = CURRENT_DATE() - 1
GROUP BY date, region;
"""
)
refresh_users = MySqlOperator(
task_id="refresh_user_profiles",
mysql_conn_id="doris_default",
sql="REFRESH MATERIALIZED VIEW mv_user_profiles;"
)
refresh_sales >> refresh_users Паттерн 2: Толық ETL-пайплайн
Деректерді алдымен сыртқы дереккөзден алып, трансформациялап, жүктеу қажет болғанда. Классикалық Extract → Transform → Load.
from airflow.decorators import dag, task
from airflow.providers.mysql.operators.mysql import MySqlOperator
@dag(schedule="@hourly", catchup=False)
def api_to_doris():
@task()
def extract():
# Забираем данные из REST API import httpx
resp = httpx.get("https://api.example.com/events")
return resp.json()
@task()
def transform(raw_data):
# Чистим, маппим поля return [
(e["id"], e["type"], e["amount"])
for e in raw_data
if e["status"] == "completed"
]
@task()
def load(rows):
# Stream Load через HTTP API Doris import httpx, csv, io
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(rows)
httpx.put(
"http://doris-fe:8030/api/db/tbl/_stream_load",
content=buf.getvalue(),
headers={"format": "csv"},
auth=("root", "")
)
data = extract()
clean = transform(data)
load(clean)
api_to_doris() Паттерн 3: Lakehouse оркестрациясы (Iceberg + Doris)
Doris Data Lake (Iceberg/Hudi) үстінде query engine ретінде жұмыс істегенде. Airflow үйлестіреді: Spark деректерді Iceberg-ке жазады, Doris каталог арқылы оқиды.
# Spark записывает → Iceberg хранит → Doris читает
spark_write = SparkSubmitOperator(
task_id="spark_to_iceberg",
application="/jobs/etl_to_iceberg.py",
conf={"spark.sql.catalog.lakehouse":
"org.apache.iceberg.spark.SparkCatalog"}
)
refresh_catalog = MySqlOperator(
task_id="refresh_doris_catalog",
mysql_conn_id="doris_default",
sql="REFRESH CATALOG iceberg_catalog;"
)
check_freshness = MySqlOperator(
task_id="check_data_freshness",
mysql_conn_id="doris_default",
sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events
WHERE dt = CURRENT_DATE();"""
)
spark_write >> refresh_catalog >> check_freshness 4. HTTP арқылы Stream Load — үлкен көлемдерді жүктеу
Doris-ке файлдарды (CSV, JSON, Parquet) жүктеу үшін Stream Load API — FE-нодаға HTTP PUT-сұраныс қолданылады. Бұл bulk-жүктеудің ең жылдам тәсілі.
Airflow-дан SimpleHttpOperator немесе PythonOperator арқылы шақырылады.
# curl-эквивалент:
curl -u root: -H "format: csv" \
-H "column_separator: ," \
-T data.csv \
http://doris-fe:8030/api/mydb/mytable/_stream_load
# Airflow PythonOperator: def stream_load_to_doris(**ctx):
import httpx
with open("/data/export.csv", "rb") as f:
resp = httpx.put(
"http://doris-fe:8030/api/mydb/mytable/_stream_load",
content=f.read(),
headers={ "format": "csv",
"column_separator": ",",
"max_filter_ratio": "0.1" },
auth=("root", ""),
timeout=300
)
assert resp.json()["Status"] == "Success" 5. Нені қашан қолдану — шешім матрицасы
| Тапсырма | Құрал | Неге |
|---|---|---|
| MV-ді кесте бойынша жаңарту | Doris Job Scheduler | Бір SQL, тәуелділіктер жоқ |
| Сыртқы API-дан ETL | Airflow | Python, қайта әрекеттер, алерттар |
| Spark → Iceberg → Doris | Airflow | Жүйелер арасындағы тәуелділіктер |
| Data Quality тексерулер | Airflow + Great Expectations | GE/Soda-мен интеграция |
| CDC синхрондау | Flink CDC / Kafka Connect | Real-time, batch емес |
FAQ
Doris үшін ресми Airflow Provider бар ма?
Жоқ, және ол қажет емес. Doris MySQL Protocol-мен үйлесімді, сондықтан стандартты apache-airflow-providers-mysql дайын жұмыс істейді. 9030 портына қосылу (Doris FE).
Қосылу үшін қай портты пайдалану керек?
9030 — Doris FE MySQL Protocol порты. 8030 — Stream Load API HTTP-порты. 3306-мен (стандартты MySQL) шатастырмаңыз.
Airflow-да таза SQL орнына dbt қолдануға бола ма?
Иә. dbt dbt-doris адаптері арқылы Doris-ті қолдайды. Airflow dbt-ді BashOperator арқылы шақыра алады.
Пайплайндарды қалай бақылау керек?
Airflow DAG визуализациясы, тапсырма логтары, Slack/Email/Telegram алерттары бар Web UI ұсынады.
Airflow + VeloDB-ды сіздің деректеріңізде баптағыңыз келе ме?
./ПИЛОТ_СҰРАУ.shАрхитектура, қолдану сценарийлері, негізгі мүмкіндіктер. 5000+ компания production-да.
Data Warehouse vs Data Lake vs Lakehouse. Open table formats. Бес айқын емес артықшылық.
SuperSet, PowerBI, Tableau баяу ма? Sub-100ms, Auto Query Rewrite, тегін пилот.
Kafka -> Doris: standalone/distributed, SSL, DLQ, schema evolution және best practices.