1. Analitik omborida CDC nima uchun kerak
Change Data Capture (CDC) — OLTP-bazalardan (MySQL, PostgreSQL, Oracle) oʼzgarishlarni real vaqtda ushlash mexanizmi. Soat/kun sayingi batch ETL oʼrniga maʼlumotlar soniyalar ichida analitikaga tushadi.
CDC muhim boʼladigan tipik ssenariylar:
- Antifraud: tranzaksiya risk-modelga minutlar emas, soniyalar ichida tushishi kerak.
- Operatsion analitika: dashboardlar biznesning joriy holatini koʼrsatadi, kechagi snapshotni emas.
- Maʼlumotlarni birlashtirish: oʼnlab OLTP-manbalar qoʼlda ETL siz bitta omborga quyiladi.
- Compliance: oʼzgarishlarning audit izi yoʼqolishsiz.
Apache Flink CDC + Apache Doris — bu vazifa uchun standart bogʼlam. Flink binlog/WAL oʼqiydi, maʼlumotlarni transformatsiya qiladi va nativ konnector orqali Doris ga yuklaydi.
2. Pipeline arxitekturasi
Tipik real-time CDC pipeline toʼrt qatlamdan iborat:
| Data Collection | Flink CDC binlog (MySQL), WAL (PostgreSQL) yoki LogMiner (Oracle) oʼqiydi. Toʼliq va inkremental sinxronizatsiya qoʼllanadi. |
| Data Processing | Apache Flink: transformatsiyalar, filtrlash, boyitish, multi-stream join. Checkpointlar orqali exactly-once semantikasi. |
| Data Loading | Flink Doris Connector: Flink da batch-toʼplash, keyin Doris ga Stream Load. UPSERT, DELETE, partial update qoʼllanadi. |
| Data Serving | Apache Doris: SQL-analitika, BI-dashboardlar, ad-hoc soʼrovlar. Har qanday BI-asbob uchun MySQL-protokol. |
Asosiy afzallik: Flink CDC CDC uchun alohida Kafka-klaster talab qilmaydi. Flink binlog ni toʼgʼridan-toʼgʼri oʼqiydi, bu arxitekturani soddalashtiradi.
3. Flink Doris Connector: asosiy imkoniyatlar
Apache Doris ning Flink uchun nativ konnektori (26.0.0 versiyasi, Flink 1.15–2.2 qoʼllanadi):
- Stream Load: sozlanadigan batch size va interval bilan batch-yozish.
- UPSERT/DELETE: toʼgʼri CDC uchun Doris Unique Key jadvallarini qoʼllash.
- Partial Column Update: butun qatorni qayta yozmasdan tanlangan ustunlarni yangilash.
- Auto Table Creation: birinchi sync da Doris da jadvallarni avtomatik yaratish (Flink CDC 3.0+).
- Whole Database Sync: bitta job bilan butun bazani sinxronizatsiya, DDL-operatsiyalarni qoʼshgan holda.
- ADBC Protocol: FE-bottleneck ni chetlab oʼtib, BE-nodlardan toʼgʼridan-toʼgʼri maʼlumot oʼqish (Doris 2.1+).
Konnector Maven Central orqali mavjud, Java 8 (Flink 1.x) va Java 17 (Flink 2.x) bilan mos.
4. Schema Evolution: jadvallar oʼzgaradi — pipeline ishlaydi
CDC-pipelinelarning asosiy muammolaridan biri — manbadagi sxemaning oʼzgarishi. MySQL ga ustun qoʼshdingiz — ETL buzildi.
Flink CDC 3.0+ buni Schema Evolution orqali hal qiladi:
- ADD COLUMN: yangi ustun Doris jadvaliga avtomatik qoʼshiladi.
- RENAME COLUMN: nom oʼzgarishi downstream ga oʼtadi.
- DROP COLUMN: sozlanadigan siyosat bilan qayta ishlanadi (eʼtiborga olmaslik yoki kaskad).
- ALTER TYPE: tip kengayishi (VARCHAR(50) → VARCHAR(200)) avtomatik qoʼllaniladi.
Production uchun muhim: DBA MySQL sxemasini oʼzgartiradi, analitik ombor qoʼlda aralashusiz moslashadi.
5. Uchta foydalanish patterni
Real-time ETL
Flink MySQL dan CDC oʼqiydi, transformatsiya qiladi (JOIN, filtrlash, agregatsiya) va Doris ga yozadi. Operatsion analitika uchun klassik pattern.
Lookup Join
Streaming maʼlumotlar Doris dagi maʼlumotnomalar bilan boyitiladi: Flink asinxron batched lookup qiladi. Real-time enrichment uchun mos.
Whole Database Sync
Bitta Flink-job MySQL-bazaning barcha jadvallarini Doris ga sinxronizatsiya qiladi: initial snapshot + incremental CDC. Legacy DWH koʼchirish uchun ideal.
6. SQL-misol: MySQL to Doris
Tranzaksiya jadvalini sinxronizatsiya qilishning minimal Flink SQL misoli:
// FLINK SQL: MYSQL DAN DORIS GA CDC -- Source: MySQL CDC CREATE TABLE source_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'database-name' = 'payments',
'table-name' = 'transactions'
);
-- Sink: Apache Doris CREATE TABLE doris_transactions (
id BIGINT,
amount DECIMAL(18,2),
status VARCHAR(20),
region VARCHAR(50),
tx_time TIMESTAMP(3)
) WITH (
'connector' = 'doris',
'fenodes' = 'doris-fe:8030',
'table.identifier' = 'analytics.transactions',
'sink.enable-delete' = 'true',
'sink.properties.format' = 'json',
'sink.label-prefix' = 'flink_cdc_tx'
);
INSERT INTO doris_transactions
SELECT * FROM source_transactions; Bu job MySQL binlog oʼqiydi, INSERT, UPDATE va DELETE avtomatik qayta ishlaydi, natijani exactly-once kafolat bilan Doris Unique Key jadvaliga yozadi.
7. Monitoring va Production tavsiyalari
Flink CDC + Doris production-ekspluatatsiyasi uchun tavsiyalar:
- Checkpoint interval: 60–120 soniya. Haddan tashqari tez-tez checkpointlar manbaga yukni oshiradi.
- Batch size: 50K–200K qator yoki 10–50 MB. Doris Stream Load katta batchlarda samaraliroq.
- Parallelism: manba-jadvallar yoki Doris BE-nodlar soniga teng (qaysi biri kamroq boʼlsa).
- Monitoring: Flink Web UI + Doris FE Audit Log. Asosiy metrikalar: checkpoint duration, records lag, Stream Load latency.
- Backpressure: Flink Doris sink da backpressure koʼrsatsa — batch interval yoki BE-resurslarni oshiring.
- Exactly-once: moliyaviy maʼlumotlar uchun majburiy. Flink two-phase commit + Doris label mechanism orqali yoqiladi.