[VELODB.IO]
DATANOMIX.PRO // БЛОГ // DATA ENGINEERING

Airflow + Apache Doris / VeloDB

Как оркестрировать пайплайны в Lakehouse через MySQL Protocol — без кастомного провайдера

Подготовлено:
Datanomix.pro
Время чтения:
~10 мин
СОДЕРЖАНИЕ:
01 / Зачем Airflow, если есть Job Scheduler
02 / Подключение через MySQL Protocol
03 / Три паттерна DAG
04 / Stream Load через HTTP
05 / Когда что использовать
FAQ

1. Зачем Airflow, если в Doris есть встроенный Job Scheduler

С версии 2.1 Apache Doris имеет встроенный Job Scheduler — можно планировать SQL-задачи по расписанию прямо внутри движка. Для простых сценариев (регулярный INSERT INTO SELECT, обновление материализованных представлений) этого достаточно.

Но в enterprise-реальности пайплайн — это не один SQL-запрос. Это цепочка:

  • Забрать данные из API / S3 / Kafka
  • Трансформировать (dbt, Spark, Python)
  • Загрузить в Doris (Stream Load / INSERT)
  • Проверить качество данных (Great Expectations, Soda)
  • Обновить витрины и материализованные представления
  • Уведомить команду в Slack / Telegram при ошибке

Job Scheduler этого не умеет. Airflow — умеет. Он создан для оркестрации сложных DAG-ов с зависимостями, ретраями, алертами и мониторингом.

Правило: используйте Job Scheduler для внутренних задач Doris (агрегации, materialized views). Используйте Airflow для всего, что выходит за пределы одного SQL-запроса.

2. Подключение: MySQL Protocol = Zero Custom Code

Главная новость: официального Airflow Provider для Apache Doris не существует. И он не нужен.

Doris/VeloDB полностью совместим с MySQL Protocol. Это значит, что стандартный apache-airflow-providers-mysql работает из коробки. Вы подключаетесь к Doris как к обычному MySQL.

// AIRFLOW CONNECTION SETUP
# pip install apache-airflow-providers-mysql # Airflow UI → Admin → Connections → Add Connection Id: doris_default Connection Type: MySQL Host: your-doris-fe-host Schema: your_database Login: root Port: 9030 # Doris MySQL port (не 3306!)
Порт 9030, не 3306. Doris FE слушает MySQL Protocol на порту 9030 по умолчанию.

3. Три паттерна DAG для Doris

Паттерн 1: SQL-запросы через MySqlOperator

Самый простой вариант — выполнять SQL напрямую. Подходит для создания таблиц, обновления витрин, INSERT INTO SELECT.

// DAG: SQL VIA MYSQL OPERATOR
from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from datetime import datetime with DAG("doris_refresh_marts", schedule="0 6 * * *", # каждый день в 6:00 start_date=datetime(2026, 1, 1), catchup=False): refresh_sales = MySqlOperator( task_id="refresh_sales_mart", mysql_conn_id="doris_default", sql=""" INSERT INTO sales_mart SELECT date, region, SUM(amount) FROM raw_transactions WHERE date = CURRENT_DATE() - 1 GROUP BY date, region; """ ) refresh_users = MySqlOperator( task_id="refresh_user_profiles", mysql_conn_id="doris_default", sql="REFRESH MATERIALIZED VIEW mv_user_profiles;" ) refresh_sales >> refresh_users

Паттерн 2: Полный ETL-пайплайн

Когда данные нужно сначала забрать из внешнего источника, трансформировать и загрузить. Классический Extract → Transform → Load.

// DAG: ETL PIPELINE (API → TRANSFORM → DORIS)
from airflow.decorators import dag, task from airflow.providers.mysql.operators.mysql import MySqlOperator @dag(schedule="@hourly", catchup=False) def api_to_doris(): @task() def extract(): # Забираем данные из REST API import httpx resp = httpx.get("https://api.example.com/events") return resp.json() @task() def transform(raw_data): # Чистим, маппим поля return [ (e["id"], e["type"], e["amount"]) for e in raw_data if e["status"] == "completed" ] @task() def load(rows): # Stream Load через HTTP API Doris import httpx, csv, io buf = io.StringIO() writer = csv.writer(buf) writer.writerows(rows) httpx.put( "http://doris-fe:8030/api/db/tbl/_stream_load", content=buf.getvalue(), headers={"format": "csv"}, auth=("root", "") ) data = extract() clean = transform(data) load(clean) api_to_doris()

Паттерн 3: Оркестрация Lakehouse (Iceberg + Doris)

Когда Doris работает как query engine поверх Data Lake (Iceberg/Hudi). Airflow координирует: Spark пишет данные в Iceberg, Doris читает через каталог.

// DAG: LAKEHOUSE ORCHESTRATION
# Spark записывает → Iceberg хранит → Doris читает spark_write = SparkSubmitOperator( task_id="spark_to_iceberg", application="/jobs/etl_to_iceberg.py", conf={"spark.sql.catalog.lakehouse": "org.apache.iceberg.spark.SparkCatalog"} ) refresh_catalog = MySqlOperator( task_id="refresh_doris_catalog", mysql_conn_id="doris_default", sql="REFRESH CATALOG iceberg_catalog;" ) check_freshness = MySqlOperator( task_id="check_data_freshness", mysql_conn_id="doris_default", sql="""SELECT COUNT(*) FROM iceberg_catalog.db.events WHERE dt = CURRENT_DATE();""" ) spark_write >> refresh_catalog >> check_freshness

4. Stream Load через HTTP — загрузка больших объёмов

Для загрузки файлов (CSV, JSON, Parquet) в Doris используется Stream Load API — HTTP PUT-запрос на FE-ноду. Это самый быстрый способ bulk-загрузки.

Из Airflow вызывается через SimpleHttpOperator или PythonOperator с библиотекой httpx/requests.

// STREAM LOAD VIA HTTP
# curl-эквивалент: curl -u root: -H "format: csv" \ -H "column_separator: ," \ -T data.csv \ http://doris-fe:8030/api/mydb/mytable/_stream_load # Airflow PythonOperator: def stream_load_to_doris(**ctx): import httpx with open("/data/export.csv", "rb") as f: resp = httpx.put( "http://doris-fe:8030/api/mydb/mytable/_stream_load", content=f.read(), headers={ "format": "csv", "column_separator": ",", "max_filter_ratio": "0.1" }, auth=("root", ""), timeout=300 ) assert resp.json()["Status"] == "Success"

5. Когда что использовать — матрица решений

Задача Инструмент Почему
Обновление MV по расписанию Doris Job Scheduler Один SQL, нет зависимостей
ETL из внешних API Airflow Python, ретраи, алерты
Spark → Iceberg → Doris Airflow Зависимости между системами
Data Quality проверки Airflow + Great Expectations Интеграция с GE/Soda
CDC синхронизация Flink CDC / Kafka Connect Real-time, не batch

FAQ

Есть ли официальный Airflow Provider для Doris?

Нет, и он не нужен. Doris совместим с MySQL Protocol, поэтому стандартный apache-airflow-providers-mysql работает из коробки. Подключение на порт 9030 (Doris FE).

Какой порт использовать для подключения?

9030 — это MySQL Protocol порт Doris FE. 8030 — HTTP-порт для Stream Load API. Не путайте с 3306 (стандартный MySQL).

Можно ли использовать dbt вместо чистого SQL в Airflow?

Да. dbt поддерживает Doris через адаптер dbt-doris. Airflow может вызывать dbt через BashOperator или специализированный DbtCloudRunJobOperator.

Как мониторить пайплайны?

Airflow предоставляет Web UI с визуализацией DAG, логами задач, алертами в Slack/Email/Telegram. Для мониторинга самого Doris используйте Grafana + Prometheus.

Хотите настроить Airflow + VeloDB на ваших данных?

./ЗАПРОСИТЬ_ПИЛОТ.sh
© 2026 DATANOMIX.PRO — ЭКСКЛЮЗИВНЫЙ ПАРТНЁР VELODB В ЦЕНТРАЛЬНОЙ АЗИИ
VeloDB — Data Engineering ГЛАВНАЯ