Skip to main content

Ingest Servis

Ingest Servis, cihazlardan gelen CPS uyumlu veri paketlerini sisteme kabul eden giriş mikroservisidir. Bu servis doğrudan internetten ya da ayrılmış bir giriş alanından (iot.xxxx.com, gate.xxxx.com gibi) HTTP üzerinden veri alır. Temel amacı, cihazı mümkün olduğunca kısa süre bekletirken veriyi güvenli biçimde kabul etmek, kalıcı raw kaydı üretmek, normalize edilmiş çalışma halini Redis'e yazmak ve işleme hattını Kafka üzerinden tetiklemektir.

Bu katman düşük gecikmeli bir kabul servisidir. Hesap yapmaz, sentez üretmez, pencere hesabı çalıştırmaz, iş kuralları uygulamaz. Gelen paketi kabul eder, kalıcı ve operasyonel kaydını oluşturur, ardından diğer servislerin işlemesi için sistemi tetikler.

Kanonik State Sozlesmesi

device_buffer alanlari, ownership ve stage gecisleri icin Device Buffer Redis Kaydi Sozlesmesi sayfasina bakiniz.

Sorumluluk

Ingest Servis aşağıdaki sorumluluklara sahiptir:

  • Cihazdan gelen HTTP payload'ını almak
  • CPS şemasına göre parse ve doğrulama yapmak
  • Duplicate paket kontrolü yapmak
  • Cihazdan gelen wire format veriyi kalıcı raw tabloya yazmak
  • Payload'ı backend içinde kullanılacak canonical yapıya normalize etmek
  • Normalize edilmiş veriyi Redis device_buffer:{device_id} kaydına yazmak
  • Kafka'ya minimal tetikleyici event üretmek
  • Kabul zinciri tamamlandıktan sonra cihaza 200 OK dönmek

Bu servis aşağıdakileri yapmaz:

  • sentez hesapları
  • pencere veya aggregation üretimi
  • rule evaluation
  • kalıcı business table yazımı
CPS Veri Modeli

Cihaz ile Ingest Servis arasındaki veri sözleşmesi Qapu CPS standardına dayanır. Ayrıntılı protokol tanımı ve alan yapısı için Qapu Core Packet Standard (CPS) belgesine başvurunuz.

Şema Doğrulama ve Normalize

Ingest Servis yalnızca parse edilebilir JSON kabul etmez; payload'ın CPS sözleşmesine uyup uymadığını da doğrular. Bu doğrulama kapsamında en azından aşağıdaki başlıklar kontrol edilir:

  • üst seviye yapı
  • zorunlu alanların varlığı
  • device_id eşdeğeri cihaz kimliği
  • cihaz zamanı alanı
  • sayısal alanların tür tutarlılığı
  • CPS içindeki beklenen blok yapısı

Doğrulama başarılı olduktan sonra payload backend içinde kullanılan canonical yapıya normalize edilir. Bu canonical yapı, downstream servislerin tekrar tekrar mapping yapmak zorunda kalmaması için Ingest katmanında üretilir.

Bu nedenle sistem içi sorumluluk sınırı nettir:

  • Raw DB → wire payload
  • Redis device_buffer → normalize edilmiş canonical payload

Kimlik Doğrulama Politikası

Tüm üretim cihazlarında DS28C serisi I2C güvenlik çipi bulunur. Bu çipin seri numarası doğrudan device_id olarak kullanılır. Ingest Servis, gelen device_id değerinin DS28C formatına uygunluğunu doğrular.

device_id Yapısı

DS28C seri numarası 8 bytelık sabit bir formata sahiptir:

Byte  0    : Family Code  — çip ailesini tanımlayan sabit değer
Byte 1–6 : Serial Number — fabrikada programlanmış, benzersiz 48-bit seri numara
Byte 7 : CRC8 — Byte 0–6 üzerinden hesaplanan Dallas/Maxim CRC-8 sağlama değeri

Toplam: 16 hex karakter (64-bit)

Doğrulama Yöntemi

CPS'ten gelen device_id değeri şema doğrulamasının parçası olarak iki ek kontrolden geçirilir:

Adım 1 — CRC8 Bütünlük Kontrolü:
Son byte (Byte 7), ilk 7 byte'ın Dallas/Maxim CRC-8 sağlamasıdır.
Polinom: x⁸ + x⁵ + x⁴ + 1 (0x8C yansımalı)

crc8_computed = dallas_crc8(device_id[0..6])
crc8_received = device_id[7]

if crc8_computed != crc8_received → 422 Unprocessable (device_id bütünlük hatası)

Adım 2 — Family Code Kontrolü:
Byte 0, izin verilen DS28C aile kodları listesinde olmalıdır. Beklenmeyen değer sahte veya manipüle edilmiş bir device_id işaretidir.

if device_id[0] not in ALLOWED_FAMILY_CODES → 422 Unprocessable (tanınmayan cihaz ailesi)

Doğrulama Özeti

KontrolGeçerlilik KriteriHata Kodu
UzunlukTam olarak 16 hex karakter (64-bit)422
CRC8 bütünlüğüdallas_crc8(device_id[0..6]) == device_id[7]422
Family codeİzin verilen DS28C aile kodları listesinde422
DS28C Kütüphanesi

DS28C çipinden seri numara okuma ve CRC hesaplama için DS28C I2C Authentication Kütüphanesi belgesine bakınız.

Kalıcı Raw Kayıt

Ingest Servis, kabul ettiği her geçerli ve duplicate olmayan payload için kalıcı raw kayıt üretir. Bu kayıtın amacı sadece arşiv değildir; aynı zamanda audit, hata analizi, replay ve veri kaybı incelemesidir.

Buradaki temel karar şudur:

  • DB'de tutulan veri: cihazdan geldiği wire raw payload
  • Redis'te tutulan veri: normalize edilmiş canonical çalışma hali

Bu ayrım sayesinde sistem aynı anda hem operasyonel olarak hızlı hem de gözlemlenebilir olur.

raw_data Tablosu

Kalıcı raw kaydın alan yapısı, şema kararları ve tasarım notları için raw_data Tablosu belgesine bakınız.

Redis Çalışma Alanı

Ingest Servis, doğrulanmış ve normalize edilmiş veriyi Redis içinde cihaz bazlı çalışma durumuna yazar.

Ana key yapısı:

  • device_buffer:{device_id}

Bu kayıt, yalnız son ham paketi saklayan basit bir cache değil; servisler arası ortak çalışma alanıdır. Ingest bu alanı başlatır, diğer servisler akış ilerledikçe kendi alanlarını doldurur.

Device Buffer Flow

Ingest sonrasinda Redis device_buffer kaydinin birebir ornek fixture dosyasi:

Tum asamalari tek sayfada gormek icin Device Buffer Flow dokumanina bakiniz.

{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "ingested",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"AE_R": 62,
"AE_S": 64,
"AE_T": 65,
"FQ": 49.98,
"STOP": 36702270,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 1
}

Bu dosya, adim adim servis simulasyonunda bir sonraki katmanlara input olarak kullanilabilir.

Not: CPS'teki harmonic_vector alanlari (IHARM_*, VHARM_*) canonical state'te acilir. Bu ornekte vektorler *_3, *_5, *_7, *_9 alanlarina cozulur; olmayan harmonikler yazilmaz.

Kanonik state semasi, ownership matrisi ve stage gecisleri bu sayfada tekrar edilmez. Tek referans olarak Device Buffer Redis Kaydi Sozlesmesi kullanilir.

Redis Kullanım Gerekçesi

Bu yaklaşımın nedeni tüm veriyi Kafka içinde taşımamak ve her servis için veritabanına gereksiz sorgu yükü bindirmemektir. Redis device_buffer, servisler arası ortak ve hızlı erişilebilir çalışma alanı sağlar.

Bu sayede:

  • Kafka yalnız tetikleyici olur
  • downstream servisler payload'ın tamamını Kafka'dan taşımak zorunda kalmaz
  • işleme zinciri boyunca ortak state korunur
  • son aşamada geçici buffer alanları temizlenebilir

Redis Alan Sorumlulugu

Ingest'in sahip oldugu alanlar ve downstream servis ownership sinirlari icin Device Buffer Redis Kaydi Sozlesmesi tablosu esas alinmalidir.

Kafka Mesajı

Redis güncellemesinden sonra Ingest Servis Kafka'ya minimal bir event yazar. Bu event'in amacı payload taşımak değil, downstream servisleri tetiklemektir.

Kafka içinde full payload dolaştırılmaz. Asıl veri Redis device_buffer içinden okunur.

Önerilen event sözleşmesi:

{
"event": "ingest.accepted.v1",
"data": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z",
"received_at": "2026-03-11T15:22:08Z",
"schema_version": 1
}
}

Ingest Serviste Üretilen Eventler

TopicEventConsume EdenÜretim Koşulu
qapu.ingest.rawingest.accepted.v1Stream / Processor servisleriRaw DB + Redis yazımı sonrası
qapu.ingest.failedingest.failed.v1Ops / Replay WorkerParse, schema ya da kabul hatası
qapu.ingest.warningdevice.ingest_duplicate_detected.v1Ops / MonitoringDuplicate paket algılandığında
qapu.ingest.warningdevice.ingest_stuck_detected.v1Ops / MonitoringCihaz stuck davranışı gösterdiğinde
qapu.ingest.warningdevice.ingest_flood_detected.v1Ops / MonitoringCihaz flood davranışı gösterdiğinde

ingest.failed.v1 Event Sözleşmesi

Parse, şema ya da kabul hatası durumunda üretilen event:

{
"event": "ingest.failed.v1",
"timestamp": "2026-03-31T21:40:02Z",
"data": {
"device_id": "400000011D081B70",
"received_at": "2026-03-31T21:40:02Z",
"error_category": "SCHEMA_VALIDATION_ERROR",
"error_code": "CPS_FIELD_MISSING",
"error_message": "Required field 'device_time' is missing in payload",
"request_size_bytes": 512,
"http_status_code": 422
}
}

Hata kategorileri:

  • PARSE_ERROR: JSON parse başarısız (400 Bad Request)
  • SCHEMA_VALIDATION_ERROR: CPS şeması doğrulaması başarısız (422 Unprocessable Entity)
  • AUTHENTICATION_ERROR: device_id CRC8 veya family code doğrulaması başarısız (422)
  • RAW_STORAGE_ERROR: Kalıcı raw DB yazımı başarısız (500 Internal Server Error)
  • REDIS_ERROR: device_buffer Redis yazımı başarısız (500)
  • PAYLOAD_SIZE_ERROR: Payload boyut sınırı aşıldı (413 Payload Too Large)

Not: ingest.failed.v1 event'ini consume eden Replay Worker, stream_id olmadığı için manual device_id ile sorgu yapmalıdır.

Bu sözleşmede en kritik alanlar şunlardır:

  • device_id
  • device_time
  • received_at

Bu alanlar downstream servislerin ihtiyaç duyduğu bağlamı verir; payload'ın tamamı Redis'ten okunur.

Hata Davranışı

Ingest Servis aşağıdaki hata sınıflarını ayırır:

Parse / Şema Hatası

JSON parse edilemiyorsa ya da payload CPS sözleşmesine uymuyorsa istek reddedilir. Bu durumda raw DB, Redis ve Kafka akışı başlatılmaz.

Duplicate Paket

Paket aşağıdaki koşullarla duplicate kabul edilir:

  • device_id aynı
  • device_time aynı
  • canonical payload hash aynı

Duplicate paket normal ingest akışına tekrar sokulmaz. Raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez; ancak warning event veya log üretilir.

Raw DB Yazım Hatası

Geçerli payload kalıcı raw tabloya yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.

Redis Yazım Hatası

Normalize veri Redis device_buffer içine yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.

Kafka Yazım Hatası

Raw DB ve Redis yazımı başarılı olmasına rağmen Kafka üretimi başarısız olursa istek başarısız kabul edilmez. Bu durumda payload sistem tarafından kabul edilmiş sayılır, 200 OK dönebilir; ancak Kafka üretim hatası operasyonel hata olarak kayda alınır ve tekrar üretim mekanizmasına bırakılır.

Bu karar sayesinde kabul garantisi raw DB + Redis katmanında sağlanır; Kafka ise tetikleme katmanı olarak değerlendirilir.

Anormal Gönderim Davranışı ve Koruma Politikası

Sahada arızalı, takılmış ya da haberleşme katmanında hatalı davranan bir IoT cihazı kısa süre içinde normalden çok daha sık paket gönderebilir. Bu durum yalnız veri kalitesini bozmaz; aynı zamanda ingest hattını, Redis yazım hızını, raw log büyümesini ve downstream servisleri gereksiz yük altına sokabilir.

Bu nedenle Ingest Servis yalnız paket kabul eden bir giriş noktası değil, aynı zamanda cihaz davranışını gözleyen ilk savunma hattıdır.

Amaç

Bu katmandaki koruma politikasının iki amacı vardır:

  • sistemi flood veya anormal tekrar yükünden korumak
  • takılmış ya da sapıtmış cihaz davranışını operasyonel olarak görünür hale getirmek

Buradaki temel yaklaşım, mümkün olan her durumda veriyi kaybetmeden kabul etmek; ancak cihaz davranışını sınıflandırmak, işaretlemek ve gerektiğinde sınırlamaktır.

İzlenen Sinyaller

Ingest Servis cihaz bazında aşağıdaki sinyalleri izler:

  • son kabul edilen paket ile yeni paket arasındaki süre (interval_ms)
  • aynı device_time ile tekrarlayan gönderimler
  • aynı payload içeriğinin arka arkaya yinelenmesi
  • kısa zaman penceresindeki toplam paket sayısı

Bu amaçla canonical payload üzerinden kısa bir hash üretilebilir ve cihaz bazında son runtime state Redis'te tutulabilir.

Önerilen geçici runtime key yapısı:

  • device_runtime:{device_id}

Örnek alanlar:

{
"last_seen_at": "2026-03-31T21:40:00Z",
"last_device_time": "2026-03-31T21:39:58Z",
"last_payload_hash": "9a4b...",
"last_interval_ms": 1000,
"same_payload_repeat_count": 14,
"events_per_1m": 68,
"rate_status": "suspicious"
}

Davranış Sınıfları

Sistem cihaz gönderim davranışını aşağıdaki sınıflardan biriyle işaretleyebilir:

DurumAçıklama
normalBeklenen aralıkta ve doğal içerik değişimiyle gelen paketler
suspiciousBeklenenden anlamlı derecede sık gelen veya tekrar örüntüsü gösteren paketler
stuckAynı zaman / aynı payload ile uzun süre tekrar eden gönderim davranışı
floodKısa zaman penceresinde sistemi zorlayacak yoğunlukta paket üretimi

Bu sınıflama veri kabul kararından bağımsızdır. İlk hedef, davranışı görünür kılmaktır.

Duplicate ve Stuck Davranışı

Takılmış cihazların en yaygın belirtisi, aynı payload'ı ya da aynı cihaz zamanını art arda tekrar göndermesidir. Bu durum aşağıdaki sinyallerle tespit edilebilir:

  • device_id aynı
  • device_time aynı
  • canonical payload hash aynı
  • paket aralığı beklenenden çok daha kısa

Bu koşullar birlikte sağlanıyorsa paket duplicate kabul edilir. Duplicate paket normal ingest akışına tekrar sokulmaz; raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez, ancak uyarı amaçlı event veya log üretilir.

Bu örüntü üst üste belirli bir eşik kadar gerçekleşirse cihaz stuck olarak işaretlenebilir.

Koruma Stratejisi

İlk aşamada önerilen politika soft protection yaklaşımıdır. Yani sistem, anormal davranışı mümkün olduğunca veri kaybetmeden kabul eder; ancak paketi işaretler, sayaçları artırır ve uyarı üretir.

Bu modelde:

  • geçerli payload normal ingest akışına devam eder
  • cihaz runtime state'i güncellenir
  • warning veya anomaly event üretilebilir
  • gözlem ve operasyon tarafı cihazı görünür şekilde izler

Daha agresif koruma yalnız gerçekten gerekli olduğunda devreye alınmalıdır.

Rate Limiting: Token Bucket Algoritması

Saha geri bildirimleri dikkate alındıktan sonra hard protection gerekirse, cihaz bazlı token bucket rate limiting uygulanırır.

Token Bucket Mekanizması:

device_bucket:{device_id} =
{
"tokens": integer (0..max_burst),
"last_refill_at": timestamp,
"refill_rate": float (tokens/second),
"max_burst": integer
}

Algoritma:

  1. Her gelen paket için önce device_bucket:{device_id} okunur
  2. Son dolum zamanından beri geçen süreden bakiye hesaplanır:
    elapsed = now - last_refill_at
    tokens_gained = min(elapsed * refill_rate, max_burst)
    available_tokens = min(bucket.tokens + tokens_gained, max_burst)
  3. Paket kabul için cost=1 token harcanır:
    if available_tokens >= 1:
    bucket.tokens = available_tokens - 1
    accept_packet()
    else:
    send_429_too_many_requests()

Önerilen Parametreler:

ParametreDeğerAçıklama
refill_rate0.2 tokens/secİdeal cihaz: 12 paket/dakika (0.2 pps)
max_burst5 tokensAni 5 paketle (burst) tolerans
Burst limit5 tokens5 paket = ~25 saniyelik akış
Throttle window60s429 yanıtından sonra retry delay

Örnek Senaryo:

  • Cihaz 2 dakika arasında işlemci hatası nedeniyle 8 paket hızlı gönderirse:
      1. paket: ✅ token=4 kalır
      1. paket: ✅ token=3 kalır
      1. paket: ✅ token=2 kalır
      1. paket: ✅ token=1 kalır
      1. paket: ✅ token=0 kalır
      1. paket: ❌ 429 Too Many Requests (buffer full)
    • 7–8. paketler: ❌ 429 Too Many Requests
    • 5 saniye sonra: token refill → 6. paket kabul

Uyarı Üretim:

  • Eğer paket 429 ile reddedildiyse, cihaz flood sınıfına taşınır
  • Warning event: device.ingest_flood_detected.v1 üretilir
  • Tekrar işleme (replay) bu paketler için manuel bir hareket gerekebilir

Olası Hard Protection Adımları

Saha davranışı ve yük profili görüldükten sonra aşağıdaki ek önlemler eklenebilir:

  • kısa süreli throttle (adaptive backoff)
  • 429 Too Many Requests yanıtı (token bucket doluluğunda)
  • belirli eşik sonrası kontrollü drop (mükerrer paketler)
  • cihazı geçici quarantine listesine alma (repeat offender)
  • per-device giriş bağlantısı timeout (3–5 saniye)

Bu önlemler ortama göre konfigüre edilebilir; ancak mimari bunları destekleyecek şekilde tasarlanmıştır.

Operasyonel Uyarılar

Ingest Servis, anormal gönderim örüntülerinde ops tarafını bilgilendirecek event veya log üretebilir.

Örnek olaylar:

  • device.ingest_duplicate_detected.v1
  • device.ingest_stuck_detected.v1
  • device.ingest_flood_detected.v1

Bu uyarılar dashboard, alarm altyapısı veya replay / inceleme iş akışına bağlanabilir.

Önerilen İlk Uygulama Seviyesi

İlk faz için aşağıdaki koruma seviyesi yeterlidir:

  • cihaz bazlı son paket aralığını izlemek
  • canonical payload hash ile tekrar tespiti yapmak
  • Redis içinde kısa runtime state tutmak
  • normal / suspicious / stuck / flood sınıflaması üretmek
  • warning seviyesinde olay kaydı üretmek

Önerilen başlangıç eşikleri:

KuralEşikSonuç
Aynı device_time + aynı hash tekrarı3 kez üst üstesuspicious
Aynı device_time + aynı hash tekrarı10 kez üst üstestuck
1 dakikadaki paket sayısı> 12suspicious
1 dakikadaki paket sayısı> 30flood
Paket aralığıbeklenen aralığın < 1/4suspicious
Paket aralığıbeklenen aralığın < 1/10'uflood

Bu seviyeden sonra gerçek saha verisine göre rate limit eşikleri sertleştirilebilir.

Konfigürasyon Yönetimi

Environment Variables ve Loading Order

Tüm eşik ve rate limit parametreleri hardcoded değil, environment variables ile kontrol edilir:

# 1. Loading order (priority düşük → yüksek):
# a) Default values (hardcoded)
INGEST_DUPLICATE_REPEAT_THRESHOLD=10
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=3
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=12
INGEST_EVENTS_PER_MINUTE_FLOOD=30
INGEST_RATE_LIMIT_REFILL_RATE=0.2
INGEST_RATE_LIMIT_MAX_BURST=5

# b) Config file (.env.${ENV})
source /etc/ingest/.env.${ENVIRONMENT}

# c) Runtime overrides
# Kubernetes ConfigMap veya Helm values ile process başlatılır
docker run -e INGEST_EVENTS_PER_MINUTE_FLOOD=25 ingest-service:v1

Per-Environment Profili

Development (dev):

ENVIRONMENT=development
INGEST_DUPLICATE_REPEAT_THRESHOLD=5 # Gevşek
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=2
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=20
INGEST_EVENTS_PER_MINUTE_FLOOD=50 # Yüksek tolerans
INGEST_RATE_LIMIT_REFILL_RATE=0.5 # Hızlı refill
INGEST_RATE_LIMIT_MAX_BURST=10
INGEST_PAYLOAD_MAX_SIZE_BYTES=32000 # 32 KB (test payload)
INGEST_RETRY_MAX_ATTEMPTS=5 # Dev'de daha çok retry

Staging (staging):

ENVIRONMENT=staging
INGEST_DUPLICATE_REPEAT_THRESHOLD=8 # Orta
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=4
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=15
INGEST_EVENTS_PER_MINUTE_FLOOD=35
INGEST_RATE_LIMIT_REFILL_RATE=0.25
INGEST_RATE_LIMIT_MAX_BURST=6
INGEST_PAYLOAD_MAX_SIZE_BYTES=16000 # 16 KB
INGEST_RETRY_MAX_ATTEMPTS=3

Production (prod):

ENVIRONMENT=production
INGEST_DUPLICATE_REPEAT_THRESHOLD=10 # Kesin
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=3
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=12
INGEST_EVENTS_PER_MINUTE_FLOOD=30 # Kesin koşul
INGEST_RATE_LIMIT_REFILL_RATE=0.2 # 12 pps
INGEST_RATE_LIMIT_MAX_BURST=5 # Minimal burst
INGEST_PAYLOAD_MAX_SIZE_BYTES=16000 # 16 KB (kesin limit)
INGEST_RETRY_MAX_ATTEMPTS=2
INGEST_CONNECTION_TIMEOUT_MS=5000
INGEST_REQUEST_TIMEOUT_MS=10000

Validation ve Runtime Check

Servis startup zamanında configuration validate edilir:

def validate_config():
assert REFILL_RATE > 0, "INGEST_RATE_LIMIT_REFILL_RATE > 0"
assert MAX_BURST > 0, "INGEST_RATE_LIMIT_MAX_BURST > 0"
assert PAYLOAD_MAX_SIZE > 0, "INGEST_PAYLOAD_MAX_SIZE_BYTES > 0"
assert SUSPICIOUS_REPEAT < DUPLICATE_REPEAT, \
f"suspicious({SUSPICIOUS_REPEAT}) < duplicate({DUPLICATE_REPEAT})"
assert EVENTS_SUSPICIOUS < EVENTS_FLOOD, \
f"suspicious({EVENTS_SUSPICIOUS}) < flood({EVENTS_FLOOD})"

logger.info(f"Config loaded: ENVIRONMENT={ENVIRONMENT}, "
f"RATE_LIMIT={REFILL_RATE}pps, "
f"FLOOD_THRESHOLD={EVENTS_FLOOD}/min")

Threshold Tuning Stratejisi

Aşama 1: Taban Eşikleri (Production ilk deploy):

  • Konservatif ayarlar (flood threshold = 30/min)
  • 1 hafta gözlem yapılır
  • False positive log tahlili

Aşama 2: Telemetri Toplanması:

  • Tüm cihazların paket interval dağılımı analiz edilir
  • Per-device anomaly profili oluşturulur (Q1/Q3/IQR)
  • Device cluster'laması (IoT vs mobile vs fixed)

Aşama 3: Dinamik Tuning:

# Örnek: Device cluster bazında farklı eşikler
IoT gateway devices → FLOOD_THRESHOLD = 20/min (düşük bw)
Mobile field sensors → FLOOD_THRESHOLD = 50/min (intermittent)
Fixed industrial meters → FLOOD_THRESHOLD = 10/min (çok stabil)

Aşama 4: Feedback Loop:

  • Monthly review: flood alert count / false positive ratio
  • SLI correlation: latency vs duplicate detection accuracy
  • Eğer false positive > 5% → eşikleri %10 arttır
  • Eğer missed flood > 2% → eşikleri %15 azalt
Monitoring dashboards

Her environment'in ayrı monitoring dashboard'u olmalıdır:

  • Dev: Threshold tuning için real-time feedback
  • Staging: Production simülasyonu (eşikler aynı)
  • Prod: Alert + manual override seçenekleri

Dashboards'ta görülmesi gereken:

  • device_behavior_status (gauge) — per-device: normal/suspicious/stuck/flood
  • ingest_threshold_breach_count (counter) — min başına breach sayısı
  • ingest_token_bucket_fullness (histogram) — device başına %0-100 fullness

Bu seviyeden sonra gerçek saha verisine göre rate limit eşikleri sertleştirilebilir.

Tasarım Gerekçesi

Bu servis için tercih edilen model şu sırayı izler:

  • şema doğrulama
  • duplicate kontrolü
  • raw DB yazımı
  • normalize
  • Redis yazımı
  • Kafka üretimi
  • ACK

Bu model, daha agresif asenkron kabul modeline göre biraz daha yavaştır; ancak veri kaybı, belirsiz ACK ve yarım kabul durumu risklerini azaltır. Cihazın aldığı 200 OK böylece güvenilir bir kabul sinyali olur.

Kafka'nın ACK dışında tutulması ise sistemin kabul güvenliğini bozmadan downstream tetikleme katmanını daha esnek hale getirir.

Sınırlar ve Notlar

  • Raw DB, historical kabul kaydıdır; Redis ise operasyonel çalışma alanıdır.
  • Normalize işlem yalnız Ingest katmanında yapılır; downstream servisler canonical yapıyı kullanır.
  • device_buffer içindeki temel kimlik/zaman ve ilk canonical alanlar (device_id, device_time, ingest.ingest_time, buffers.measurements) Ingest servisinin sorumluluğundadır.

Latency ve SLA Hedefleri

Ingest Servisi, cihazın HTTP bağlantısında bekleme süresi minimize etmek ve downstream işleme hattını tetiklemek için optimize edilmiştir.

Uçtan Uca Latency (device → HTTP 200 OK):

AşamaP50P99Hedef
Request parse0.5ms2ms< 5ms
Schema validate1ms5ms< 10ms
Duplicate check (Redis)2ms8ms< 15ms
Raw DB write10ms50ms< 100ms
Redis device_buffer write3ms15ms< 20ms
Kafka produce5ms25ms< 50ms
Total (P95)~22ms~105ms< 150ms

SLA Hedefleri:

MetrikHedefAlarm Eşiği
İstek başarı oranı (2xx/4xx)> 99.5%< 99.0%
P99 latency< 150ms> 200ms
Duplicate detection accuracy> 99.9%< 99.5%
Raw DB write success rate> 99.99%< 99.9%
Redis availability> 99.99%< 99.9%
Kafka produce latency< 50ms (P99)> 100ms

Monitoring SLI'ları:

  • ingest_request_duration_ms (histogram): Parse → OK zamanı
  • ingest_raw_db_write_duration_ms (histogram): DB write latency
  • ingest_redis_write_duration_ms (histogram): Redis write latency
  • ingest_kafka_produce_duration_ms (histogram): Kafka publish latency
  • ingest_error_rate (gauge): minute başına hata sayısı
  • device_duplicate_count (counter): cihaz başına duplicate işaretlemeler
  • device_flood_warnings (counter): flood uyarı sayısı
  • buffers.synthesis, buffers.window gibi işleme alanları downstream servisler tarafından doldurulur.
  • Raw log için başlangıç retention önerisi 90 gündür; yüksek hacimde günlük partition tercih edilmelidir.
  • Ingest girişinde temel korumalar olarak max_payload_size = 16 KB, kısa request timeout ve malformed JSON rate limiting önerilir.
  • Bu sayfa CPS standardını tekrar tanımlamaz; wire sözleşme için /acik-kaynak/cps referans alınmalıdır.

Sonuç

Ingest Servis, Cınga veri işleme mimarisinin güvenli kabul sınırıdır. CPS uyumlu cihaz payload'ını alır, doğrular, duplicate kontrolünü uygular, kalıcı raw kaydını üretir, normalize edilmiş çalışma verisini Redis'e yazar, Kafka ile işleme hattını tetikler ve kabul zinciri tamamlandıktan sonra cihaza 200 OK döner.

Bu sayede sistem şu üç ihtiyacı aynı anda karşılar:

  • cihaz tarafında anlaşılır ve güvenilir ACK semantiği
  • backend tarafında hızlı ortak çalışma alanı
  • operasyonel olarak replay ve audit için kalıcı wire raw kayıt
Troubleshooting Rehberi

Operasyon sırasında sorun giderme adımları, monitoring setup ve en sık sorulan sorukular için Sık Sorulan Sorular ve Troubleshooting sayfasına bakınız.