Airflow + Apache Doris / VeloDB
Как оркестрировать пайплайны в Lakehouse через MySQL Protocol — без кастомного провайдера
Datanomix.pro
~10 мин
1. Зачем Airflow, если в Doris есть встроенный Job Scheduler
С версии 2.1 Apache Doris имеет встроенный Job Scheduler — можно планировать SQL-задачи по расписанию прямо внутри движка. Для простых сценариев (регулярный INSERT INTO SELECT, обновление материализованных представлений) этого достаточно.
Но в enterprise-реальности пайплайн — это не один SQL-запрос. Это цепочка:
- Забрать данные из API / S3 / Kafka
- Трансформировать (dbt, Spark, Python)
- Загрузить в Doris (Stream Load / INSERT)
- Проверить качество данных (Great Expectations, Soda)
- Обновить витрины и материализованные представления
- Уведомить команду в Slack / Telegram при ошибке
Job Scheduler этого не умеет. Airflow — умеет. Он создан для оркестрации сложных DAG-ов с зависимостями, ретраями, алертами и мониторингом.
Правило: используйте Job Scheduler для внутренних задач Doris (агрегации, materialized views). Используйте Airflow для всего, что выходит за пределы одного SQL-запроса.
2. Подключение: MySQL Protocol = Zero Custom Code
Главная новость: официального Airflow Provider для Apache Doris не существует. И он не нужен.
Doris/VeloDB полностью совместим с MySQL Protocol. Это значит, что стандартный apache-airflow-providers-mysql работает из коробки. Вы подключаетесь к Doris как к обычному MySQL.
# pip install apache-airflow-providers-mysql # Airflow UI → Admin → Connections → Add Connection Id: doris_default
Connection Type: MySQL
Host: your-doris-fe-host Schema: your_database Login: root Port: 9030 # Doris MySQL port (не 3306!) 3. Три паттерна DAG для Doris
Паттерн 1: SQL-запросы через MySqlOperator
Самый простой вариант — выполнять SQL напрямую. Подходит для создания таблиц, обновления витрин, INSERT INTO SELECT.
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime
with DAG("doris_refresh_marts",
schedule="0 6 * * *", # каждый день в 6:00
start_date=datetime(2026, 1, 1),
catchup=False):
refresh_sales = MySqlOperator(
task_id="refresh_sales_mart",
mysql_conn_id="doris_default",
sql="""
INSERT INTO sales_mart
SELECT date, region, SUM(amount)
FROM raw_transactions
WHERE date = CURRENT_DATE() - 1
GROUP BY date, region;
"""
)
refresh_users = MySqlOperator(
task_id="refresh_user_profiles",
mysql_conn_id="doris_default",
sql="REFRESH MATERIALIZED VIEW mv_user_profiles;"
)
refresh_sales >> refresh_users Паттерн 2: Полный ETL-пайплайн
Когда данные нужно сначала забрать из внешнего источника, трансформировать и загрузить. Классический Extract → Transform → Load.
from airflow.decorators import dag, task
from airflow.providers.mysql.operators.mysql import MySqlOperator
@dag(schedule="@hourly", catchup=False)
def api_to_doris():
@task()
def extract():
# Забираем данные из REST API import httpx
resp = httpx.get("https://api.example.com/events")
return resp.json()
@task()
def transform(raw_data):
# Чистим, маппим поля return [
(e["id"], e["type"], e["amount"])
for e in raw_data
if e["status"] == "completed"
]
@task()
def load(rows):
# Stream Load через HTTP API Doris import httpx, csv, io
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(rows)
httpx.put(
"http://doris-fe:8030/api/db/tbl/_stream_load",
content=buf.getvalue(),
headers={"format": "csv"},
auth=("root", "")
)
data = extract()
clean = transform(data)
load(clean)
api_to_doris() Паттерн 3: Оркестрация Lakehouse (Iceberg + Doris)
Когда Doris работает как query engine поверх Data Lake (Iceberg/Hudi). Airflow координирует: Spark пишет данные в Iceberg, Doris читает через каталог.
# Spark записывает → Iceberg хранит → Doris читает
spark_write = SparkSubmitOperator(
task_id="spark_to_iceberg",
application="/jobs/etl_to_iceberg.py",
conf={"spark.sql.catalog.lakehouse":
"org.apache.iceberg.spark.SparkCatalog"}
)
refresh_catalog = MySqlOperator(
task_id="refresh_doris_catalog",
mysql_conn_id="doris_default",
sql="REFRESH CATALOG iceberg_catalog;"
)
check_freshness = MySqlOperator(
task_id="check_data_freshness",
mysql_conn_id="doris_default",
sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events
WHERE dt = CURRENT_DATE();"""
)
spark_write >> refresh_catalog >> check_freshness 4. Stream Load через HTTP — загрузка больших объёмов
Для загрузки файлов (CSV, JSON, Parquet) в Doris используется Stream Load API — HTTP PUT-запрос на FE-ноду. Это самый быстрый способ bulk-загрузки.
Из Airflow вызывается через SimpleHttpOperator или PythonOperator с библиотекой httpx/requests.
# curl-эквивалент:
curl -u root: -H "format: csv" \
-H "column_separator: ," \
-T data.csv \
http://doris-fe:8030/api/mydb/mytable/_stream_load
# Airflow PythonOperator: def stream_load_to_doris(**ctx):
import httpx
with open("/data/export.csv", "rb") as f:
resp = httpx.put(
"http://doris-fe:8030/api/mydb/mytable/_stream_load",
content=f.read(),
headers={ "format": "csv",
"column_separator": ",",
"max_filter_ratio": "0.1" },
auth=("root", ""),
timeout=300
)
assert resp.json()["Status"] == "Success" 5. Когда что использовать — матрица решений
| Задача | Инструмент | Почему |
|---|---|---|
| Обновление MV по расписанию | Doris Job Scheduler | Один SQL, нет зависимостей |
| ETL из внешних API | Airflow | Python, ретраи, алерты |
| Spark → Iceberg → Doris | Airflow | Зависимости между системами |
| Data Quality проверки | Airflow + Great Expectations | Интеграция с GE/Soda |
| CDC синхронизация | Flink CDC / Kafka Connect | Real-time, не batch |
FAQ
Есть ли официальный Airflow Provider для Doris?
Нет, и он не нужен. Doris совместим с MySQL Protocol, поэтому стандартный apache-airflow-providers-mysql работает из коробки. Подключение на порт 9030 (Doris FE).
Какой порт использовать для подключения?
9030 — это MySQL Protocol порт Doris FE. 8030 — HTTP-порт для Stream Load API. Не путайте с 3306 (стандартный MySQL).
Можно ли использовать dbt вместо чистого SQL в Airflow?
Да. dbt поддерживает Doris через адаптер dbt-doris. Airflow может вызывать dbt через BashOperator или специализированный DbtCloudRunJobOperator.
Как мониторить пайплайны?
Airflow предоставляет Web UI с визуализацией DAG, логами задач, алертами в Slack/Email/Telegram. Для мониторинга самого Doris используйте Grafana + Prometheus.
Хотите настроить Airflow + VeloDB на ваших данных?
./ЗАПРОСИТЬ_ПИЛОТ.shАрхитектура, сценарии использования, ключевые возможности. 5000+ компаний в production.
Data Warehouse vs Data Lake vs Lakehouse. Open table formats. Пять неочевидных преимуществ.
SuperSet, PowerBI, Tableau тормозит? Sub-100ms, Auto Query Rewrite, бесплатный пилот.
Kafka -> Doris: standalone/distributed, SSL, DLQ, schema evolution и best practices.