Stream Servisi
Stream Servisi, cinga.ingest.raw eventlerini tüketip cihazdan parçalı gelen paketleri tek bir stream setinde birleştiren ve stream kimliğini (stream_id) üreten mikroservistir.
Bu servis ham segment tablolarına yazım yapmaz. Görevi, assembly/finalize kararını verip downstream katmanları tetiklemektir.
Sorumluluk
cinga.ingest.raweventini consume etmek- Redis
device_buffer:{device_id}içinden state okuyup assembly merge yapmak Stream_Endve timeout kuralları ile completion kararını vermekis_partialvemissing_variablesdeğerlerini belirlemekstreamstablosuna stream kaydı açmak (stream_idüretmek)- Başarıda
stream.assembled.v1, hatadastream.failed.v1üretmek
İşlem Akışı
Redis Okuma/Yazma Modeli
Ana key:
device_buffer:{device_id}
Kullanılan alanlar:
raw_lastassembly.parts_receivedassembly.stream_end_seenassembly.payload_mergedassembly.updated_at
Ownership:
- Ingest Servisi:
raw_last - Stream Servisi:
assembly,raw_buffer
Assembly Kuralları
Packdeğerleri:BASIC,DETAIL,EXPERIMENT- Her gelen pakette
parts_receivedseti güncellenir - Alan birleştirmede
last-write-winsuygulanır Stream_End=truegelirsestream_end_seen=trueişaretlenir
Varsayılan Konfigürasyon
stream_policy:
assembly_timeout_minutes: 2
required_min_variables:
- VRMS_R
- VRMS_S
- VRMS_T
late_packet_policy: ignore_and_log
Not:
required_min_variableskaynağı merkezi config/DB olmalıdır.- Kod içinde hardcoded set tutulmamalıdır.
Completion Policy
Stream_End=trueve minimum set varsa finalize- Timeout + minimum set varsa
is_partial=trueile finalize - Minimum set yoksa
stream.failed.v1
stream_id Üretimi
stream_idDB sequence (bigserial) ile üretilir.- Numeric sequence, index ve join performansı için varsayılan seçimdir.
Stream Tablo Şeması
| Kolon | Tip | Not |
|---|---|---|
id | bigint PK | Stream kimliği |
device_id | varchar(21) FK | devices.id |
pack_type | varchar(16) | BASIC/DETAIL/EXPERIMENT |
iccid | varchar(25) FK | sims.iccid |
ip_address | varchar(16) | Kaynak IP |
size | integer | Paket boyutu |
process_time | double precision | İşlem süresi |
device_time | timestamptz | Cihaz zamanı |
stream_time | timestamptz | Sistem zamanı |
is_partial | boolean | Kısmi finalize işareti |
missing_variables | jsonb | Eksik değişken listesi |
create_time | timestamptz | Oluşturma zamanı |
Late Packet Davranışı
- Finalize edilmiş stream setine sonradan gelen paketler varsayılan olarak işlenmez.
- Bu paketler
late_packetolarak loglanır (late_packet_policy: ignore_and_log). - İhtiyaç halinde politika konfigürasyonla değiştirilebilir.
Event Sözleşmeleri
stream.assembled.v1
{
"event": "stream.assembled.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:22:08.120Z",
"process_ms": 11
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:08Z"
},
"data": {
"is_partial": false,
"missing_variables": null,
"calc_version": 1
},
"error": null
}
stream.failed.v1
{
"event": "stream.failed.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:24:10.055Z",
"process_ms": 120014
},
"context": {
"device_id": "400000011D081B70",
"stream_id": null,
"device_time": "2026-03-11T15:22:06Z"
},
"error": {
"failed_stage": "stream",
"error_code": "MIN_SET_MISSING",
"error_message": "required_min_variables not satisfied",
"failed_at": "2026-03-11T15:24:10Z",
"missing_variables": [
"VRMS_R",
"VRMS_S",
"VRMS_T"
],
"is_partial": false
}
}