Skip to main content

Kafka

Bu sayfa bir servis kontratı değil, Cınga mimarisindeki servisler arası event taşıma katmanının (Kafka) teknik referansıdır.

Amaç; hangi servis hangi topic’e yazar, hangi servis hangi topic’ten okur, payload içinde hangi alanlar taşınır ve hata akışı nasıl yönetilir sorularını tek yerden netleştirmektir.

Rol ve Sınır

Kafka katmanı:

  • Servisler arası tetik ve durum eventlerini taşır.
  • Büyük ham payload’ı taşımaz; ham/kısa geçmiş Redis + DB tarafında kalır.
  • Device-level ordering sağlar (partition key = device_id).

Kafka katmanı yapmaz:

  • İş kuralı hesaplaması
  • Kalibrasyon/sentez/window hesaplama
  • Uzun süreli state saklama

Akış Özeti (Producer → Topic → Consumer)

  1. Ingest Servisi -> cinga.ingest.raw -> Stream Servisi
  2. Stream Servisi -> cinga.stream.assembled -> Calibration Servisi
  3. Calibration Servisi -> cinga.calibration.ready -> Raw Writer Servisi
  4. Raw Writer Servisi -> cinga.raw.persisted -> Sentez Servisi
  5. Sentez Servisi -> cinga.synth.ready -> Window Servisi
  6. Window Servisi -> cinga.window.ready -> (opsiyonel) Score/Reporting katmanı
  7. Heartbeat Servisi -> cinga.heartbeat.* -> Observer Servisi

Hata akışları:

  • Her servis kendi cinga.*.failed topic’ine hata eventi üretir.
  • Retry limiti aşılırsa cinga.dlq.* topic’ine taşınır.

Producer / Consumer Matrisi

TopicProducerConsumerAmaç
cinga.ingest.rawIngest ServisiStream ServisiCihaz paketi kabul edildi tetik olayı
cinga.stream.assembledStream ServisiCalibration ServisiStream birleştirme/finalize tamamlandı
cinga.calibration.readyCalibration ServisiRaw Writer ServisiKalibrasyon tamamlandı
cinga.raw.persistedRaw Writer ServisiSentez ServisiHam segment DB yazımı tamamlandı
cinga.synth.readySentez ServisiWindow ServisiSentez hesapları tamamlandı
cinga.window.readyWindow ServisiOpsiyonel downstreamWindow güncelleme/finalize tamamlandı
cinga.ingest.failedIngest ServisiOps/Replay WorkerIngest aşaması hata
cinga.stream.failedStream ServisiOps/Replay WorkerStream aşaması hata
cinga.calibration.failedCalibration ServisiOps/Replay WorkerCalibration aşaması hata
cinga.raw.failedRaw Writer ServisiOps/Replay WorkerRaw Writer aşaması hata
cinga.synth.failedSentez ServisiOps/Replay WorkerSentez aşaması hata
cinga.window.failedWindow ServisiOps/Replay WorkerWindow aşaması hata
cinga.dlq.*Retry/Worker katmanıOps/Replay WorkerKalıcı hata izolasyonu

Ordering, Partition, Consumer Group Kuralları

  • Partition key daima device_id olmalıdır.
  • Aynı device_id eventleri aynı partition içinde sıralı kalmalıdır.
  • Consumer group’lar servis bazında ayrılmalıdır:
    • cg.cinga.stream
    • cg.cinga.calibration
    • cg.cinga.raw-writer
    • cg.cinga.synth
    • cg.cinga.window
    • cg.cinga.observer (read-only gözlem)
  • Yatay ölçek aynı consumer group içinde instance artırılarak yapılır.
  • Observer group’u ana iş akışını etkilemez; yalnızca ölçüm için tüketim yapar.

Event Envelope Standardı

Her event payload’ı en az şu alanları taşımalıdır:

{
"event": "<name>.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "synth-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"error": null
}

Notlar:

  • stream_id ingest öncesi aşamada null olabilir.
  • Observer için eklenen ölçüm alanları observer objesi altında gruplanmalıdır.
  • Süre alanlarında tek standart kullanılır: observer.process_ms.
  • write_ms gibi stage-özel süre alanları örnek payload'larda kullanılmaz.
  • Event versiyonu payload sözleşmesidir (*.v1).

Topic Veri Yapıları (Örnek Payloadlar)

1) cinga.ingest.raw -> ingest.received.v1

{
"event": "ingest.received.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "ingest-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z"
},
"data": {
"pack": "DETAIL",
"Stream_End": false,
"received_at": "2026-03-11T15:22:08Z",
"meta": {
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7",
"ip": "x.x.x.x",
"size": 1240
}
},
"error": null
}

2) cinga.stream.assembled -> stream.assembled.v1

{
"event": "stream.assembled.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:08Z"
},
"data": {
"is_partial": false,
"missing_variables": null,
"calc_version": 1
},
"error": null
}

3) cinga.calibration.ready -> calibration.ready.v1

{
"event": "calibration.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "calibration-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:09Z"
},
"data": {
"calibration_version": 12,
"warnings": [
"CT_CONFIG_MISS"
]
},
"error": null
}

4) cinga.raw.persisted -> raw.persisted.v1

{
"event": "raw.persisted.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "raw-writer-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"data": {
"written_segments": [
"voltage",
"current",
"power",
"energy"
]
},
"error": null
}

5) cinga.synth.ready -> synth.ready.v1

{
"event": "synth.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "synth-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z"
},
"data": {
"rule_hash": "7b6f...",
"calc_version": 1,
"quality_flags": {
"skipped_rules": [
"THD_I_EQ"
]
}
},
"error": null
}

6) cinga.window.ready -> window.ready.v1

{
"event": "window.ready.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "window-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"data": {
"calc_version": 1,
"window_types_updated": [
"1D_Last",
"1W_Last",
"1M_Last"
],
"finalized_windows": [
"1D"
]
},
"error": null
}

Failed Event Standardı

Başarısız eventlerde önerilen minimum alanlar:

{
"event": "<stage>.failed.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "<stage>-service",
"produced_at": "2026-03-11T15:22:10.121Z",
"process_ms": 18
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"error": {
"failed_stage": "synthesis",
"error_code": "SYNTH_DB_WRITE_FAIL",
"error_message": "upsert failed",
"retryable": true,
"failed_at": "2026-03-11T15:22:12Z"
}
}

Hata Yönetimi ve DLQ

  • Retryable hatalar exponential backoff ile tekrar denenir.
  • Non-retryable hatalar doğrudan cinga.*.failed + DLQ yoluna gider.
  • Retry limiti aşıldığında event cinga.dlq.<stage> topic’ine taşınır.

Önerilen DLQ payload ek alanları:

  • retry_count
  • first_failed_at
  • last_failed_at
  • consumer_group
  • partition, offset

Redis / DB ile İlişki (Hybrid Model)

Kafka eventler hafif tutulur. Büyük state bu katmanlarda taşınır:

  • Redis device_buffer:{device_id}
    • raw_last, raw_buffer, synth_buffer, window_buffer, state_version
  • DB
    • streams, energy_*_measurements, energy_synth_results, energy_windows

Bu modelde event yalnızca tetikleyicidir; hesap için gereken ağır veri Redis/DB’den okunur.

Uçtan Uca Akış Diyagramı

Servis Bazlı Kısa Kontrol Listesi

  • Producer tarafı
    • Event schema doğrulaması
    • Zorunlu alan kontrolü (event, device_id, zaman alanları)
    • Partition key: device_id
  • Consumer tarafı
    • Idempotent işlem
    • Başarısız event üretimi (*.failed.v1)
    • Retry/DLQ politikası
  • Gözlemlenebilirlik
    • consumer lag
    • process latency
    • retry count
    • DLQ rate

Bu sayfa güncel tutulduğunda, servis ekipleri arasında event sözleşmesi kaynaklı entegrasyon sorunları ciddi biçimde azalır.