1. Зачем CDC в аналитическом хранилище
Change Data Capture (CDC) — механизм захвата изменений из OLTP-баз (MySQL, PostgreSQL, Oracle) в реальном времени. Вместо batch ETL раз в час/день, данные попадают в аналитику за секунды.
Типичные сценарии, где CDC критичен:
- Антифрод: транзакция должна попасть в risk-модель в течение секунд, не минут.
- Операционная аналитика: дашборды отражают текущее состояние бизнеса, а не вчерашний снимок.
- Консолидация данных: десятки OLTP-источников сливаются в единое хранилище без ручного ETL.
- Compliance: аудиторский след изменений без потерь.
Apache Flink CDC + Apache Doris — стандартная связка для этой задачи. Flink захватывает binlog/WAL, трансформирует данные и загружает в Doris через нативный коннектор.
2. Архитектура пайплайна
Типичный real-time CDC пайплайн состоит из четырёх слоёв:
| Data Collection | Flink CDC читает binlog (MySQL), WAL (PostgreSQL) или LogMiner (Oracle). Поддерживается полная и инкрементальная синхронизация. |
| Data Processing | Apache Flink: трансформации, фильтрация, обогащение, multi-stream join. Exactly-once семантика через чекпоинты. |
| Data Loading | Flink Doris Connector: batch-накопление в Flink, затем Stream Load в Doris. Поддержка UPSERT, DELETE, partial update. |
| Data Serving | Apache Doris: SQL-аналитика, BI-дашборды, ad-hoc запросы. MySQL-протокол для любого BI-инструмента. |
Ключевое преимущество: Flink CDC не требует отдельного Kafka-кластера для CDC. Flink читает binlog напрямую, что упрощает архитектуру и снижает latency.
3. Flink Doris Connector: ключевые возможности
Нативный коннектор Apache Doris для Flink (версия 26.0.0, поддержка Flink 1.15–2.2):
- Stream Load: batch-запись с configurable batch size и interval.
- UPSERT/DELETE: поддержка Unique Key таблиц Doris для корректного CDC.
- Partial Column Update: обновление выбранных колонок без перезаписи всей строки.
- Auto Table Creation: автоматическое создание таблиц в Doris при первом sync (Flink CDC 3.0+).
- Whole Database Sync: синхронизация всей базы одним джобом, включая DDL-операции.
- ADBC Protocol: прямое чтение данных из BE-нод, минуя FE-ботнек (Doris 2.1+).
Connector доступен через Maven Central и совместим с Java 8 (Flink 1.x) и Java 17 (Flink 2.x).
4. Schema Evolution: таблицы меняются — пайплайн работает
Одна из главных проблем CDC-пайплайнов — изменение схемы в источнике. Добавили колонку в MySQL — и ETL сломался.
Flink CDC 3.0+ решает это через Schema Evolution:
- ADD COLUMN: новая колонка автоматически добавляется в таблицу Doris.
- RENAME COLUMN: переименование пробрасывается downstream.
- DROP COLUMN: обрабатывается с настраиваемой политикой (игнор или каскад).
- ALTER TYPE: расширение типа (VARCHAR(50) → VARCHAR(200)) применяется автоматически.
Это критично для production: DBA меняют схему MySQL, и аналитическое хранилище адаптируется без ручного вмешательства.
5. Три паттерна использования
Real-time ETL
Flink читает CDC из MySQL, трансформирует (JOIN с dimension tables, фильтрация, агрегация) и пишет в Doris. Классический паттерн для операционной аналитики.
Lookup Join
Стриминговые данные обогащаются справочниками из Doris: Flink делает асинхронный batched lookup в Doris, не блокируя основной поток. Подходит для real-time enrichment.
Whole Database Sync
Один Flink-джоб синхронизирует все таблицы MySQL-базы в Doris: initial snapshot + incremental CDC. Идеально для миграции legacy DWH.
6. SQL-пример: MySQL to Doris
Минимальный пример Flink SQL для синхронизации таблицы транзакций:
// FLINK SQL: CDC ИЗ MYSQL В DORIS -- Source: MySQL CDC CREATE TABLE source_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'database-name' = 'payments',
'table-name' = 'transactions'
);
-- Sink: Apache Doris CREATE TABLE doris_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3)
) WITH (
'connector' = 'doris',
'fenodes' = 'doris-fe:8030',
'table.identifier' = 'analytics.transactions',
'sink.enable-delete' = 'true',
'sink.properties.format' = 'json',
'sink.label-prefix' = 'flink_cdc_tx'
);
INSERT INTO doris_transactions
SELECT * FROM source_transactions; Этот джоб читает binlog MySQL, автоматически обрабатывает INSERT, UPDATE и DELETE, и пишет результат в Doris Unique Key таблицу с exactly-once гарантией.
7. Мониторинг и Production Best Practices
Рекомендации для production-эксплуатации Flink CDC + Doris:
- Checkpoint interval: 60–120 секунд. Слишком частые чекпоинты увеличивают нагрузку на источник.
- Batch size: 50K–200K строк или 10–50 MB. Doris Stream Load эффективнее на крупных batch-ах.
- Parallelism: равен числу таблиц-источников или числу BE-нод Doris (что меньше).
- Мониторинг: Flink Web UI + Doris FE Audit Log. Ключевые метрики: checkpoint duration, records lag, Stream Load latency.
- Backpressure: если Flink показывает backpressure на Doris sink — увеличивайте batch interval или BE-ресурсы.
- Exactly-once: обязательно для финансовых данных. Включается через Flink two-phase commit + Doris label mechanism.