Ana içeriğe geç

Ingest Servis

Ingest Servis, cihazdan gelen veri paketini sisteme güvenli ve düşük gecikmeli şekilde kabul eden giriş mikroservisidir. Cınga mimarisinde Docker Swarm ile orkestre edilen mikroservislerden biridir; ana görevi paketi doğrulamak, Redis device_buffer durumunu güncellemek ve işleme hattını tetiklemek için Kafka'ya hafif event üretmektir.

Sorumluluk

  • Cihazdan veri paketini almak
  • Paket için şema doğrulaması yapmak
  • Kimlik doğrulama yapmamak (şemaya uyan tüm device_id değerlerini kabul etmek)
  • Paketi Redis device_buffer alanına yazmak
  • Paketi Kafka ingest topic'ine göndermek
  • Cihaza hızlı ACK dönmek

Bu katman hesap yapmaz, sentez üretmez, window güncellemez. Amaç düşük gecikmeli ve dayanıklı kabul katmanı olmaktır.

İşlem Akışı

  1. Cihaz POST /v1/ingest ile payload gönderir
  2. Servis şema ve zorunlu alan kontrolü yapar (auth yok)
  3. Paket Redis device_buffer:{device_id} alanına yazılır:
    • raw_last (son gelen ham paket)
  4. Kafka'ya hafif event yazılır (cinga.ingest.raw)
  5. Servis 200 OK ACK döner

Cihaz Veri Şeması

Cihaz verisi üç ana bloktan oluşur: Info, Device, Payload.

Payload tarafında esneklik için iki format birlikte desteklenir:

  • Dizi formatı: VRMS: [R,S,T]
  • Tekil format: VRMS_R, VRMS_S, VRMS_T

Ingest servis bu iki formatı ortak canonical alana normalize eder.

{
"Info": {
"Pack": "BASIC",
"TimeStamp": "2026-3-11T15:22:6",
"ID": "400000011D081B70",
"ICCID": "8946120802005507363",
"IMEI": "354485417650889",
"Stream_End": false
},
"Device": {
"Power": {
"B_IV": 2.97,
"B_AC": -286.875,
"B_CS": 0
},
"IoT": {
"RSSI": 31
}
},
"Payload": {
"STATUS": 1073741824,
"STOP": 36702270,
"VRMS": [
0.063,
0.051,
0.055
],
"IRMS": [
0.004,
0.004,
0.004
],
"VFUND": [
0.006,
0.004,
0.005
],
"IFUND": [
0.0,
0.0,
0.0
],
"VHARM_R": [
0.061,
0.063,
0.061
],
"VHARM_S": [
0.055,
0.052,
0.054
],
"VHARM_T": [
0.054,
0.055,
0.055
],
"IHARM_R": [
0.005,
0.005,
0.005
],
"IHARM_S": [
0.004,
0.004,
0.004
],
"IHARM_T": [
0.004,
0.004,
0.004
],
"IPEAK": [
0.014,
0.011,
0.015
],
"P": [
0.0,
0.0,
0.0
],
"Q": [
0.0,
0.0,
0.0
],
"S": [
0.0,
0.0,
0.0
],
"PF": [
0.0,
0.0,
0.0
],
"FREQ": 102.36,
"PCB_T": 125,
"RESET": 1
}
}

Şema Mapping

Ingest servis, dizi veya tekil gelen payload alanlarını backend'de kullanılan standart alanlara açar.

Öncelik kuralı (ikisi birden gelirse):

  • Aynı değişken hem dizi hem tekil formatta gelirse tekil alan önceliklidir.
  • Çakışma meta.mapping_conflict=true olarak işaretlenir ve audit log'a yazılır.

Dizi Faz Seti Alanları

Wire alanıTipMapping
VRMS: [R,S,T]number[3]VRMS_R, VRMS_S, VRMS_T
IRMS: [R,S,T]number[3]IRMS_R, IRMS_S, IRMS_T
VFUND: [R,S,T]number[3]VFUND_R, VFUND_S, VFUND_T
IFUND: [R,S,T]number[3]IFUND_R, IFUND_S, IFUND_T
IPEAK: [R,S,T]number[3]IPEAK_R, IPEAK_S, IPEAK_T
P: [R,S,T]number[3]P_R, P_S, P_T
Q: [R,S,T]number[3]Q_R, Q_S, Q_T
S: [R,S,T]number[3]S_R, S_S, S_T
PF: [R,S,T]number[3]PF_R, PF_S, PF_T
VHARM_R: [h3,h5,h7]number[3]VHARM_3_R, VHARM_5_R, VHARM_7_R
VHARM_S: [h3,h5,h7]number[3]VHARM_3_S, VHARM_5_S, VHARM_7_S
VHARM_T: [h3,h5,h7]number[3]VHARM_3_T, VHARM_5_T, VHARM_7_T
IHARM_R: [h3,h5,h7]number[3]IHARM_3_R, IHARM_5_R, IHARM_7_R
IHARM_S: [h3,h5,h7]number[3]IHARM_3_S, IHARM_5_S, IHARM_7_S
IHARM_T: [h3,h5,h7]number[3]IHARM_3_T, IHARM_5_T, IHARM_7_T

Tekil alanlar

Wire alanıTipHedef alan
STATUSintegerSTATUS (32-bit bitmask; bkz: /projects/cinga/backend/architecture/status-register)
STOPintegerSTOP
FREQnumberFREQ
PCB_TnumberPCB_T
RESETbooleanRESET
VRMS_R, VRMS_S, VRMS_Tnumberaynı alanlar
IRMS_R, IRMS_S, IRMS_Tnumberaynı alanlar
VFUND_R, VFUND_S, VFUND_Tnumberaynı alanlar
IFUND_R, IFUND_S, IFUND_Tnumberaynı alanlar
IPEAK_R, IPEAK_S, IPEAK_Tnumberaynı alanlar
P_R, P_S, P_Tnumberaynı alanlar
Q_R, Q_S, Q_Tnumberaynı alanlar
S_R, S_S, S_Tnumberaynı alanlar
PF_R, PF_S, PF_Tnumberaynı alanlar
VHARM_3/5/7_R, VHARM_3/5/7_S, VHARM_3/5/7_Tnumberaynı alanlar
IHARM_3/5/7_R, IHARM_3/5/7_S, IHARM_3/5/7_Tnumberaynı alanlar

Not: Sayısal alanlar ingest katmanında normalize edilir (ör. float değerlerde 3 basamak hassasiyet).

Blokların Sorumluluğu

  • Info: cihaz kimliği + paket metası
  • Device: cihaz öz durum telemetrisi (pil, RSSI vb.)
  • Payload: ölçüm değişkenleri (wire format, faz setleri dizi olarak)

Paket Bölme Gerekçesi

Tek paket limiti 1024 byte olarak sabitlenmiştir. variables sözlüğündeki alanlar tek pakette taşınmaya çalışıldığında limit aşıldığı için cihaz verisi paket profillerine bölünür:

  • BASIC
  • DETAIL
  • EXPERIMENT

Bu yaklaşımın amacı:

  • cihaz uplink yükünü kontrol etmek
  • sabit paket limitini korumak
  • stream assembly ile veri bütünlüğünü bozmadan parçalı iletim sağlamak

Info.Pack alanı paket profilini (BASIC/DETAIL/EXPERIMENT) taşır, son parçada Info.Stream_End=true set edilir.

Maks Paket Boyutu

Boşluksuz JSON (separators=(',',':')) ve aşağıdaki sınır varsayımları ile hesap:

  • Gerilim alanları (VRMS, VFUND, VHARM_*): xxx.xxx (max 999.999)
  • Akım alanları (IRMS, IFUND, IHARM_*, IPEAK): x.xxx (max 9.999)
  • Frekans (FREQ): xxx.xxx (max 999.999)
  • Güç faktörü (PF): x.xxx (max 9.999)
  • Güç alanları (P, Q, S): xxxx.xxx (max 9999.999)
  • STOP, STATUS: 32-bit int max (2147483647)
  • RESET: boolean
  • B_IV: x.xxx (max 9.999)
  • B_AC: -xxxx.xxx (max mutlak -9999.999)
  • B_CS: x (max 9)
  • RSSI: xxx (max 999)
  • Environment (PCB_T vb.): xx.xxx (max 99.999)

Bu varsayımla hesaplanan maksimum paket boyutu:

  • 784 byte

Sabit limit:

  • max_payload_size = 1024 byte

Kalan güvenlik marjı:

  • 240 byte
Bilgi

variables sözlüğü üzerinden bilgi amaçlı ikinci hesap:

  • Sentez/çıkarım alanları hariç
  • Ortalama/toplam alanlar hariç (*_A, PF_TOT)
  • Anlık alanlar hariç (VINST_*, IINST_*)
  • Harmoniklerde yalnız 3/5/7 tutulup 9 ve THD alanları hariç

Bu filtreyle kalan yaklaşık 100 değişken tek pakette gönderilirse (boşluksuz JSON) tahmini paket boyutu yaklaşık 2141 byte olur. Bu değer de 1024 byte tek paket limitini aşar.

Notlar:

  • Hesap mevcut Info + Device + Payload alan setine göredir.
  • Değişken seti genişlerse limit yeniden hesaplanmalıdır.
  • Bu değer yalnız JSON gövdesidir; taşıma katmanı header'ları dahil değildir.

Redis Buffer Kaydı

Şema mapping sonrası ingest servis, paketi Redis'te cihaz bazlı buffer key'ine yazar:

  • device_buffer:{device_id}

Yazılan gövde (özet):

Not: Aşağıdaki assembly bloğu Stream servis tarafından yönetilir; Ingest servis doğrudan merge/finalize yapmaz.

{
"device_id": "400000011D081B70",
"last_seen": "2026-03-12T09:50:00Z",
"raw_last": {
"Info": {
"Pack": "DETAIL",
"ID": "400000011D081B70",
"Stream_End": false
},
"Device": {
"IoT": {
"RSSI": 31
}
},
"Payload": {
"VRMS": [
0.063,
0.051,
0.055
]
}
},
"assembly": {
"stream_key": "400000011D081B70:2026-03-12T09:50:00Z",
"parts_received": [
"BASIC",
"DETAIL"
],
"stream_end_seen": false,
"payload_merged": {
"VRMS": [
0.063,
0.051,
0.055
],
"IRMS": [
0.004,
0.004,
0.004
]
},
"updated_at": "2026-03-12T09:50:01Z"
},
"meta": {
"pack": "DETAIL",
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7"
},
"raw_buffer": {
"VRMS_R": [
{
"ts": "2026-03-12T09:50:00Z",
"v": 0.063
}
],
"IRMS_R": [
{
"ts": "2026-03-12T09:50:00Z",
"v": 0.004
}
]
},
"synth_buffer": {},
"window_buffer": {},
"last_stream_id": null,
"state_version": 1
}

Kurallar:

  • raw_last her pakette Ingest tarafından overwrite edilir (son paket).
  • assembly Stream servis tarafından event tüketimi sırasında merge edilir (parts_received, payload_merged, stream_end_seen).
  • raw_buffer alanları Stream servis tarafından append + prune (rolling 24h) ile güncellenir.
  • Sayısal değerler buffer'da 3 basamak hassasiyetle tutulur.
  • device_buffer key'i silinmez; sadece buffer içi eski örnekler temizlenir.

Not: Stream birleştirme finalize işlemi Stream servisinde yapılır; Ingest yalnız assembly state'i günceller.

Assembly Timeout ve Temizlik

Assembly setinin tamamlanma kontrolü Stream servisinin sorumluluğundadır.

  • Stream servis her ingest event'te assembly.updated_at alanını kontrol eder.
  • Ayrıca periyodik bir sweeper job çalıştırır (örn. her 30 sn).
  • now - updated_at > assembly_timeout_minutes ise set expired kabul edilir.

Varsayılan: assembly_timeout_minutes = 2 (GSM gecikmeleri için yeterli, konfigürasyonla artırılabilir).

Timeout durumunda assembly completion policy uygulanır:

  • required_min_variables seti kontrol edilir (ilk profil için örn. VRMS_R, VRMS_S, VRMS_T)
  • Minimum set varsa paket is_partial=true ile finalize edilir ve işleme devam eder
  • missing_variables listesi çıkarılır ve stream kaydına yazılır
  • Minimum set yoksa paket işlenmez, stream.failed.v1 event + unprocessed log üretilir
  • Her durumda Redis assembly bloğu temizlenir

Not: Bu modelde cihazdan yeniden gönderim istenmez; ek retry politikası yoktur. Gelen veriyle devam edilir, sorunlu setler loglanır.

Kafka Mesajı

Redis buffer kaydından sonra ingest servis Kafka'ya hafif event yazar; full paket Redis device_buffer içinde tutulur.

Mesaj sözleşmesinin tam detayı /projects/cinga/backend/architecture/kafka sayfasında tanımlıdır.

{
"event": "ingest.received.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "ingest-service",
"produced_at": "2026-03-11T15:22:08.050Z",
"process_ms": 6
},
"context": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z"
},
"data": {
"pack": "DETAIL",
"Stream_End": false,
"received_at": "2026-03-11T15:22:08Z",
"meta": {
"pack": "DETAIL",
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7",
"ip": "x.x.x.x",
"size": 1240
}
},
"error": null
}

Ingest Serviste Üretilen Eventler

TopicEventConsume EdenÜretim Koşulu
cinga.ingest.rawingest.received.v1Stream WriterŞema doğrulama + Redis buffer yazımı sonrası
cinga.ingest.failedingest.failed.v1Ops/Replay WorkerIngest aşamasında doğrulama/işleme hatası
cinga.dlq.ingestingest.failed.v1Ops/Replay WorkerRetry limiti aşan kalıcı ingest hataları

ingest.received.v1 içinde yönlendirme için zorunlu alanlar:

  • pack: BASIC / DETAIL / EXPERIMENT
  • Stream_End: true/false

Pack Alanı Kullanımı

Info.Pack alanı paket profilini belirtir:

  • BASIC
  • DETAIL
  • EXPERIMENT

Info.Stream_End alanı ise bu iletim setinin son paketi olup olmadığını belirtir.

Stream finalize kuralı:

  • Stream_End=true ve BASIC paketi alınmışsa set finalize edilir.
  • DETAIL ve EXPERIMENT opsiyoneldir (geldiyse merge edilir).
  • Stream_End=true ama BASIC yoksa set hatalı kabul edilir (stream.failed.v1).

Not: Sentez ve pencere hesapları Payload verisine dayanır; Pack hesap girdisi değil, yönlendirme/assembly meta alanıdır.

Örnek Senaryolar

Senaryo 1 · Yalnız BASIC paketi

  • Gelen paketler: BASIC
  • Son pakette Stream_End=true
  • Sonuç: stream finalize edilir, is_partial=false

Senaryo 2 · BASIC + DETAIL

  • Gelen paketler: BASIC, DETAIL
  • Son pakette Stream_End=true
  • Sonuç: assembly merge tamamlanır, stream finalize edilir, is_partial=false

Senaryo 3 · Timeout + Minimum set var

  • Gelen paketler: DETAIL + kısmi BASIC
  • assembly_timeout_minutes=2 doldu
  • required_min_variables mevcut
  • Sonuç: stream is_partial=true ile finalize edilir, missing_variables yazılır

Senaryo 4 · Timeout + Minimum set yok

  • Gelen paketler: yalnız EXPERIMENT
  • assembly_timeout_minutes=2 doldu
  • required_min_variables yok
  • Sonuç: stream finalize edilmez, stream.failed.v1 + unprocessed log üretilir
Not

Bu Cihaz Veri Şeması bölümü geçiş dokümantasyonudur. İlerleyen fazda cihaz üretim protokolü için ayrı bir standart doküman (resmi şema spesifikasyonu) yayınlanacaktır.