Skip to main content

Stream Servisi

Stream Servisi, cinga.ingest.raw eventlerini tüketip cihazdan parçalı gelen paketleri tek bir stream setinde birleştiren ve stream kimliğini (stream_id) üreten mikroservistir.

Bu servis ham segment tablolarına yazım yapmaz. Görevi, assembly/finalize kararını verip downstream katmanları tetiklemektir.

Sorumluluk

  • cinga.ingest.raw eventini consume etmek
  • Redis device_buffer:{device_id} içinden state okuyup assembly merge yapmak
  • Stream_End ve timeout kuralları ile completion kararını vermek
  • is_partial ve missing_variables değerlerini belirlemek
  • streams tablosuna stream kaydı açmak (stream_id üretmek)
  • Başarıda stream.assembled.v1, hatada stream.failed.v1 üretmek

İşlem Akışı

Redis Okuma/Yazma Modeli

Ana key:

  • device_buffer:{device_id}

Kullanılan alanlar:

  • raw_last
  • assembly.parts_received
  • assembly.stream_end_seen
  • assembly.payload_merged
  • assembly.updated_at

Ownership:

  • Ingest Servisi: raw_last
  • Stream Servisi: assembly, raw_buffer

Assembly Kuralları

  • Pack değerleri: BASIC, DETAIL, EXPERIMENT
  • Her gelen pakette parts_received seti güncellenir
  • Alan birleştirmede last-write-wins uygulanır
  • Stream_End=true gelirse stream_end_seen=true işaretlenir

Varsayılan Konfigürasyon

stream_policy:
assembly_timeout_minutes: 2
required_min_variables:
- VRMS_R
- VRMS_S
- VRMS_T
late_packet_policy: ignore_and_log

Not:

  • required_min_variables kaynağı merkezi config/DB olmalıdır.
  • Kod içinde hardcoded set tutulmamalıdır.

Completion Policy

  • Stream_End=true ve minimum set varsa finalize
  • Timeout + minimum set varsa is_partial=true ile finalize
  • Minimum set yoksa stream.failed.v1

stream_id Üretimi

  • stream_id DB sequence (bigserial) ile üretilir.
  • Numeric sequence, index ve join performansı için varsayılan seçimdir.

Stream Tablo Şeması

KolonTipNot
idbigint PKStream kimliği
device_idvarchar(21) FKdevices.id
pack_typevarchar(16)BASIC/DETAIL/EXPERIMENT
iccidvarchar(25) FKsims.iccid
ip_addressvarchar(16)Kaynak IP
sizeintegerPaket boyutu
process_timedouble precisionİşlem süresi
device_timetimestamptzCihaz zamanı
stream_timetimestamptzSistem zamanı
is_partialbooleanKısmi finalize işareti
missing_variablesjsonbEksik değişken listesi
create_timetimestamptzOluşturma zamanı

Late Packet Davranışı

  • Finalize edilmiş stream setine sonradan gelen paketler varsayılan olarak işlenmez.
  • Bu paketler late_packet olarak loglanır (late_packet_policy: ignore_and_log).
  • İhtiyaç halinde politika konfigürasyonla değiştirilebilir.

Event Sözleşmeleri

stream.assembled.v1

{
"event": "stream.assembled.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:22:08.120Z",
"process_ms": 11
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:08Z"
},
"data": {
"is_partial": false,
"missing_variables": null,
"calc_version": 1
},
"error": null
}

stream.failed.v1

{
"event": "stream.failed.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "stream-service",
"produced_at": "2026-03-11T15:24:10.055Z",
"process_ms": 120014
},
"context": {
"device_id": "400000011D081B70",
"stream_id": null,
"device_time": "2026-03-11T15:22:06Z"
},
"error": {
"failed_stage": "stream",
"error_code": "MIN_SET_MISSING",
"error_message": "required_min_variables not satisfied",
"failed_at": "2026-03-11T15:24:10Z",
"missing_variables": [
"VRMS_R",
"VRMS_S",
"VRMS_T"
],
"is_partial": false
}
}