Stream Servisi
Stream Servisi, qapu.ingest.raw eventlerini tüketip device_buffer:{device_id} içindeki güncel durumu işleyerek stream_id üreten mikroservistir.
Bu servis, Ingest tarafından normalize edilip Redis'e yazılmış veriden tek bir stream kaydı üretir. Stream kaydı açmadan önce cihaz envanterini kontrol eder; gerekirse yeni kayıt açar veya modem/SIM/firmware değişimini günceller. Envanter adımı tamamlanınca stream kaydını açar, stream_id bilgisini Redis cache'e geri yazar ve downstream akışın devamı için Kafka eventi üretir.
device_buffer alanlari, ownership ve stage gecisleri icin Device Buffer Redis Kaydi Sozlesmesi sayfasina bakiniz.
Sorumluluk
qapu.ingest.raweventini consume etmek- Redis
device_buffer:{device_id}içinden güncel state okumak device_idiçin cihaz kaydı var mı kontrol etmek; yoksa yeni cihaz kaydı oluşturmak- Paket içindeki
imei,iccidvefirmwaredeğerlerini mevcut envanter ile karşılaştırmak - Yeni modem/SIM kaydı açmak veya değişiklikte mevcut kaydı güncellemek
- Firmware bilgisi yeniyse kaydetmek, değiştiyse güncellemek
- Envanter değerleri mevcut kayıtla aynıysa no-op ilerlemek (update yapmamak)
- Envanterdeki create/update/değişim işlemlerini event log olarak üretmek
streamstablosuna stream kaydı açmak (stream_idüretmek)- Üretilen
stream_iddeğerini Redisdevice_buffer:{device_id}içine geri yazmak - Başarıda
stream.created.v1üretmek - Teknik hata durumunda
stream.failed.v1üretmek
İşlem Akışı
Detaylı işlem akışı için Stream Servisi Akış Diyagramı sayfasına bakınız.
Stream servisinde envanter, streams, Redis ve Kafka yazım sırasını görmek için Veri Katmanları Arası Yazım Sırası sayfasına bakınız.
Redis Okuma/Yazma Modeli
Ana key:
device_buffer:{device_id}
Stream servisi pratikte device_id, device_time, ingest.*, buffers.measurements ve state_version alanlarini okur;
stream.* blogunu ve current_stage degerini gunceller.
Kanonik alan listesi ve ownership sinirlari icin Device Buffer Redis Kaydi Sozlesmesi esas alinmalidir.
ingest.accepted.v1 sonrasinda Stream Servisinin device_buffer uzerinde olusturdugu ornek fixture:
Tum asamalari tek sayfada gormek icin Device Buffer Flow dokumanina bakiniz.
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "streamed",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"AE_R": 62,
"AE_S": 64,
"AE_T": 65,
"FQ": 49.98,
"STOP": 36702270,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 2
}
Bu dosya, bir sonraki adimda Calibration Servisine input olarak kullanilabilir.
Envanter Kontrol Akışı
Stream kaydı açılmadan önce aşağıdaki sıra çalıştırılır:
device_idiledevicestablosunda kayıt aranır.- Kayıt yoksa yeni cihaz kaydı açılır ve create event log üretilir.
- Paket içinde
imeivarsamodemstablosunda kontrol edilir. - Paket içinde
iccidvarsasimstablosunda kontrol edilir. - Paket içinde
firmwarevarsa cihazın son firmware bilgisi ile karşılaştırılır. - Mevcut kayıt ile farklı modem/SIM bilgisi görülürse ilişki güncellenir ve change event log üretilir.
- Firmware bilgisi yeni/değişmiş ise cihaz kaydı güncellenir ve firmware event log üretilir.
- Değerler aynıysa update yapılmaz; yalnız karşılaştırma tamamlanır ve akış devam eder.
- Envanter adımı bittikten sonra
streamskaydı oluşturulur. - Oluşturulan
stream_id, Redisdevice_buffer:{device_id}içine yazılır. - Akışın devamı için Kafka'ya
stream.created.v1eventi yayınlanır.
Bu adımlar, telemetri akışını zenginleştiren envanter senkronizasyonudur; paket birleştirme mantığı içermez.
Duplicate paket kontrolü Ingest katmanında yapıldığı için Stream Servisi'ne normal akışta aynı paket ikinci kez gelmez. Bu nedenle Stream Servisi duplicate eleme yapmaz.
Envanter alan yapıları için aşağıdaki data-model belgelerine bakınız:
stream_id Üretimi
stream_idDB sequence (bigserial) ile üretilir.- Numeric sequence, index ve join performansı için varsayılan seçimdir.
streams Alan Kaynak Eşlemesi
Stream Servisi, streams kaydı oluştururken alanları aşağıdaki kaynaklardan doldurur:
| streams kolonu | Kaynak | Kural |
|---|---|---|
id | DB sequence | Otomatik üretilir |
device_id | device_buffer:{device_id} içindeki cihaz kimliği | Zorunlu, devices.id ile eşleşmeli |
message_type_id | CPS mesaj tipi -> message_types lookup | Alan boşsa null; alan dolu ama sözlükte yoksa null yazılır ve inceleme eventi üretilir |
sim_id | iccid üzerinden sims / sim_assignments çözümlemesi | ICCID yoksa null olabilir |
ip_address | Ingest tarafında taşınan kaynak IP | IP yoksa null olabilir |
payload_size | Ingest hesaplanan payload boyutu | Byte cinsinden yazılır |
process_time_ms | Stream servisinde işlem başlangıç-bitiş farkı | ms cinsinden yazılır |
device_time | CPS meta.device_time | Zorunlu |
stream_time | Stream insert zamanı (server clock) | Zorunlu |
raw_id | Ingest tarafından oluşturulan raw kayıt referansı | Varsa yazılır, yoksa null |
current_stage | Stream insert default stage | İlk değer accepted |
stage_update_time | Stage güncelleme zamanı | İlk insertte daima stream_time yazılır |
ledger_status | Stream insert default ledger state | İlk değer pending |
Tutarlılık notları:
streamsinserti tek noktadan Stream Servisi tarafından yapılır.device_id,device_time,stream_timeolmadan insert yapılmaz.sim_id,message_type_id,ip_address,raw_idalanları veri yoksanullkalabilir; bu durum hata değildir.message_typealanı pakette yoksa reject edilmez; akış normal devam eder.message_typepakette varsa ama sözlükte eşleşmiyorsa paket reject edilmez;message_type_id = nullyazılır vestream.message_type.unmapped.v1inceleme eventi üretilir.
Zaman Semantiği Notu
- CPS tarafındaki
TimeStampcihazın timezone içermeyen yerel üretim zamanıdır. - Ingest, bu alanı canonical modelde
device_timeolarak taşır ve Stream Servisi bu canonical değeri kullanır. - Stream event örneklerinde gösterilen UTC biçimi (
Z), backend kayıt/raporlama standardı için normalize edilmiş gösterimdir. - Esas kural: olay sıralaması için cihaz üretim zamanı
device_time, sistem işleme zamanı içinstream_timekullanılır.
streams tablosunun alan yapısı, tipleri, indeksleri ve tasarım notları için streams Tablosu belgesine bakınız.
Envanter Log Eventleri
Stream Servisi, envanter işlemlerini audit amaçlı event olarak loglar:
| Durum | Üretilen Event |
|---|---|
device_id bulunamadı, yeni cihaz açıldı | device.inventory.created.v1 |
| Cihaz kaydı metadata güncellendi | device.inventory.updated.v1 |
| Yeni IMEI görüldü, modem kaydı açıldı | modem.inventory.created.v1 |
| Cihazın modemi değişti | device.modem.changed.v1 |
| Yeni ICCID görüldü, SIM kaydı açıldı | sim.inventory.created.v1 |
| Cihazın SIM'i değişti | device.sim.changed.v1 |
| Cihaz için ilk firmware bilgisi kaydedildi | device.firmware.recorded.v1 |
| Cihaz firmware versiyonu değişti | device.firmware.changed.v1 |
Bu eventler operasyonel izleme/audit içindir. Stream kaydı oluşturma adımı, envanter işlemi başarıyla tamamlandığında devam eder.
İnceleme Eventleri
| Durum | Üretilen Event | Davranış |
|---|---|---|
message_type alanı pakette yok | Event üretilmez | Akış normal devam eder, message_type_id = null |
message_type alanı var ama sözlükte karşılığı yok | stream.message_type.unmapped.v1 | Akış normal devam eder, message_type_id = null |
Hata Matrisi
Stream Servisi asenkron çalışır; HTTP ACK üretmez. Aşağıdaki tablo, her hata türünde davranışı ve tekrar tetikleme stratejisini özetler.
| Aşama | Hata Durumu | Üretilen Kayıt | Sonraki Davranış | Tekrar Tetikleme |
|---|---|---|---|---|
| Redis okuma | device_buffer bulunamadı | stream.failed.v1 + error log | İşlem sonlandırılır | raw_data kaydı ve ingest log/event üzerinden replay servisi tetikler |
| Envanter kontrolü | devices lookup/insert hatası | inventory error log + stream.failed.v1 | Stream insert yapılmaz | Retry/backoff sonrası tekrar denenir, gerekirse replay |
| Modem kontrolü | modems create/update hatası | modem.inventory.* hata logu + stream.failed.v1 | Stream insert yapılmaz | Retry/backoff ve idempotent upsert |
| SIM kontrolü | sims/sim_assignments hatası | sim.inventory.* hata logu + stream.failed.v1 | Stream insert yapılmaz | Retry/backoff ve idempotent upsert |
| Firmware kontrolü | firmware eşleme/update hatası | device.firmware.* hata logu + stream.failed.v1 | Stream insert yapılmaz | Retry/backoff; replay ile yeniden işlenebilir |
| Mesaj tipi eşleme | message_type sözlükte yok | stream.message_type.unmapped.v1 + warning log | Paket reject edilmez, stream insert devam eder | Operasyon inceleme listesine düşer |
| Stream insert | streams tablosuna yazılamadı | stream.failed.v1 + DB error log | Redis/Kafka adımına geçilmez | Replay servisi yeni istek oluşturarak süreci tekrar tetikler |
| Redis güncelleme | stream_id cache'e yazılamadı | error log + stream.failed.v1 | Kafka success eventi yayınlanmaz | Retry queue; düzeldiğinde publish devam eder |
| Kafka publish | stream.created.v1 yayınlanamadı | Kafka error log + retry/outbox kaydı | DB kaydı korunur, downstream bekler | Outbox/worker yeniden yayınlar |
Operasyonel Not
- Paket işlenemese bile Ingest katmanındaki
raw_datakaydı ve event/log izi mevcut olduğu için akış, ayrı bir replay/retry servisi ile güvenli şekilde yeniden tetiklenebilir.
Retry ve Replay Notu
Bu mimaride normal akışta aynı paket Stream Servisi'ne ikinci kez gelmez. Hata sonrası yeniden işleme, Ingest katmanındaki raw_data kaydı ve log/event izi üzerinden ayrı replay servisi tarafından yeni bir istek olarak başlatılır.
Bu nedenle Stream Servisi tarafında öncelik, hatayı doğru loglamak ve replay servisine net tetikleme bilgisi bırakmaktır.
Event Sözleşmeleri
stream.created.v1, stream.failed.v1, envanter audit eventleri ve stream.message_type.unmapped.v1 dahil tüm event payload sözleşmeleri için Event Sözleşmeleri sayfasına bakınız.
Sık Sorulan Sorular
Troubleshooting ve operasyonel sık sorular için Sık Sorulan Sorular sayfasına bakınız.