Sentez Servisi
Sentez Servisi, raw.persisted.v1 eventini tuketip Redis device_buffer uzerindeki guncel cihaz state'ini (gerekirse DB fallback ile) kullanarak sentez metriklerini ureten mikroservistir.
Bu servis ham measurement segmentlerine yazim yapmaz. Sentez kurallarini calistirir, sonuclari hem Redis device_buffer.buffers.synthesis alanina hem de ilgili sentez tablolarina yazar ve bir sonraki katmani tetikler.
device_buffer alanlari, ownership ve stage gecisleri icin Device Buffer Redis Kaydi Sozlesmesi sayfasina bakiniz.
Sorumluluk
raw.persisted.v1eventini consume etmekstream_idbazli girdi setini Redisdevice_bufferuzerinden okumak- Redis miss/eksik alan durumunda DB fallback ile girdi setini tamamlamak
- Redis/DB uzerinden synthesis rule snapshot cozmek
- Kural onceligine gore sentez metriklerini hesaplamak
- Sonuclari
device_buffer.buffers.synthesisalanina yazmak - Sonuclari ilgili sentez tablolarina idempotent upsert etmek
- Basarida
synth.ready.v1, hatadasynth.failed.v1uretmek
Detayli islem akisi icin Synthesis Servisi Akis Diyagrami sayfasina bakiniz.
Redis/DB okuma sirasi, kural cozumleme, sentez tablo yazimi ve event cikis sirasini tek diyagramda gormek icin Veri Katmanlari Arasi Yazim Sirasi sayfasina bakiniz.
synth.ready.v1 ve synth.failed.v1 payload sozlesmeleri icin Event Sozlesmeleri sayfasina bakiniz.
İşlem Akışı
Sentez Servisi asagidaki zincirle calisir:
raw.persisted.v1consume edilir.device_buffer:{device_id}icindenmeasurements/ilgili state bloklari okunur.- Redis miss veya eksik alan varsa
stream_idbazli DB fallback ile girdi seti tamamlanir. - Aktif sentez kurallari cihaz ozel -> global fallback sirasi ile cozulur.
- Kurallar deterministic sirayla calistirilir.
- Uretilen metrikler once
device_buffer.buffers.synthesisalanina patch edilir. - Sonuclar ilgili sentez tablolarina yazilir.
- Basariliysa
synth.ready.v1, hatadasynth.failed.v1uretilir.
Girdi Kaynaklari
Sentez hesaplamasi icin kullanim sirasi:
- Redis
device_buffer:{device_id}(birincil hizli kaynak) - Typed measurement tablolari (
measurements_voltage/current/power/energy) ile DB fallback - Gerekirse pass-through segment tablolari (
measurements_device/register/environment/water/location) - Eksik veya segment disi alanlar icin generic
measurementsfallback
Not: Hesaplama girdisi event payload'undan degil, once Redis state sonra kalici veri katmanindan cozulur. Boylece tekrar isleme (replay) deterministic kalir.
Kural Cozumleme
Kural kaynagi:
- Redis cihaz kurali:
synth_rules:{device_id}:latest - Redis global kurali:
synth_rules:0:latest - Cache miss -> DB (
synthesis_rules+synthesis_assignments) + write-back
Oncelik:
- Cihaz ozel kural
- Global kural
- Kural yoksa degisken pas gec
Calistirma sirasi:
priority ASC- esitlikte
update_time DESC
Sentez Persist Tablolari
Bu sayfada tablo kolon detaylari verilmez. Sentez ciktisi ilgili typed tablolara yazilir.
Sentez tablo detaylari icin data-model belgelerine bakiniz:
Yazım Kuralları
- Yazım anahtarı
stream_id'dir. - Aynı
stream_idtekrar işlenirse idempotent upsert uygulanır. - Rule hesaplamasi sirasinda bir degisken uretilemezse ilgili alan
NULLbirakilabilir; bu durum kalite bayraklariyla isaretlenir. - Persist asamasinda transaction modeli single-row atomic upsert olarak calisir.
- Önerilen isolation:
READ COMMITTED+ idempotent upsert (yüksek throughput için dengeli varsayılan).
Multi-Table Yazım Atomicity
Synthesis Servisi tek stream_id için dört tabloyu (synthesis_voltage, synthesis_current, synthesis_power, synthesis_energy) yazıyor. Bu işlemler tablolar arası consistency açısından aşağıdaki garantileri sağlar:
Seviye 1 — Tek Tablo İçinde:
- Her tablo için atomicity vardır (
stream_idprimary key) - Aynı
stream_idüzerinde uyuşmayan iki yazım aynı anda gerçekleşemez (DB lock)
Seviye 2 — Tablolar Arası Consistency:
stream_idaynıysa tüm dört tablodaki veriler aynı rule çalıştırması sonucudur- Crash durumunda (yazım ortasında):
- Bazı tablolar güncellenmiş, bazıları eski versiyonda kalabilir
- Çözüm: Tekrar işleme idempotent olarak tamamen aynı sonucu üretir
- Redis
buffers.synthesisde benzer şekilde patch edilir
Isolation Seviyesi: READ COMMITTED
Bu seviye yüksek konkürensivite ve Synthesis gibi batch-oriented işlemler için uygunudur:
- Dirty read yok (başarısız yazımlar görülmez)
- Non-repeatable read mümkün (başka servis aynı
stream_id'yi güncellemişse) - Phantom read mümkün (yeni satırlar eklenebilir)
Crash Toleransı:
Eğer Synthesis Servisi 4 tablo yazımının ortasında başarısız olursa:
- İlk N tablo yazıldıktan sonra crash → N tablo yeni, 4-N tablo eski haline
- Tekrar işleme (
replay) koştu → idempotent upsert aynıstream_idiçin tekrar çalışır - Sonuç: 4 tablo da aynı rule sonucu ile senkronize edilir
Öneriler:
- Yazım sırası sabittir: voltage → current → power → energy
- Her tablo yazımı öncesinde log entry yazılabilir (audit trail)
synth.ready.v1yalnız 4 tablo da başarıyla yazıldıktan sonra üretilir- Kafka event'inin tamamlanmasını garantilemek için yazım bitiminden sonra event üretilir (kısa latency vs consistency tradeoff)
Yazım Sonrası Akış
DB yazımı tamamlandıktan sonra servis:
- Redis
device_buffer:{device_id}icinde kendi sorumlulugundakibuffers.synthesisblogunu gunceller. - Basariliysa
synth.ready.v1uretir. - Downstream window katmanini tetikler.
Not: buffers.synthesis alanlarinin kesin semantigi ve tum state ownership kurallari icin
Device Buffer Redis Kaydi Sozlesmesi esas alinmalidir.
raw.persisted.v1 sonrasinda Synthesis Servisinin device_buffer uzerinde olusturdugu ornek fixture:
Tum asamalari tek sayfada gormek icin Device Buffer Flow dokumanina bakiniz.
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "synthesized",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"stream": {
"stream_id": 10248764,
"stream_time": "2026-04-13T11:18:11Z",
"process_time_ms": 13
},
"calibration": {
"calibration_time": "2026-04-13T11:18:12Z",
"process_time_ms": 11,
"ct_ratio": 1,
"vt_ratio": 1,
"calibrated_value_count": 3
},
"raw_writer": {
"write_time": "2026-04-13T11:18:13Z",
"process_time_ms": 15,
"table_write_counts": {
"measurements_voltage": 1,
"measurements_current": 1,
"measurements_power": 1,
"measurements_energy": 1,
"measurements_device": 1,
"measurements_harmonics": 1,
"measurements_register": 1
}
},
"synthesis": {
"synthesis_time": "2026-04-13T11:18:14Z",
"process_time_ms": 16,
"synthesized_variable_count": 6
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"FQ": 49.98,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {
"VRMS_AVG": 221.823,
"IRMS_AVG": 3.985,
"P_TOTAL": 2288.238,
"Q_TOTAL": 1325.005,
"S_TOTAL": 2652.076,
"PF_AVG": 0.863,
"THD_V_R": 2.38,
"THD_V_S": 2.15,
"THD_V_T": 2.64,
"THD_I_R": 1.92,
"THD_I_S": 4.21,
"THD_I_T": 10.43
},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 5
}
Bu dosya, bir sonraki adimda Window Servisine input olarak kullanilabilir.
Başarısızlıkta:
synth.failed.v1uretilir.- Hata kodu log + DLQ akisina tasinir.
- Tekrar isleme idempotent olarak
stream_idustunden yapilir.
Not: Window Servisi, yalnız başarılı sentez yazımı olan stream setlerini işlemeye almalıdır.
DLQ ve Retry Policy
Synthesis Servisi hata durumunda aşağıdaki stratejiyi uygular:
Hata Sınıflaması:
| Hata Kategorisi | error.retryable | Açıklama | DLQ Destinasyonu |
|---|---|---|---|
| Transient | true | Geçici network/DB timeout, Redis miss (fallback ok) | qapu.synth.retry |
| Permanent | false | Kural bulunamadı, schema hatası, invalid input | qapu.synth.dlq |
| Configuration | false | Environment variable eksik, DB bağlantısı kapamadı | qapu.synth.dlq |
Retry Policy - Exponential Backoff:
max_retries = 3
retry_delays = [1s, 5s, 25s] // exponential: 1s, 1s*5, 5s*5
örnek:
1. attempt @ t=0s → fail (TIMEOUT)
2. retry @ t=1s → fail (TIMEOUT)
3. retry @ t=6s (1s+5s) → fail (RULE_NOT_FOUND - permanent!)
4. → DLQ (permanent hata - artık retry yapma)
Retry Başlatma:
synth.failed.v1event üretilir (error.retryable=true)- Event
qapu.synth.retrytopic'ine yazılır - Retry Worker bu topic'i consume eder
- Retry Worker, backoff süresi sonra yeni event üretir
- Synthesis Servisi işlemi tekrar işler
{
"event": "synth.retry.v1",
"meta": {
"original_trace_id": "9f3f...",
"retry_attempt": 2,
"retry_at": "2026-04-12T09:10:37Z",
"backoff_ms": 5000
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"error": {
"failed_stage": "persist",
"error_code": "SYNTHESIS_WRITE_TIMEOUT",
"original_error_at": "2026-04-12T09:10:12Z"
}
}
Dead-Letter Queue (DLQ) Routing:
Hata retryable=false ise veya max_retries aşılırsa:
- Mensaje
qapu.synth.dlqtopic'ine yazılır - DLQ Consumer (Ops/Incident Response) konuya abone olur
- Alert üretilir (Slack/PagerDuty/SMS)
- Manuel inceleme ve recovery işlemi başlatılır
{
"event": "synth.permanent_failure.v1",
"meta": {
"original_trace_id": "9f3f...",
"dlq_entry_at": "2026-04-12T09:10:37Z"
},
"context": {
"device_id": "400000011D081B70",
"stream_id": 9823412
},
"error": {
"failed_stage": "rule_resolution",
"error_code": "RULE_NOT_FOUND",
"error_message": "No active synthesis rule found for stream_id=9823412",
"retryable": false,
"final_retry_count": 0
}
}
Operasyonel Yönetim:
- DLQ alert: Severity = HIGH
- Response time target: < 15 dakika
- Manual resolution: Device ID + stream_id ile query → replay trigger
- Post-mortem: Neden hata oluştu (rule missing mi, config mi, DB mi?)
Topic ve Event Standardı
Bu sayfada kullanılan standart:
- Kafka topic:
qapu.synth.ready,qapu.synth.failed - Event adı:
synth.ready.v1,synth.failed.v1
Not: Topic adı taşıma kanalıdır, event adı payload sözleşmesidir. Doküman genelinde bu ikisi karıştırılmamalıdır.
Cikti
Synthesis Servisinin ciktisi:
- Redis
device_buffer.buffers.synthesisalaninda guncel sentez state - Sentez tablolarinda kalici veri
- Kafka'da
synth.ready.v1(veya hata durumundasynth.failed.v1)
Window Servisi bu eventi tuketerek pencere hesaplarini baslatir.
Latency ve SLA Hedefleri
Synthesis Servisi, raw.persisted.v1 eventinden synth.ready.v1 / synth.failed.v1 üretimine kadar optimize edilmiştir.
Uçtan Uca Latency (raw.persisted → synth event publish):
| Aşama | P50 | P99 | Hedef |
|---|---|---|---|
| Event consume (Kafka) | 1ms | 5ms | < 10ms |
| Redis device_buffer read | 2ms | 8ms | < 15ms |
| DB fallback (eksik alanlar) | 5ms | 20ms | < 50ms (gerekirse) |
| Rule resolution | 3ms | 10ms | < 20ms |
| Synthesis calculation | 8ms | 40ms | < 100ms |
| Multi-table persist | 15ms | 80ms | < 150ms |
| Redis patch + publish | 2ms | 8ms | < 20ms |
| Total (P95) | ~36ms | ~171ms | < 250ms |
SLA Hedefleri:
| Metrik | Hedef | Alarm Eşiği |
|---|---|---|
| Event successful processing | > 99.5% | < 99.0% |
| P99 latency | < 250ms | > 350ms |
| DB write success rate | > 99.99% | < 99.9% |
| Rule resolution accuracy | > 99.9% | < 99.5% |
| Idempotent upsert consistency | > 99.99% | < 99.9% |
| Retry success rate (transient) | > 95% | < 90% |
Monitoring SLI'ları:
synth_ingestion_latency_ms(histogram): event receive → process startsynth_db_write_latency_ms(histogram): 4-table upsert durationsynth_total_process_time_ms(histogram): P50/P99 distributionssynth_rule_resolution_cache_hit_rate(gauge): Redis rule cache hit %synth_db_fallback_count(counter): minute başına DB fallbacksynth_success_rate(gauge): başarılı işlem % oranısynth_retry_success_rate(gauge): retry başarı oranısynth_dlq_entry_count(counter): DLQ'ya giren event sayısı
Konfigürasyon Yönetimi
Environment Variables
Tüm retry, timeout ve fallback parametreleri environment variables ile kontrol edilir:
# Retry Policy
SYNTH_MAX_RETRIES=3
SYNTH_RETRY_BACKOFF_MS=1000,5000,25000 # Exponential: [1s, 5s, 25s]
SYNTH_RETRY_TIMEOUT_MS=30000 # Max süre untuk her retry attempt
# DB Fallback
SYNTH_DB_FALLBACK_TIMEOUT_MS=5000 # Fallback query timeout
SYNTH_DB_CONNECTION_POOL_SIZE=10
SYNTH_DB_READ_COMMITTED_ISOLATION=true
# Rule Resolution
SYNTH_RULE_REDIS_CACHE_TTL_SECONDS=3600 # 1 saat cache
SYNTH_RULE_DB_QUERY_TIMEOUT_MS=2000
SYNTH_RULE_NOT_FOUND_RETRYABLE=false # Permanent error olarak işaretle
# Multi-Table Write
SYNTH_WRITE_ORDER=voltage,current,power,energy # Order-critical
SYNTH_WRITE_TIMEOUT_PER_TABLE_MS=10000
SYNTH_WRITE_CONSISTENCY_LEVEL=READ_COMMITTED
# DLQ
SYNTH_DLQ_TOPIC=qapu.synth.dlq
SYNTH_DLQ_MAX_RETRIES_BEFORE_SINK=3
Per-Environment Profili
Development (dev):
ENVIRONMENT=development
SYNTH_MAX_RETRIES=5 # Daha toleranslı
SYNTH_RETRY_TIMEOUT_MS=60000
SYNTH_DB_FALLBACK_TIMEOUT_MS=10000 # Gevşek fallback
SYNTH_RULE_REDIS_CACHE_TTL_SECONDS=300 # 5 min cache (test rule changes)
SYNTH_WRITE_TIMEOUT_PER_TABLE_MS=15000
SYNTH_DLQ_MAX_RETRIES_BEFORE_SINK=5
Staging (staging):
ENVIRONMENT=staging
SYNTH_MAX_RETRIES=3 # Normal
SYNTH_RETRY_TIMEOUT_MS=45000
SYNTH_DB_FALLBACK_TIMEOUT_MS=7000 # Orta
SYNTH_RULE_REDIS_CACHE_TTL_SECONDS=1800 # 30 min cache
SYNTH_WRITE_TIMEOUT_PER_TABLE_MS=12000
SYNTH_DLQ_MAX_RETRIES_BEFORE_SINK=3
Production (prod):
ENVIRONMENT=production
SYNTH_MAX_RETRIES=2 # Kesin retry limiti
SYNTH_RETRY_TIMEOUT_MS=30000 # Kesin timeout
SYNTH_DB_FALLBACK_TIMEOUT_MS=3000 # Hızlı fail-fast
SYNTH_RULE_REDIS_CACHE_TTL_SECONDS=3600 # 1 saat cache
SYNTH_WRITE_TIMEOUT_PER_TABLE_MS=8000 # Kesin limit
SYNTH_DLQ_MAX_RETRIES_BEFORE_SINK=2 # DLQ'ya hızlı routing
Validation ve Monitoring
Synthesis startup'ında konfigürasyon validate edilir:
def validate_config():
assert len(RETRY_BACKOFF_MS) == MAX_RETRIES, \
"Retry backoff array size must match max_retries"
for backoff in RETRY_BACKOFF_MS:
assert 0 < backoff < 120000, \
f"Backoff {backoff}ms should be 1s-120s"
assert RULE_CACHE_TTL > 0, "Rule cache TTL must be positive"
logger.info(f"Config loaded: ENVIRONMENT={ENVIRONMENT}, "
f"MAX_RETRIES={MAX_RETRIES}, "
f"BACKOFF={RETRY_BACKOFF_MS}, "
f"RULE_CACHE_TTL={RULE_CACHE_TTL}s")
Environment-specific alerting:
| Alert | Dev | Staging | Prod |
|---|---|---|---|
| Retry exhaustion | Log | Dashboard | PagerDuty |
| DLQ entry | Log | Slack | PagerDuty + SMS |
| P99 latency breach | - | Slack | PagerDuty |
| Rule cache miss spike | Log | Slack | Alert |
| DB fallback timeout | Dashboard | Slack | PagerDuty |
Gozlemlenebilirlik Sahipligi
Synthesis dokumani, servis kontratini ve olculecek sinyalleri (SLI) tanimlar. Sayisal hedefler ve alarm esikleri bu sayfada tutulmaz.
Bu ayrimda sahiplik:
- Synthesis Servisi: event/metrik uretimi,
meta.process_ms, hata kodlari,ready/failedeventleri - Observer Servisi: SLO/SLA hedefleri, ortam bazli alarm esikleri, eskalasyon ve bildirim kurallari
Observer tarafindaki policy ve alarm yonetimi icin Observer Servisi sayfasina bakiniz.
Sik Sorulan Sorular
Troubleshooting ve operasyonel sik sorular icin Sik Sorulan Sorular sayfasina bakiniz.