[VELODB.IO]
DATANOMIX.PRO // ИНТЕГРАЦИИ // REAL-TIME CDC

Flink CDC + Apache Doris: real-time пайплайн из OLTP в аналитику

Полная синхронизация баз, schema evolution, auto-table creation. Практический гайд.

Подготовлено:
Datanomix.pro
Время чтения:
~10 мин

Change Data Capture (CDC) — механизм захвата изменений из OLTP-баз (MySQL, PostgreSQL, Oracle) в реальном времени. Вместо batch ETL раз в час/день, данные попадают в аналитику за секунды.

Типичные сценарии, где CDC критичен:

  • Антифрод: транзакция должна попасть в risk-модель в течение секунд, не минут.
  • Операционная аналитика: дашборды отражают текущее состояние бизнеса, а не вчерашний снимок.
  • Консолидация данных: десятки OLTP-источников сливаются в единое хранилище без ручного ETL.
  • Compliance: аудиторский след изменений без потерь.

Apache Flink CDC + Apache Doris — стандартная связка для этой задачи. Flink захватывает binlog/WAL, трансформирует данные и загружает в Doris через нативный коннектор.

Типичный real-time CDC пайплайн состоит из четырёх слоёв:

Data CollectionFlink CDC читает binlog (MySQL), WAL (PostgreSQL) или LogMiner (Oracle). Поддерживается полная и инкрементальная синхронизация.
Data ProcessingApache Flink: трансформации, фильтрация, обогащение, multi-stream join. Exactly-once семантика через чекпоинты.
Data LoadingFlink Doris Connector: batch-накопление в Flink, затем Stream Load в Doris. Поддержка UPSERT, DELETE, partial update.
Data ServingApache Doris: SQL-аналитика, BI-дашборды, ad-hoc запросы. MySQL-протокол для любого BI-инструмента.

Ключевое преимущество: Flink CDC не требует отдельного Kafka-кластера для CDC. Flink читает binlog напрямую, что упрощает архитектуру и снижает latency.

Нативный коннектор Apache Doris для Flink (версия 26.0.0, поддержка Flink 1.15–2.2):

  • Stream Load: batch-запись с configurable batch size и interval.
  • UPSERT/DELETE: поддержка Unique Key таблиц Doris для корректного CDC.
  • Partial Column Update: обновление выбранных колонок без перезаписи всей строки.
  • Auto Table Creation: автоматическое создание таблиц в Doris при первом sync (Flink CDC 3.0+).
  • Whole Database Sync: синхронизация всей базы одним джобом, включая DDL-операции.
  • ADBC Protocol: прямое чтение данных из BE-нод, минуя FE-ботнек (Doris 2.1+).

Connector доступен через Maven Central и совместим с Java 8 (Flink 1.x) и Java 17 (Flink 2.x).

Одна из главных проблем CDC-пайплайнов — изменение схемы в источнике. Добавили колонку в MySQL — и ETL сломался.

Flink CDC 3.0+ решает это через Schema Evolution:

  • ADD COLUMN: новая колонка автоматически добавляется в таблицу Doris.
  • RENAME COLUMN: переименование пробрасывается downstream.
  • DROP COLUMN: обрабатывается с настраиваемой политикой (игнор или каскад).
  • ALTER TYPE: расширение типа (VARCHAR(50) → VARCHAR(200)) применяется автоматически.

Это критично для production: DBA меняют схему MySQL, и аналитическое хранилище адаптируется без ручного вмешательства.

Real-time ETL

Flink читает CDC из MySQL, трансформирует (JOIN с dimension tables, фильтрация, агрегация) и пишет в Doris. Классический паттерн для операционной аналитики.

Lookup Join

Стриминговые данные обогащаются справочниками из Doris: Flink делает асинхронный batched lookup в Doris, не блокируя основной поток. Подходит для real-time enrichment.

Whole Database Sync

Один Flink-джоб синхронизирует все таблицы MySQL-базы в Doris: initial snapshot + incremental CDC. Идеально для миграции legacy DWH.

Минимальный пример Flink SQL для синхронизации таблицы транзакций:

// FLINK SQL: CDC ИЗ MYSQL В DORIS -- Source: MySQL CDC CREATE TABLE source_transactions ( id BIGINT, amount DECIMAL(18,2), status VARCHAR(20), region VARCHAR(50), tx_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'mysql-host', 'port' = '3306', 'database-name' = 'payments', 'table-name' = 'transactions' ); -- Sink: Apache Doris CREATE TABLE doris_transactions ( id BIGINT, amount DECIMAL(18,2), status VARCHAR(20), region VARCHAR(50), tx_time TIMESTAMP(3) ) WITH ( 'connector' = 'doris', 'fenodes' = 'doris-fe:8030', 'table.identifier' = 'analytics.transactions', 'sink.enable-delete' = 'true', 'sink.properties.format' = 'json', 'sink.label-prefix' = 'flink_cdc_tx' ); INSERT INTO doris_transactions SELECT * FROM source_transactions;

Этот джоб читает binlog MySQL, автоматически обрабатывает INSERT, UPDATE и DELETE, и пишет результат в Doris Unique Key таблицу с exactly-once гарантией.

Рекомендации для production-эксплуатации Flink CDC + Doris:

  1. Checkpoint interval: 60–120 секунд. Слишком частые чекпоинты увеличивают нагрузку на источник.
  2. Batch size: 50K–200K строк или 10–50 MB. Doris Stream Load эффективнее на крупных batch-ах.
  3. Parallelism: равен числу таблиц-источников или числу BE-нод Doris (что меньше).
  4. Мониторинг: Flink Web UI + Doris FE Audit Log. Ключевые метрики: checkpoint duration, records lag, Stream Load latency.
  5. Backpressure: если Flink показывает backpressure на Doris sink — увеличивайте batch interval или BE-ресурсы.
  6. Exactly-once: обязательно для финансовых данных. Включается через Flink two-phase commit + Doris label mechanism.

Хотите построить real-time CDC пайплайн?

./ЗАПРОСИТЬ_АРХИТЕКТУРУ.sh
© 2026 DATANOMIX.PRO — ЭКСКЛЮЗИВНЫЙ ПАРТНЁР VELODB В ЦЕНТРАЛЬНОЙ АЗИИ
VeloDB — Real-Time Analytics /ГЛАВНАЯ