1. Аналитикалық қоймада CDC не үшін қажет
Change Data Capture (CDC) — OLTP-базалардан (MySQL, PostgreSQL, Oracle) өзгерістерді нақты уақытта ұстау механизмі. Сағат/күн сайынғы batch ETL орнына деректер секунд ішінде аналитикаға түседі.
CDC маңызды болатын типтік сценарийлер:
- Антифрод: транзакция risk-моделге минуттар емес, секунд ішінде түсуі керек.
- Операциялық аналитика: дашбордтар бизнестің ағымдағы күйін көрсетеді, кешегі snapshot-ты емес.
- Деректерді біріктіру: ондаған OLTP-дереккөз қолмен ETL-сіз бір қоймаға құйылады.
- Compliance: өзгерістердің аудиторлық ізі жоғалтусыз.
Apache Flink CDC + Apache Doris — бұл тапсырма үшін стандартты байланыс. Flink binlog/WAL оқиды, деректерді трансформациялайды және нативті коннектор арқылы Doris-ке жүктейді.
2. Пайплайн архитектурасы
Типтік real-time CDC пайплайн төрт деңгейден тұрады:
| Data Collection | Flink CDC binlog (MySQL), WAL (PostgreSQL) немесе LogMiner (Oracle) оқиды. Толық және инкрементальді синхронизация қолдау. |
| Data Processing | Apache Flink: трансформациялар, сүзгілеу, байыту, multi-stream join. Чекпоинттар арқылы exactly-once семантикасы. |
| Data Loading | Flink Doris Connector: Flink-те batch-жинау, содан кейін Doris-ке Stream Load. UPSERT, DELETE, partial update қолдау. |
| Data Serving | Apache Doris: SQL-аналитика, BI-дашбордтар, ad-hoc сұраныстар. Кез келген BI-құрал үшін MySQL-протоколы. |
Негізгі артықшылық: Flink CDC CDC үшін бөлек Kafka-кластер талап етпейді. Flink binlog-ты тікелей оқиды, бұл архитектураны жеңілдетеді және latency-ді азайтады.
3. Flink Doris Connector: негізгі мүмкіндіктер
Apache Doris-тің Flink-ке арналған нативті коннекторы (26.0.0 нұсқасы, Flink 1.15–2.2 қолдау):
- Stream Load: configurable batch size және interval-мен batch-жазу.
- UPSERT/DELETE: дұрыс CDC үшін Doris Unique Key кестелерін қолдау.
- Partial Column Update: бүкіл жолды қайта жазбай таңдаулы колонкаларды жаңарту.
- Auto Table Creation: бірінші sync кезінде Doris-те кестелерді автоматты құру (Flink CDC 3.0+).
- Whole Database Sync: бір джобпен бүкіл базаны синхронизациялау, DDL-операцияларды қоса.
- ADBC Protocol: FE-ботнекті айналып, BE-нодтардан тікелей деректер оқу (Doris 2.1+).
Коннектор Maven Central арқылы қолжетімді, Java 8 (Flink 1.x) және Java 17 (Flink 2.x) үйлесімді.
4. Schema Evolution: кестелер өзгереді — пайплайн жұмыс істейді
CDC-пайплайндардың басты мәселелерінің бірі — дереккөздегі схеманың өзгеруі. MySQL-ға колонка қостыңыз — ETL бұзылды.
Flink CDC 3.0+ мұны Schema Evolution арқылы шешеді:
- ADD COLUMN: жаңа колонка Doris кестесіне автоматты қосылады.
- RENAME COLUMN: атауды өзгерту downstream-ға өтеді.
- DROP COLUMN: баптауға байланысты саясатпен өңделеді (елемеу немесе каскад).
- ALTER TYPE: тип кеңейтілуі (VARCHAR(50) → VARCHAR(200)) автоматты қолданылады.
Production үшін маңызды: DBA MySQL схемасын өзгертеді, аналитикалық қойма қолмен араласусыз бейімделеді.
5. Үш қолдану паттерні
Real-time ETL
Flink MySQL-дан CDC оқиды, трансформациялайды (JOIN, сүзгілеу, агрегация) және Doris-ке жазады. Операциялық аналитика үшін классикалық паттерн.
Lookup Join
Стриминг деректер Doris-тегі анықтамалықтармен байытылады: Flink асинхронды batched lookup жасайды. Real-time enrichment үшін жарамды.
Whole Database Sync
Бір Flink-джоб MySQL-базаның барлық кестелерін Doris-ке синхронизациялайды: initial snapshot + incremental CDC. Legacy DWH-ды көшіру үшін тамаша.
6. SQL-мысал: MySQL to Doris
Транзакция кестесін синхронизациялаудың минималды Flink SQL мысалы:
// FLINK SQL: MYSQL-ДАН DORIS-КЕ CDC -- Source: MySQL CDC CREATE TABLE source_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'database-name' = 'payments',
'table-name' = 'transactions'
);
-- Sink: Apache Doris CREATE TABLE doris_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3)
) WITH (
'connector' = 'doris',
'fenodes' = 'doris-fe:8030',
'table.identifier' = 'analytics.transactions',
'sink.enable-delete' = 'true',
'sink.properties.format' = 'json',
'sink.label-prefix' = 'flink_cdc_tx'
);
INSERT INTO doris_transactions
SELECT * FROM source_transactions; Бұл джоб MySQL binlog оқиды, INSERT, UPDATE және DELETE автоматты өңдейді, нәтижені exactly-once кепілдігімен Doris Unique Key кестесіне жазады.
7. Мониторинг және Production ұсыныстары
Flink CDC + Doris production-эксплуатациясы үшін ұсыныстар:
- Checkpoint interval: 60–120 секунд. Тым жиі чекпоинттар дереккөзге жүктеме артады.
- Batch size: 50K–200K жол немесе 10–50 MB. Doris Stream Load ірі batch-тарда тиімдірек.
- Parallelism: дереккөз-кестелер немесе Doris BE-нодтар санына тең (қайсысы аз болса).
- Мониторинг: Flink Web UI + Doris FE Audit Log. Негізгі метрикалар: checkpoint duration, records lag, Stream Load latency.
- Backpressure: Flink Doris sink-те backpressure көрсетсе — batch interval немесе BE-ресурстарды арттырыңыз.
- Exactly-once: қаржы деректері үшін міндетті. Flink two-phase commit + Doris label mechanism арқылы қосылады.