Kafka
Bu sayfa bir servis kontratı değil, Cınga mimarisindeki servisler arası event taşıma katmanının (Kafka) teknik referansıdır.
Amaç; hangi servis hangi topic’e yazar, hangi servis hangi topic’ten okur, payload içinde hangi alanlar taşınır ve hata akışı nasıl yönetilir sorularını tek yerden netleştirmektir.
Rol ve Sınır
Kafka katmanı:
- Servisler arası tetik ve durum eventlerini taşır.
- Büyük ham payload’ı taşımaz; ham/kısa geçmiş Redis + DB tarafında kalır.
- Device-level ordering sağlar (partition key =
device_id).
Kafka katmanı yapmaz:
- İş kuralı hesaplaması
- Kalibrasyon/sentez/window hesaplama
- Uzun süreli state saklama
Akış Özeti (Producer → Topic → Consumer)
- Ingest Servisi ->
cinga.ingest.raw-> Stream Servisi - Stream Servisi ->
cinga.stream.assembled-> Calibration Servisi - Calibration Servisi ->
cinga.calibration.ready-> Raw Writer Servisi - Raw Writer Servisi ->
cinga.raw.persisted-> Sentez Servisi - Sentez Servisi ->
cinga.synth.ready-> Window Servisi - Window Servisi ->
cinga.window.ready-> (opsiyonel) Score/Reporting katmanı - Heartbeat Servisi ->
cinga.heartbeat.*-> Observer Servisi
Hata akışları:
- Her servis kendi
cinga.*.failedtopic’ine hata eventi üretir. - Retry limiti aşılırsa
cinga.dlq.*topic’ine taşınır.
Producer / Consumer Matrisi
| Topic | Producer | Consumer | Amaç |
|---|---|---|---|
cinga.ingest.raw | Ingest Servisi | Stream Servisi | Cihaz paketi kabul edildi tetik olayı |
cinga.stream.assembled | Stream Servisi | Calibration Servisi | Stream birleştirme/finalize tamamlandı |
cinga.calibration.ready | Calibration Servisi | Raw Writer Servisi | Kalibrasyon tamamlandı |
cinga.raw.persisted | Raw Writer Servisi | Sentez Servisi | Ham segment DB yazımı tamamlandı |
cinga.synth.ready | Sentez Servisi | Window Servisi | Sentez hesapları tamamlandı |
cinga.window.ready | Window Servisi | Opsiyonel downstream | Window güncelleme/finalize tamamlandı |
cinga.ingest.failed | Ingest Servisi | Ops/Replay Worker | Ingest aşaması hata |
cinga.stream.failed | Stream Servisi | Ops/Replay Worker | Stream aşaması hata |
cinga.calibration.failed | Calibration Servisi | Ops/Replay Worker | Calibration aşaması hata |
cinga.raw.failed | Raw Writer Servisi | Ops/Replay Worker | Raw Writer aşaması hata |
cinga.synth.failed | Sentez Servisi | Ops/Replay Worker | Sentez aşaması hata |
cinga.window.failed | Window Servisi | Ops/Replay Worker | Window aşaması hata |
cinga.dlq.* | Retry/Worker katmanı | Ops/Replay Worker | Kalıcı hata izolasyonu |
Ordering, Partition, Consumer Group Kuralları
- Partition key daima
device_idolmalıdır. - Aynı
device_ideventleri aynı partition içinde sıralı kalmalıdır. - Consumer group’lar servis bazında ayrılmalıdır:
cg.cinga.streamcg.cinga.calibrationcg.cinga.raw-writercg.cinga.synthcg.cinga.windowcg.cinga.observer(read-only gözlem)
- Yatay ölçek aynı consumer group içinde instance artırılarak yapılır.
- Observer group’u ana iş akışını etkilemez; yalnızca ölçüm için tüketim yapar.
Event Envelope Standardı
Her event payload’ı en az şu alanları taşımalıdır:
{
"event": "<name>.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "synth-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"error": null
}
Notlar:
stream_idingest öncesi aşamadanullolabilir.- Observer için eklenen ölçüm alanları
observerobjesi altında gruplanmalıdır. - Süre alanlarında tek standart kullanılır:
observer.process_ms. write_msgibi stage-özel süre alanları örnek payload'larda kullanılmaz.- Event versiyonu payload sözleşmesidir (
*.v1).
Topic Veri Yapıları (Örnek Payloadlar)
1) cinga.ingest.raw -> ingest.received.v1
{
"event": "ingest.received.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "ingest-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z"
},
"data": {
"pack": "DETAIL",
"Stream_End": false,
"received_at": "2026-03-11T15:22:08Z",
"meta": {
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7",
"ip": "x.x.x.x",
"size": 1240
}
},
"error": null
}
2) cinga.stream.assembled -> stream.assembled.v1
{
"event": "stream.assembled.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:08Z"
},
"data": {
"is_partial": false,
"missing_variables": null,
"calc_version": 1
},
"error": null
}
3) cinga.calibration.ready -> calibration.ready.v1
{
"event": "calibration.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "calibration-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:09Z"
},
"data": {
"calibration_version": 12,
"warnings": [
"CT_CONFIG_MISS"
]
},
"error": null
}
4) cinga.raw.persisted -> raw.persisted.v1
{
"event": "raw.persisted.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "raw-writer-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"data": {
"written_segments": [
"voltage",
"current",
"power",
"energy"
]
},
"error": null
}
5) cinga.synth.ready -> synth.ready.v1
{
"event": "synth.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "synth-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"data": {
"rule_hash": "7b6f...",
"calc_version": 1,
"quality_flags": {
"skipped_rules": [
"THD_I_EQ"
]
}
},
"error": null
}
6) cinga.window.ready -> window.ready.v1
{
"event": "window.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "window-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"data": {
"calc_version": 1,
"window_types_updated": [
"1D_Last",
"1W_Last",
"1M_Last"
],
"finalized_windows": [
"1D"
]
},
"error": null
}
Failed Event Standardı
Başarısız eventlerde önerilen minimum alanlar:
{
"event": "<stage>.failed.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "<stage>-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"error": {
"failed_stage": "synthesis",
"error_code": "SYNTH_DB_WRITE_FAIL",
"error_message": "upsert failed",
"retryable": true,
"failed_at": "2026-03-11T15:22:12Z"
}
}
Hata Yönetimi ve DLQ
- Retryable hatalar exponential backoff ile tekrar denenir.
- Non-retryable hatalar doğrudan
cinga.*.failed+ DLQ yoluna gider. - Retry limiti aşıldığında event
cinga.dlq.<stage>topic’ine taşınır.
Önerilen DLQ payload ek alanları:
retry_countfirst_failed_atlast_failed_atconsumer_grouppartition,offset
Redis / DB ile İlişki (Hybrid Model)
Kafka eventler hafif tutulur. Büyük state bu katmanlarda taşınır:
- Redis
device_buffer:{device_id}raw_last,raw_buffer,synth_buffer,window_buffer,state_version
- DB
streams,energy_*_measurements,energy_synth_results,energy_windows
Bu modelde event yalnızca tetikleyicidir; hesap için gereken ağır veri Redis/DB’den okunur.
Uçtan Uca Akış Diyagramı
Servis Bazlı Kısa Kontrol Listesi
- Producer tarafı
- Event schema doğrulaması
- Zorunlu alan kontrolü (
event,device_id, zaman alanları) - Partition key:
device_id
- Consumer tarafı
- Idempotent işlem
- Başarısız event üretimi (
*.failed.v1) - Retry/DLQ politikası
- Gözlemlenebilirlik
- consumer lag
- process latency
- retry count
- DLQ rate
Bu sayfa güncel tutulduğunda, servis ekipleri arasında event sözleşmesi kaynaklı entegrasyon sorunları ciddi biçimde azalır.