End-to-End Flow Örneği
Bu sayfada, bir IoT cihazından gelen pakelin sistem içinde nasıl ilerlediğini, device_buffer'ın her aşamada nasıl evrimleştiğini adım adım gösterilmektedir.
Senaryo
Cihaz: Üç fazlı enerji ve enerji kalitesi ölçer (QAPU Multi-Function Meter)
Model ID: 64000000C466EF70
Firmware: 05.00.05
Verinin İçeriği: Voltaj, akım, güç, reaktif güç, görünür güç, harmonikler, power factor, sıcaklık, enerji
Aşama 1: Cihazdan Gelen CPS Paketi
Cihaz, standardize edilmiş Qapu Core Packet Standard (CPS) format'ında veri gönderir:
{
"Info": {
"TimeStamp": "2026-04-13T11:18:09Z",
"ID": "64000000C466EF70",
"ICCID": "899001190805082918",
"Firmware": "05.00.05"
},
"Device": {
"Power": {
"B_AC": -0.469,
"B_CS": 3,
"B_IV": 4.13
},
"IoT": {
"RSSI": 6
}
},
"Payload": {
"AE": [62, 64, 65],
"IFUND": [3.999, 4.067, 3.863],
"IHARM_R": [0.036, 0.059, 0.07, 0.004],
"IHARM_S": [0.143, 0.093, 0.061, 0.008],
"IHARM_T": [0.362, 0.105, 0.17, 0.055],
"IPEAK": [6.012, 6.246, 5.925],
"IRMS": [3.995, 4.073, 3.887],
"P": [746.184, 764.792, 777.262],
"PCB_H": 48.34,
"PCB_T": 22.95,
"PF": [0.85, 0.84, 0.9],
"PFUND": [746.179, 764.567, 777.037],
"Q": [467.521, 496.76, 360.724],
"QFUND": [465.567, 494.03, 359.238],
"RE_G": [0, 0, 0],
"RE_L": [39, 42, 30],
"S": [879.551, 911.181, 861.344],
"STATUS": 1073741839,
"STOP": 36702270,
"VFUND": [219.997, 223.793, 221.752],
"VHARM_R": [2.598, 0.951, 4.972, 0.205],
"VHARM_S": [1.999, 4.082, 3.597, 0.537],
"VHARM_T": [3.705, 4.228, 0.372, 0.92],
"VRMS": [220.006, 223.792, 221.671]
}
}
Açıklamalar:
| Alan | Açıklama |
|---|---|
ID | Device ID (DS28C seri numarası) |
ICCID | SIM kart ICCID |
VRMS | R/S/T fazlarının RMS voltajları (V) |
IRMS | R/S/T fazlarının RMS akımları (A) |
IFUND | Fundamental (temel) akım bileşenleri |
IHARM_R/S/T | Akım harmonik vektörü (sabit sıra: 3/5/7/9/11) |
IPEAK | Akım tepe değerleri |
VFUND | Fundamental voltaj |
VHARM_R/S/T | Voltaj harmonik vektörü (sabit sıra: 3/5/7/9/11) |
P | Aktif güç (W) — 3 faz |
Q | Reaktif güç (VAR) |
S | Görünür güç (VA) |
PF | Power factor (0-1) |
AE | Aktif enerji (sayaç: kWh) |
FQ | Frekans (Hz) |
PCB_T/H | Yazılım/donanım PCB sıcaklıkları |
RSSI | GSM sinyal kalitesi (dBm) |
RE_G/L | Reaktif enerji (geç/erken) |
STOP | Kumulatif enerji counter |
Aşama 2: Ingest Servisi İşlemesi
Ingest Servisi, HTTP POST ederilen paketi alır, doğrular ve Redis'e yazar.
Işlem Adımları:
- JSON parse
- CPS şeması doğrulama (Info, Device, Payload blokları)
- device_id (ID) CRC8 kontrolü
- Duplicate check (Redis)
- Kalıcı raw DB yazımı
- Normalize edilmiş device_buffer oluşturma (ve CPS kayıt formattan canonical forma dönüştürme)
- Kafka tetikleme
Örnek Dönüşüm (vektör -> canonical şema):
VRMS: [220.006, 223.792, 221.671]->VRMS_R: 220.006,VRMS_S: 223.792,VRMS_T: 221.671IRMS: [3.995, 4.073, 3.887]->IRMS_R: 3.995,IRMS_S: 4.073,IRMS_T: 3.887P: [746.184, 764.792, 777.262]->P_R: 746.184,P_S: 764.792,P_T: 777.262
Sonuç: device_buffer state after ingest:
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "ingested",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"AE_R": 62,
"AE_S": 64,
"AE_T": 65,
"FQ": 49.98,
"STOP": 36702270,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 1
}
Neler Oldu:
- ✅ Paket geçerli ve duplicate değil
- ✅ Raw DB'ye wire payload yazıldı (raw_id: 2841023)
- ✅
device_bufferoluşturuldu ve Redis'e yazıldı - ✅ CPS payload normalize edildi (Info, Device, Payload → buffers.measurements + device_metadata)
- ✅ Harmonic vector alanları canonical şemada açıldı (
IHARM_*_3/5/7/9,VHARM_*_3/5/7/9) - ℹ️ Bu örnekte vektörde olmayan harmonikler (örn.
*_11) canonical state'e yazılmadı - ✅ Harmonik veriler korundu (geleceğe yönelik analiz için)
- ℹ️
buffers.synthesisvebuffers.windowhenüz boş (sonraki servisler doldurur)
Aşama 3: Stream Servisi İşlemesi
Stream Servisi, ingest event'i consume eder. Parçalı metrikleri birleştirir, stream_id atar ve finalize eder.
Kafka event (ingest → stream):
{
"event": "ingest.accepted.v1",
"context": {
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"received_at": "2026-04-13T11:18:10Z"
}
}
Stream Servisi:
- device_buffer okuyor
stream_idatıyor (auto-increment: 10248764)- Metadata ekleniyor
- Kalıcı stream tablosuna yazıyor
device_buffer state (stream sonrası — Ingest + Stream birlikte):
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "streamed",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"buffers": {
"measurements": { /* ... tüm measurements ... */ },
"synthesis": {},
"window": {}
}
}
Neler Oldu:
- ✅ Ingest verileri korundu
- ✅ Stream metadata eklendi (stream_id, stream_time)
- ✅ Kalıcı stream tablosuna yazıldı
Aşama 4: Calibration Servisi İşlemesi
Calibration Servisi, raw measurements'ı device-specific calibration rule'ları ile kalibre eder.
Kalibrasyon Kuralı:
{
"device_id": "64000000C466EF70",
"ct_ratio": 1.0,
"vt_ratio": 1.0,
"created_at": "2025-12-01T00:00:00Z"
}
Not: Bu cihaz doğrudan ölçüm yapıyor (CT/VT oranları 1:1), bu nedenle kalibrasyon minimal.
device_buffer state (calibration sonrası — Ingest + Stream + Calibration):
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "calibrated",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"calibration": {
"calibration_time": "2026-04-13T11:18:12Z",
"process_time_ms": 11,
"ct_ratio": 1.0,
"vt_ratio": 1.0,
"calibrated_value_count": 3
},
"buffers": {
"measurements": { /* ... tüm measurements korundu ... */ },
"synthesis": {},
"window": {}
}
}
Neler Oldu:
- ✅ Ingest + Stream verileri korundu
- ✅ Calibration metadata eklendi
- ✅ Measurements'lar kalibre edildi (1:1 oranında değişim yok)
Aşama 5: Raw Writer Servisi İşlemesi
Raw Writer, kalibre edilmiş ölçümleri typed tablo'lara yazar.
Yazılan Tablolar:
measurements_voltage← VRMS (3 faz)measurements_current← IRMS (3 faz) + IFUND + IHARM + IPEAKmeasurements_power← P, Q, S (aktif, reaktif, görünür güç)measurements_energy← AE (enerji sayacı)measurements_device← STATUS, PCB_T/H, RSSI, FQmeasurements_harmonics← VHARM, IHARM (voltaj/akım harmonikleri)measurements_register← Device metadata (firmware, ICCID)
device_buffer state (raw writer sonrası — Ingest + Stream + Calibration + Raw Writer):
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "raw_written",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"calibration": {
"calibration_time": "2026-04-13T11:18:12Z",
"process_time_ms": 11,
"ct_ratio": 1.0,
"vt_ratio": 1.0,
"calibrated_value_count": 3
},
"raw_writer": {
"write_time": "2026-04-13T11:18:13Z",
"process_time_ms": 15,
"table_write_counts": {
"measurements_voltage": 1,
"measurements_current": 1,
"measurements_power": 1,
"measurements_energy": 1,
"measurements_device": 1,
"measurements_harmonics": 1,
"measurements_register": 1
}
},
"buffers": {
"measurements": { /* ... tüm measurements korundu ... */ },
"synthesis": {},
"window": {}
}
}
Neler Oldu:
- ✅ Ingest + Stream + Calibration verileri korundu
- ✅ Raw Writer metadata eklendi
- ✅ 7 farklı typed tablo başarıyla yazıldı
- ℹ️ Buffers hala synthesis ve window için boş
Aşama 6: Synthesis Servisi İşlemesi
Synthesis Servisi, measurements'dan derived metrics ve kalite endeksleri hesaplar:
Sentez Kuralları:
{
"device_id": "64000000C466EF70",
"rules": [
{
"priority": 1,
"output": "VRMS_AVG",
"formula": "(VRMS_R + VRMS_S + VRMS_T) / 3",
"description": "3-fazlı ortalama voltaj"
},
{
"priority": 2,
"output": "IRMS_AVG",
"formula": "(IRMS_R + IRMS_S + IRMS_T) / 3",
"description": "3-fazlı ortalama akım"
},
{
"priority": 3,
"output": "P_TOTAL",
"formula": "P_R + P_S + P_T",
"description": "Toplam aktif güç"
},
{
"priority": 4,
"output": "PF_AVG",
"formula": "(PF_R + PF_S + PF_T) / 3",
"description": "Ortalama power factor"
},
{
"priority": 5,
"output": "THD_V",
"formula": "sqrt(VHARM_R_3^2 + VHARM_R_5^2 + VHARM_R_7^2 + VHARM_R_9^2) / VFUND_R * 100",
"description": "Voltaj toplam harmonik distorsiyonu"
},
{
"priority": 6,
"output": "THD_I",
"formula": "sqrt(IHARM_R_3^2 + IHARM_R_5^2 + IHARM_R_7^2 + IHARM_R_9^2) / IFUND_R * 100",
"description": "Akım toplam harmonik distorsiyonu"
}
]
}
Hesaplamalar:
- Ortalama Voltaj: (220.006 + 223.792 + 221.671) / 3 = 221.823 V
- Ortalama Akım: (3.995 + 4.073 + 3.887) / 3 = 3.985 A
- Toplam Güç: 746.184 + 764.792 + 777.262 = 2,288.238 W
- Ortalama PF: (0.85 + 0.84 + 0.9) / 3 = 0.863
- Voltaj THD (R fazı): √(2.598² + 0.951² + 4.972² + 0.205²) / 219.997 × 100 = 2.38%
- Akım THD (R fazı): √(0.036² + 0.059² + 0.07² + 0.004²) / 3.999 × 100 = 1.92%
device_buffer state (synthesis sonrası):
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "synthesized",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"calibration": {
"calibration_time": "2026-04-13T11:18:12Z",
"process_time_ms": 11,
"ct_ratio": 1.0,
"vt_ratio": 1.0,
"calibrated_value_count": 3
},
"raw_writer": {
"write_time": "2026-04-13T11:18:13Z",
"process_time_ms": 15,
"table_write_counts": {
"measurements_voltage": 1,
"measurements_current": 1,
"measurements_power": 1,
"measurements_energy": 1,
"measurements_device": 1,
"measurements_harmonics": 1,
"measurements_register": 1
}
},
"synthesis": {
"synthesis_time": "2026-04-13T11:18:14Z",
"process_time_ms": 16,
"synthesized_variable_count": 6
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"FQ": 49.98,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {
"VRMS_AVG": 221.823,
"IRMS_AVG": 3.985,
"P_TOTAL": 2288.238,
"Q_TOTAL": 1325.005,
"S_TOTAL": 2652.076,
"PF_AVG": 0.863,
"THD_V_R": 2.38,
"THD_V_S": 2.15,
"THD_V_T": 2.64,
"THD_I_R": 1.92,
"THD_I_S": 4.21,
"THD_I_T": 10.43
},
"window": {}
},
"schema_version": 2,
"state_version": 8
}
Yapılan İşlemler:
- ✅ Kural çözümleme (device-specific)
- ✅ Sentez metrikler + kalite endeksleri hesaplanır
- ✅
synthesis_voltage,synthesis_current,synthesis_power,synthesis_qualitytablolarına yazılır - ✅ device_buffer.buffers.synthesis patched
- ✅
synth.ready.v1event üretilir
Aşama 7: Window Servisi İşlemesi (Sonrası)
Window Servisi, synthesis eventini consume eder. Zaman pencereleri (5-min, 1-hour vb.) hesaplayıp agregate sonuçları üretir.
Sonuç (device_buffer final state):
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "windowed",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"calibration": {
"calibration_time": "2026-04-13T11:18:12Z",
"process_time_ms": 11,
"ct_ratio": 1.0,
"vt_ratio": 1.0,
"calibrated_value_count": 3
},
"raw_writer": {
"write_time": "2026-04-13T11:18:13Z",
"process_time_ms": 15,
"table_write_counts": {
"measurements_voltage": 1,
"measurements_current": 1,
"measurements_power": 1,
"measurements_energy": 1,
"measurements_device": 1,
"measurements_harmonics": 1,
"measurements_register": 1
}
},
"synthesis": {
"synthesis_time": "2026-04-13T11:18:14Z",
"process_time_ms": 16,
"synthesized_variable_count": 6
},
"window": {
"window_time": "2026-04-13T11:18:15Z",
"process_time_ms": 19,
"window_count": 2
},
"buffers": {
"measurements": { /* ... */ },
"synthesis": { /* ... */ },
"window": {
"window_5m": {
"VRMS_AVG_mean": 221.82,
"VRMS_AVG_min": 221.12,
"VRMS_AVG_max": 222.45,
"IRMS_AVG_mean": 3.985,
"IRMS_AVG_min": 3.92,
"IRMS_AVG_max": 4.08,
"P_TOTAL_energy": 190.68,
"PF_AVG_mean": 0.863,
"THD_I_R_max": 1.92
},
"window_1h": {
"VRMS_AVG_mean": 221.75,
"P_TOTAL_energy": 2288.24,
"AE_total": 191
}
}
}
}
End-to-End Timeline
11:18:09.000Z ← Cihaz gönderişi (device_time)
└─ Ingest: parse, validate, normalize
11:18:10.008Z ← ✅ 200 OK device'e döner (P50: 8ms)
└─ Stream: stream_id assign
11:18:11.021Z ← Stream hazır (13ms)
└─ Calibration: doğrulama ve kurala göre ayar
11:18:12.032Z ← Kalibre (11ms)
└─ Raw Writer: 6 typed table yazımı
11:18:13.047Z ← Raw tablolar hazır (15ms)
└─ Synthesis: 6 sentez metrik hesabı
11:18:14.063Z ← Sentez hazır (16ms)
└─ Window: 5-min + 1-hour pencereleri
11:18:15.082Z ← Nihai sonuç hazır (~73ms total, P95)
Latency özeti:
| Aşama | Latency | Kümülatif |
|---|---|---|
| Ingest | 8ms | 8ms |
| Stream | 13ms | 21ms |
| Calibration | 11ms | 32ms |
| Raw Writer | 15ms | 47ms |
| Synthesis | 16ms | 63ms |
| Window | 19ms | 82ms |
Hata Durumu Örneği
Eğer harmonik kalitesi çok kötüyse (THD_I > 30%) ve Synthesis başarısız olursa:
{
"event": "synth.failed.v1",
"context": {
"device_id": "64000000C466EF70",
"stream_id": 10248764
},
"error": {
"failed_stage": "rule_resolution",
"error_code": "RULE_NOT_FOUND",
"error_message": "No active synthesis rule for device_id=64000000C466EF70",
"retryable": false,
"failed_at": "2026-04-13T11:18:14Z"
}
}
İşlemler:
- Hata DLQ (
qapu.synth.dlq) yazılır - Ops team'e alert gönderilir
- Manual inceleme: Kural neden missing?
- Manual recovery: Kural assign edilir → replay trigger
Önemli Noktalar
- device_buffer tüm aşamalarda ortak çalışma alanı
- Her servis kendi bölümünü doldurur, önceki adımları okur
- Harmonik veriler (IHARM, VHARM) korunur — gelecekteki analiz için
- Kalite metrikleri (THD, PF, frequency) sentez aşamasında hesaplanır
- Hata oluşursa event üretilir ve optional retry yapılır
- Latency: Ingest → Synthesis ~63ms, Window'a kadarsa ~82ms
- Idempotency: Aynı stream_id tekrar işlenirse aynı sonuç
- Multi-phase measurement: 3-fazlı sistem, tüm faz verisi preserved
İlişkili Sayfalar
- Ingest Servisi — Giriş ve doğrulama
- Stream Servisi — Stream ID assignment
- Calibration Servisi — Ölçüm kalibrasyonu
- Raw Writer Servisi — Typed tablo yazımı
- Synthesis Servisi — Türetilmiş metrik hesabı
- Window Servisi — Zaman pencereleri