Ana içeriğe geç

Stream Servisi

Stream Servisi, qapu.ingest.raw eventlerini tüketip device_buffer:{device_id} içindeki güncel durumu işleyerek stream_id üreten mikroservistir.

Bu servis, Ingest tarafından normalize edilip Redis'e yazılmış veriden tek bir stream kaydı üretir. Stream kaydı açmadan önce cihaz envanterini kontrol eder; gerekirse yeni kayıt açar veya modem/SIM/firmware değişimini günceller. Envanter adımı tamamlanınca stream kaydını açar, stream_id bilgisini Redis cache'e geri yazar ve downstream akışın devamı için Kafka eventi üretir.

Kanonik State Sozlesmesi

device_buffer alanlari, ownership ve stage gecisleri icin Device Buffer Redis Kaydi Sozlesmesi sayfasina bakiniz.

Sorumluluk

  • qapu.ingest.raw eventini consume etmek
  • Redis device_buffer:{device_id} içinden güncel state okumak
  • device_id için cihaz kaydı var mı kontrol etmek; yoksa yeni cihaz kaydı oluşturmak
  • Paket içindeki imei, iccid ve firmware değerlerini mevcut envanter ile karşılaştırmak
  • Yeni modem/SIM kaydı açmak veya değişiklikte mevcut kaydı güncellemek
  • Firmware bilgisi yeniyse kaydetmek, değiştiyse güncellemek
  • Envanter değerleri mevcut kayıtla aynıysa no-op ilerlemek (update yapmamak)
  • Envanterdeki create/update/değişim işlemlerini event log olarak üretmek
  • streams tablosuna stream kaydı açmak (stream_id üretmek)
  • Üretilen stream_id değerini Redis device_buffer:{device_id} içine geri yazmak
  • Başarıda stream.created.v1 üretmek
  • Teknik hata durumunda stream.failed.v1 üretmek

İşlem Akışı

Akış Diyagramı

Detaylı işlem akışı için Stream Servisi Akış Diyagramı sayfasına bakınız.

Veri Katmanları Yazım Sırası

Stream servisinde envanter, streams, Redis ve Kafka yazım sırasını görmek için Veri Katmanları Arası Yazım Sırası sayfasına bakınız.

Redis Okuma/Yazma Modeli

Ana key:

  • device_buffer:{device_id}

Stream servisi pratikte device_id, device_time, ingest.*, buffers.measurements ve state_version alanlarini okur; stream.* blogunu ve current_stage degerini gunceller.

Kanonik alan listesi ve ownership sinirlari icin Device Buffer Redis Kaydi Sozlesmesi esas alinmalidir.

Device Buffer Flow

ingest.accepted.v1 sonrasinda Stream Servisinin device_buffer uzerinde olusturdugu ornek fixture:

Tum asamalari tek sayfada gormek icin Device Buffer Flow dokumanina bakiniz.

{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "streamed",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"AE_R": 62,
"AE_S": 64,
"AE_T": 65,
"FQ": 49.98,
"STOP": 36702270,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 2
}

Bu dosya, bir sonraki adimda Calibration Servisine input olarak kullanilabilir.

Envanter Kontrol Akışı

Stream kaydı açılmadan önce aşağıdaki sıra çalıştırılır:

  1. device_id ile devices tablosunda kayıt aranır.
  2. Kayıt yoksa yeni cihaz kaydı açılır ve create event log üretilir.
  3. Paket içinde imei varsa modems tablosunda kontrol edilir.
  4. Paket içinde iccid varsa sims tablosunda kontrol edilir.
  5. Paket içinde firmware varsa cihazın son firmware bilgisi ile karşılaştırılır.
  6. Mevcut kayıt ile farklı modem/SIM bilgisi görülürse ilişki güncellenir ve change event log üretilir.
  7. Firmware bilgisi yeni/değişmiş ise cihaz kaydı güncellenir ve firmware event log üretilir.
  8. Değerler aynıysa update yapılmaz; yalnız karşılaştırma tamamlanır ve akış devam eder.
  9. Envanter adımı bittikten sonra streams kaydı oluşturulur.
  10. Oluşturulan stream_id, Redis device_buffer:{device_id} içine yazılır.
  11. Akışın devamı için Kafka'ya stream.created.v1 eventi yayınlanır.

Bu adımlar, telemetri akışını zenginleştiren envanter senkronizasyonudur; paket birleştirme mantığı içermez.

Duplicate Paket Notu

Duplicate paket kontrolü Ingest katmanında yapıldığı için Stream Servisi'ne normal akışta aynı paket ikinci kez gelmez. Bu nedenle Stream Servisi duplicate eleme yapmaz.

Envanter Tabloları

Envanter alan yapıları için aşağıdaki data-model belgelerine bakınız:

stream_id Üretimi

  • stream_id DB sequence (bigserial) ile üretilir.
  • Numeric sequence, index ve join performansı için varsayılan seçimdir.

streams Alan Kaynak Eşlemesi

Stream Servisi, streams kaydı oluştururken alanları aşağıdaki kaynaklardan doldurur:

streams kolonuKaynakKural
idDB sequenceOtomatik üretilir
device_iddevice_buffer:{device_id} içindeki cihaz kimliğiZorunlu, devices.id ile eşleşmeli
message_type_idCPS mesaj tipi -> message_types lookupAlan boşsa null; alan dolu ama sözlükte yoksa null yazılır ve inceleme eventi üretilir
sim_idiccid üzerinden sims / sim_assignments çözümlemesiICCID yoksa null olabilir
ip_addressIngest tarafında taşınan kaynak IPIP yoksa null olabilir
payload_sizeIngest hesaplanan payload boyutuByte cinsinden yazılır
process_time_msStream servisinde işlem başlangıç-bitiş farkıms cinsinden yazılır
device_timeCPS meta.device_timeZorunlu
stream_timeStream insert zamanı (server clock)Zorunlu
raw_idIngest tarafından oluşturulan raw kayıt referansıVarsa yazılır, yoksa null
current_stageStream insert default stageİlk değer accepted
stage_update_timeStage güncelleme zamanıİlk insertte daima stream_time yazılır
ledger_statusStream insert default ledger stateİlk değer pending

Tutarlılık notları:

  • streams inserti tek noktadan Stream Servisi tarafından yapılır.
  • device_id, device_time, stream_time olmadan insert yapılmaz.
  • sim_id, message_type_id, ip_address, raw_id alanları veri yoksa null kalabilir; bu durum hata değildir.
  • message_type alanı pakette yoksa reject edilmez; akış normal devam eder.
  • message_type pakette varsa ama sözlükte eşleşmiyorsa paket reject edilmez; message_type_id = null yazılır ve stream.message_type.unmapped.v1 inceleme eventi üretilir.

Zaman Semantiği Notu

  • CPS tarafındaki TimeStamp cihazın timezone içermeyen yerel üretim zamanıdır.
  • Ingest, bu alanı canonical modelde device_time olarak taşır ve Stream Servisi bu canonical değeri kullanır.
  • Stream event örneklerinde gösterilen UTC biçimi (Z), backend kayıt/raporlama standardı için normalize edilmiş gösterimdir.
  • Esas kural: olay sıralaması için cihaz üretim zamanı device_time, sistem işleme zamanı için stream_time kullanılır.
Streams Tablosu

streams tablosunun alan yapısı, tipleri, indeksleri ve tasarım notları için streams Tablosu belgesine bakınız.

Envanter Log Eventleri

Stream Servisi, envanter işlemlerini audit amaçlı event olarak loglar:

DurumÜretilen Event
device_id bulunamadı, yeni cihaz açıldıdevice.inventory.created.v1
Cihaz kaydı metadata güncellendidevice.inventory.updated.v1
Yeni IMEI görüldü, modem kaydı açıldımodem.inventory.created.v1
Cihazın modemi değiştidevice.modem.changed.v1
Yeni ICCID görüldü, SIM kaydı açıldısim.inventory.created.v1
Cihazın SIM'i değiştidevice.sim.changed.v1
Cihaz için ilk firmware bilgisi kaydedildidevice.firmware.recorded.v1
Cihaz firmware versiyonu değiştidevice.firmware.changed.v1

Bu eventler operasyonel izleme/audit içindir. Stream kaydı oluşturma adımı, envanter işlemi başarıyla tamamlandığında devam eder.

İnceleme Eventleri

DurumÜretilen EventDavranış
message_type alanı pakette yokEvent üretilmezAkış normal devam eder, message_type_id = null
message_type alanı var ama sözlükte karşılığı yokstream.message_type.unmapped.v1Akış normal devam eder, message_type_id = null

Hata Matrisi

Stream Servisi asenkron çalışır; HTTP ACK üretmez. Aşağıdaki tablo, her hata türünde davranışı ve tekrar tetikleme stratejisini özetler.

AşamaHata DurumuÜretilen KayıtSonraki DavranışTekrar Tetikleme
Redis okumadevice_buffer bulunamadıstream.failed.v1 + error logİşlem sonlandırılırraw_data kaydı ve ingest log/event üzerinden replay servisi tetikler
Envanter kontrolüdevices lookup/insert hatasıinventory error log + stream.failed.v1Stream insert yapılmazRetry/backoff sonrası tekrar denenir, gerekirse replay
Modem kontrolümodems create/update hatasımodem.inventory.* hata logu + stream.failed.v1Stream insert yapılmazRetry/backoff ve idempotent upsert
SIM kontrolüsims/sim_assignments hatasısim.inventory.* hata logu + stream.failed.v1Stream insert yapılmazRetry/backoff ve idempotent upsert
Firmware kontrolüfirmware eşleme/update hatasıdevice.firmware.* hata logu + stream.failed.v1Stream insert yapılmazRetry/backoff; replay ile yeniden işlenebilir
Mesaj tipi eşlememessage_type sözlükte yokstream.message_type.unmapped.v1 + warning logPaket reject edilmez, stream insert devam ederOperasyon inceleme listesine düşer
Stream insertstreams tablosuna yazılamadıstream.failed.v1 + DB error logRedis/Kafka adımına geçilmezReplay servisi yeni istek oluşturarak süreci tekrar tetikler
Redis güncellemestream_id cache'e yazılamadıerror log + stream.failed.v1Kafka success eventi yayınlanmazRetry queue; düzeldiğinde publish devam eder
Kafka publishstream.created.v1 yayınlanamadıKafka error log + retry/outbox kaydıDB kaydı korunur, downstream beklerOutbox/worker yeniden yayınlar

Operasyonel Not

  • Paket işlenemese bile Ingest katmanındaki raw_data kaydı ve event/log izi mevcut olduğu için akış, ayrı bir replay/retry servisi ile güvenli şekilde yeniden tetiklenebilir.

Retry ve Replay Notu

Bu mimaride normal akışta aynı paket Stream Servisi'ne ikinci kez gelmez. Hata sonrası yeniden işleme, Ingest katmanındaki raw_data kaydı ve log/event izi üzerinden ayrı replay servisi tarafından yeni bir istek olarak başlatılır.

Bu nedenle Stream Servisi tarafında öncelik, hatayı doğru loglamak ve replay servisine net tetikleme bilgisi bırakmaktır.

Event Sözleşmeleri

Stream Event Sözleşmeleri

stream.created.v1, stream.failed.v1, envanter audit eventleri ve stream.message_type.unmapped.v1 dahil tüm event payload sözleşmeleri için Event Sözleşmeleri sayfasına bakınız.

Sık Sorulan Sorular

Stream FAQ

Troubleshooting ve operasyonel sık sorular için Sık Sorulan Sorular sayfasına bakınız.