Ingest Servis
Ingest Servis, cihazdan gelen veri paketini sisteme güvenli ve düşük gecikmeli şekilde kabul eden giriş mikroservisidir. Cınga mimarisinde Docker Swarm ile orkestre edilen mikroservislerden biridir; ana görevi paketi doğrulamak, Redis device_buffer durumunu güncellemek ve işleme hattını tetiklemek için Kafka'ya hafif event üretmektir.
Sorumluluk
- Cihazdan veri paketini almak
- Paket için şema doğrulaması yapmak
- Kimlik doğrulama yapmamak (şemaya uyan tüm
device_iddeğerlerini kabul etmek) - Paketi Redis
device_bufferalanına yazmak - Paketi Kafka ingest topic'ine göndermek
- Cihaza hızlı ACK dönmek
Bu katman hesap yapmaz, sentez üretmez, window güncellemez. Amaç düşük gecikmeli ve dayanıklı kabul katmanı olmaktır.
İşlem Akışı
- Cihaz
POST /v1/ingestile payload gönderir - Servis şema ve zorunlu alan kontrolü yapar (auth yok)
- Paket Redis
device_buffer:{device_id}alanına yazılır:raw_last(son gelen ham paket)
- Kafka'ya hafif event yazılır (
cinga.ingest.raw) - Servis
200 OKACK döner
Cihaz Veri Şeması
Cihaz verisi üç ana bloktan oluşur: Info, Device, Payload.
Payload tarafında esneklik için iki format birlikte desteklenir:
- Dizi formatı:
VRMS: [R,S,T] - Tekil format:
VRMS_R,VRMS_S,VRMS_T
Ingest servis bu iki formatı ortak canonical alana normalize eder.
{
"Info": {
"Pack": "BASIC",
"TimeStamp": "2026-3-11T15:22:6",
"ID": "400000011D081B70",
"ICCID": "8946120802005507363",
"IMEI": "354485417650889",
"Stream_End": false
},
"Device": {
"Power": {
"B_IV": 2.97,
"B_AC": -286.875,
"B_CS": 0
},
"IoT": {
"RSSI": 31
}
},
"Payload": {
"STATUS": 1073741824,
"STOP": 36702270,
"VRMS": [
0.063,
0.051,
0.055
],
"IRMS": [
0.004,
0.004,
0.004
],
"VFUND": [
0.006,
0.004,
0.005
],
"IFUND": [
0.0,
0.0,
0.0
],
"VHARM_R": [
0.061,
0.063,
0.061
],
"VHARM_S": [
0.055,
0.052,
0.054
],
"VHARM_T": [
0.054,
0.055,
0.055
],
"IHARM_R": [
0.005,
0.005,
0.005
],
"IHARM_S": [
0.004,
0.004,
0.004
],
"IHARM_T": [
0.004,
0.004,
0.004
],
"IPEAK": [
0.014,
0.011,
0.015
],
"P": [
0.0,
0.0,
0.0
],
"Q": [
0.0,
0.0,
0.0
],
"S": [
0.0,
0.0,
0.0
],
"PF": [
0.0,
0.0,
0.0
],
"FREQ": 102.36,
"PCB_T": 125,
"RESET": 1
}
}
Şema Mapping
Ingest servis, dizi veya tekil gelen payload alanlarını backend'de kullanılan standart alanlara açar.
Öncelik kuralı (ikisi birden gelirse):
- Aynı değişken hem dizi hem tekil formatta gelirse tekil alan önceliklidir.
- Çakışma
meta.mapping_conflict=trueolarak işaretlenir ve audit log'a yazılır.
Dizi Faz Seti Alanları
| Wire alanı | Tip | Mapping |
|---|---|---|
VRMS: [R,S,T] | number[3] | VRMS_R, VRMS_S, VRMS_T |
IRMS: [R,S,T] | number[3] | IRMS_R, IRMS_S, IRMS_T |
VFUND: [R,S,T] | number[3] | VFUND_R, VFUND_S, VFUND_T |
IFUND: [R,S,T] | number[3] | IFUND_R, IFUND_S, IFUND_T |
IPEAK: [R,S,T] | number[3] | IPEAK_R, IPEAK_S, IPEAK_T |
P: [R,S,T] | number[3] | P_R, P_S, P_T |
Q: [R,S,T] | number[3] | Q_R, Q_S, Q_T |
S: [R,S,T] | number[3] | S_R, S_S, S_T |
PF: [R,S,T] | number[3] | PF_R, PF_S, PF_T |
VHARM_R: [h3,h5,h7] | number[3] | VHARM_3_R, VHARM_5_R, VHARM_7_R |
VHARM_S: [h3,h5,h7] | number[3] | VHARM_3_S, VHARM_5_S, VHARM_7_S |
VHARM_T: [h3,h5,h7] | number[3] | VHARM_3_T, VHARM_5_T, VHARM_7_T |
IHARM_R: [h3,h5,h7] | number[3] | IHARM_3_R, IHARM_5_R, IHARM_7_R |
IHARM_S: [h3,h5,h7] | number[3] | IHARM_3_S, IHARM_5_S, IHARM_7_S |
IHARM_T: [h3,h5,h7] | number[3] | IHARM_3_T, IHARM_5_T, IHARM_7_T |
Tekil alanlar
| Wire alanı | Tip | Hedef alan |
|---|---|---|
STATUS | integer | STATUS (32-bit bitmask; bkz: /projects/cinga/backend/architecture/status-register) |
STOP | integer | STOP |
FREQ | number | FREQ |
PCB_T | number | PCB_T |
RESET | boolean | RESET |
VRMS_R, VRMS_S, VRMS_T | number | aynı alanlar |
IRMS_R, IRMS_S, IRMS_T | number | aynı alanlar |
VFUND_R, VFUND_S, VFUND_T | number | aynı alanlar |
IFUND_R, IFUND_S, IFUND_T | number | aynı alanlar |
IPEAK_R, IPEAK_S, IPEAK_T | number | aynı alanlar |
P_R, P_S, P_T | number | aynı alanlar |
Q_R, Q_S, Q_T | number | aynı alanlar |
S_R, S_S, S_T | number | aynı alanlar |
PF_R, PF_S, PF_T | number | aynı alanlar |
VHARM_3/5/7_R, VHARM_3/5/7_S, VHARM_3/5/7_T | number | aynı alanlar |
IHARM_3/5/7_R, IHARM_3/5/7_S, IHARM_3/5/7_T | number | aynı alanlar |
Not: Sayısal alanlar ingest katmanında normalize edilir (ör. float değerlerde 3 basamak hassasiyet).
Blokların Sorumluluğu
Info: cihaz kimliği + paket metasıDevice: cihaz öz durum telemetrisi (pil, RSSI vb.)Payload: ölçüm değişkenleri (wire format, faz setleri dizi olarak)
Paket Bölme Gerekçesi
Tek paket limiti 1024 byte olarak sabitlenmiştir. variables sözlüğündeki alanlar tek pakette taşınmaya çalışıldığında limit aşıldığı için cihaz verisi paket profillerine bölünür:
BASICDETAILEXPERIMENT
Bu yaklaşımın amacı:
- cihaz uplink yükünü kontrol etmek
- sabit paket limitini korumak
- stream assembly ile veri bütünlüğünü bozmadan parçalı iletim sağlamak
Info.Pack alanı paket profilini (BASIC/DETAIL/EXPERIMENT) taşır, son parçada Info.Stream_End=true set edilir.
Maks Paket Boyutu
Boşluksuz JSON (separators=(',',':')) ve aşağıdaki sınır varsayımları ile hesap:
- Gerilim alanları (
VRMS,VFUND,VHARM_*):xxx.xxx(max999.999) - Akım alanları (
IRMS,IFUND,IHARM_*,IPEAK):x.xxx(max9.999) - Frekans (
FREQ):xxx.xxx(max999.999) - Güç faktörü (
PF):x.xxx(max9.999) - Güç alanları (
P,Q,S):xxxx.xxx(max9999.999) STOP,STATUS: 32-bit int max (2147483647)RESET: booleanB_IV:x.xxx(max9.999)B_AC:-xxxx.xxx(max mutlak-9999.999)B_CS:x(max9)RSSI:xxx(max999)- Environment (
PCB_Tvb.):xx.xxx(max99.999)
Bu varsayımla hesaplanan maksimum paket boyutu:
- 784 byte
Sabit limit:
max_payload_size = 1024 byte
Kalan güvenlik marjı:
- 240 byte
variables sözlüğü üzerinden bilgi amaçlı ikinci hesap:
- Sentez/çıkarım alanları hariç
- Ortalama/toplam alanlar hariç (
*_A,PF_TOT) - Anlık alanlar hariç (
VINST_*,IINST_*) - Harmoniklerde yalnız
3/5/7tutulup9veTHDalanları hariç
Bu filtreyle kalan yaklaşık 100 değişken tek pakette gönderilirse (boşluksuz JSON) tahmini paket boyutu yaklaşık 2141 byte olur. Bu değer de 1024 byte tek paket limitini aşar.
Notlar:
- Hesap mevcut
Info + Device + Payloadalan setine göredir. - Değişken seti genişlerse limit yeniden hesaplanmalıdır.
- Bu değer yalnız JSON gövdesidir; taşıma katmanı header'ları dahil değildir.
Redis Buffer Kaydı
Şema mapping sonrası ingest servis, paketi Redis'te cihaz bazlı buffer key'ine yazar:
device_buffer:{device_id}
Yazılan gövde (özet):
Not: Aşağıdaki
assemblybloğu Stream servis tarafından yönetilir; Ingest servis doğrudan merge/finalize yapmaz.
{
"device_id": "400000011D081B70",
"last_seen": "2026-03-12T09:50:00Z",
"raw_last": {
"Info": {
"Pack": "DETAIL",
"ID": "400000011D081B70",
"Stream_End": false
},
"Device": {
"IoT": {
"RSSI": 31
}
},
"Payload": {
"VRMS": [
0.063,
0.051,
0.055
]
}
},
"assembly": {
"stream_key": "400000011D081B70:2026-03-12T09:50:00Z",
"parts_received": [
"BASIC",
"DETAIL"
],
"stream_end_seen": false,
"payload_merged": {
"VRMS": [
0.063,
0.051,
0.055
],
"IRMS": [
0.004,
0.004,
0.004
]
},
"updated_at": "2026-03-12T09:50:01Z"
},
"meta": {
"pack": "DETAIL",
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7"
},
"raw_buffer": {
"VRMS_R": [
{
"ts": "2026-03-12T09:50:00Z",
"v": 0.063
}
],
"IRMS_R": [
{
"ts": "2026-03-12T09:50:00Z",
"v": 0.004
}
]
},
"synth_buffer": {},
"window_buffer": {},
"last_stream_id": null,
"state_version": 1
}
Kurallar:
raw_lasther pakette Ingest tarafından overwrite edilir (son paket).assemblyStream servis tarafından event tüketimi sırasında merge edilir (parts_received,payload_merged,stream_end_seen).raw_bufferalanları Stream servis tarafından append + prune (rolling 24h) ile güncellenir.- Sayısal değerler buffer'da 3 basamak hassasiyetle tutulur.
device_bufferkey'i silinmez; sadece buffer içi eski örnekler temizlenir.
Not: Stream birleştirme finalize işlemi Stream servisinde yapılır; Ingest yalnız assembly state'i günceller.
Assembly Timeout ve Temizlik
Assembly setinin tamamlanma kontrolü Stream servisinin sorumluluğundadır.
- Stream servis her ingest event'te
assembly.updated_atalanını kontrol eder. - Ayrıca periyodik bir sweeper job çalıştırır (örn. her 30 sn).
now - updated_at > assembly_timeout_minutesise setexpiredkabul edilir.
Varsayılan: assembly_timeout_minutes = 2 (GSM gecikmeleri için yeterli, konfigürasyonla artırılabilir).
Timeout durumunda assembly completion policy uygulanır:
required_min_variablesseti kontrol edilir (ilk profil için örn.VRMS_R,VRMS_S,VRMS_T)- Minimum set varsa paket
is_partial=trueile finalize edilir ve işleme devam eder missing_variableslistesi çıkarılır ve stream kaydına yazılır- Minimum set yoksa paket işlenmez,
stream.failed.v1event + unprocessed log üretilir - Her durumda Redis
assemblybloğu temizlenir
Not: Bu modelde cihazdan yeniden gönderim istenmez; ek retry politikası yoktur. Gelen veriyle devam edilir, sorunlu setler loglanır.
Kafka Mesajı
Redis buffer kaydından sonra ingest servis Kafka'ya hafif event yazar; full paket Redis device_buffer içinde tutulur.
Mesaj sözleşmesinin tam detayı /projects/cinga/backend/architecture/kafka sayfasında tanımlıdır.
{
"event": "ingest.received.v1",
"meta": {
"schema_version": 1,
"trace_id": "9f3f...",
"producer_service": "ingest-service",
"produced_at": "2026-03-11T15:22:08.050Z",
"process_ms": 6
},
"context": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z"
},
"data": {
"pack": "DETAIL",
"Stream_End": false,
"received_at": "2026-03-11T15:22:08Z",
"meta": {
"pack": "DETAIL",
"imei": "354485417650889",
"iccid": "8946120802005507363",
"firmware": "1.0.7",
"ip": "x.x.x.x",
"size": 1240
}
},
"error": null
}
Ingest Serviste Üretilen Eventler
| Topic | Event | Consume Eden | Üretim Koşulu |
|---|---|---|---|
cinga.ingest.raw | ingest.received.v1 | Stream Writer | Şema doğrulama + Redis buffer yazımı sonrası |
cinga.ingest.failed | ingest.failed.v1 | Ops/Replay Worker | Ingest aşamasında doğrulama/işleme hatası |
cinga.dlq.ingest | ingest.failed.v1 | Ops/Replay Worker | Retry limiti aşan kalıcı ingest hataları |
ingest.received.v1 içinde yönlendirme için zorunlu alanlar:
pack:BASIC/DETAIL/EXPERIMENTStream_End:true/false
Pack Alanı Kullanımı
Info.Pack alanı paket profilini belirtir:
BASICDETAILEXPERIMENT
Info.Stream_End alanı ise bu iletim setinin son paketi olup olmadığını belirtir.
Stream finalize kuralı:
Stream_End=trueveBASICpaketi alınmışsa set finalize edilir.DETAILveEXPERIMENTopsiyoneldir (geldiyse merge edilir).Stream_End=trueamaBASICyoksa set hatalı kabul edilir (stream.failed.v1).
Not: Sentez ve pencere hesapları Payload verisine dayanır; Pack hesap girdisi değil, yönlendirme/assembly meta alanıdır.
Örnek Senaryolar
Senaryo 1 · Yalnız BASIC paketi
- Gelen paketler:
BASIC - Son pakette
Stream_End=true - Sonuç: stream finalize edilir,
is_partial=false
Senaryo 2 · BASIC + DETAIL
- Gelen paketler:
BASIC,DETAIL - Son pakette
Stream_End=true - Sonuç: assembly merge tamamlanır, stream finalize edilir,
is_partial=false
Senaryo 3 · Timeout + Minimum set var
- Gelen paketler:
DETAIL+ kısmiBASIC assembly_timeout_minutes=2doldurequired_min_variablesmevcut- Sonuç: stream
is_partial=trueile finalize edilir,missing_variablesyazılır
Senaryo 4 · Timeout + Minimum set yok
- Gelen paketler: yalnız
EXPERIMENT assembly_timeout_minutes=2doldurequired_min_variablesyok- Sonuç: stream finalize edilmez,
stream.failed.v1+ unprocessed log üretilir
Bu Cihaz Veri Şeması bölümü geçiş dokümantasyonudur. İlerleyen fazda cihaz üretim protokolü için ayrı bir standart doküman (resmi şema spesifikasyonu) yayınlanacaktır.