Ingest Servis
Ingest Servis, cihazlardan gelen CPS uyumlu veri paketlerini sisteme kabul eden giriş mikroservisidir. Bu servis doğrudan internetten ya da ayrılmış bir giriş alanından (iot.xxxx.com, gate.xxxx.com gibi) HTTP üzerinden veri alır. Temel amacı, cihazı mümkün olduğunca kısa süre bekletirken veriyi güvenli biçimde kabul etmek, kalıcı raw kaydı üretmek, normalize edilmiş çalışma halini Redis'e yazmak ve işleme hattını Kafka üzerinden tetiklemektir.
Bu katman düşük gecikmeli bir kabul servisidir. Hesap yapmaz, sentez üretmez, pencere hesabı çalıştırmaz, iş kuralları uygulamaz. Gelen paketi kabul eder, kalıcı ve operasyonel kaydını oluşturur, ardından diğer servislerin işlemesi için sistemi tetikler.
device_buffer alanlari, ownership ve stage gecisleri icin Device Buffer Redis Kaydi Sozlesmesi sayfasina bakiniz.
Ingest akışları alt sayfalara ayrılmıştır:
Sorumluluk
Ingest Servis aşağıdaki sorumluluklara sahiptir:
- Cihazdan gelen HTTP payload'ını almak
- CPS şemasına göre parse ve doğrulama yapmak
- Duplicate paket kontrolü yapmak
- Cihazdan gelen wire format veriyi kalıcı raw tabloya yazmak
- Payload'ı backend içinde kullanılacak canonical yapıya normalize etmek
- Normalize edilmiş veriyi Redis
device_buffer:{device_id}kaydına yazmak - Kafka'ya minimal tetikleyici event üretmek
- Kabul zinciri tamamlandıktan sonra cihaza
200 OKdönmek
Bu servis aşağıdakileri yapmaz:
- sentez hesapları
- pencere veya aggregation üretimi
- rule evaluation
- kalıcı business table yazımı
Cihaz ile Ingest Servis arasındaki veri sözleşmesi Qapu CPS standardına dayanır. Ayrıntılı protokol tanımı ve alan yapısı için Qapu Core Packet Standard (CPS) belgesine başvurunuz.
Şema Doğrulama ve Normalize
Ingest Servis yalnızca parse edilebilir JSON kabul etmez; payload'ın CPS sözleşmesine uyup uymadığını da doğrular. Bu doğrulama kapsamında en azından aşağıdaki başlıklar kontrol edilir:
- üst seviye yapı
- zorunlu alanların varlığı
device_ideşdeğeri cihaz kimliği- cihaz zamanı alanı
- sayısal alanların tür tutarlılığı
- CPS içindeki beklenen blok yapısı
Doğrulama başarılı olduktan sonra payload backend içinde kullanılan canonical yapıya normalize edilir. Bu canonical yapı, downstream servislerin tekrar tekrar mapping yapmak zorunda kalmaması için Ingest katmanında üretilir.
Bu nedenle sistem içi sorumluluk sınırı nettir:
- Raw DB → wire payload
- Redis
device_buffer→ normalize edilmiş canonical payload
Kimlik Doğrulama Politikası
Tüm üretim cihazlarında DS28C serisi I2C güvenlik çipi bulunur. Bu çipin seri numarası doğrudan device_id olarak kullanılır. Ingest Servis, gelen device_id değerinin DS28C formatına uygunluğunu doğrular.
device_id Yapısı
DS28C seri numarası 8 bytelık sabit bir formata sahiptir:
Byte 0 : Family Code — çip ailesini tanımlayan sabit değer
Byte 1–6 : Serial Number — fabrikada programlanmış, benzersiz 48-bit seri numara
Byte 7 : CRC8 — Byte 0–6 üzerinden hesaplanan Dallas/Maxim CRC-8 sağlama değeri
Toplam: 16 hex karakter (64-bit)
Doğrulama Yöntemi
CPS'ten gelen device_id değeri şema doğrulamasının parçası olarak iki ek kontrolden geçirilir:
Adım 1 — CRC8 Bütünlük Kontrolü:
Son byte (Byte 7), ilk 7 byte'ın Dallas/Maxim CRC-8 sağlamasıdır.
Polinom: x⁸ + x⁵ + x⁴ + 1 (0x8C yansımalı)
crc8_computed = dallas_crc8(device_id[0..6])
crc8_received = device_id[7]
if crc8_computed != crc8_received → 422 Unprocessable (device_id bütünlük hatası)
Adım 2 — Family Code Kontrolü:
Byte 0, izin verilen DS28C aile kodları listesinde olmalıdır. Beklenmeyen değer sahte veya manipüle edilmiş bir device_id işaretidir.
if device_id[0] not in ALLOWED_FAMILY_CODES → 422 Unprocessable (tanınmayan cihaz ailesi)
Doğrulama Özeti
| Kontrol | Geçerlilik Kriteri | Hata Kodu |
|---|---|---|
| Uzunluk | Tam olarak 16 hex karakter (64-bit) | 422 |
| CRC8 bütünlüğü | dallas_crc8(device_id[0..6]) == device_id[7] | 422 |
| Family code | İzin verilen DS28C aile kodları listesinde | 422 |
DS28C çipinden seri numara okuma ve CRC hesaplama için DS28C I2C Authentication Kütüphanesi belgesine bakınız.
Kalıcı Raw Kayıt
Ingest Servis, kabul ettiği her geçerli ve duplicate olmayan payload için kalıcı raw kayıt üretir. Bu kayıtın amacı sadece arşiv değildir; aynı zamanda audit, hata analizi, replay ve veri kaybı incelemesidir.
Buradaki temel karar şudur:
- DB'de tutulan veri: cihazdan geldiği wire raw payload
- Redis'te tutulan veri: normalize edilmiş canonical çalışma hali
Bu ayrım sayesinde sistem aynı anda hem operasyonel olarak hızlı hem de gözlemlenebilir olur.
Kalıcı raw kaydın alan yapısı, şema kararları ve tasarım notları için raw_data Tablosu belgesine bakınız.
Redis Çalışma Alanı
Ingest Servis, doğrulanmış ve normalize edilmiş veriyi Redis içinde cihaz bazlı çalışma durumuna yazar.
Ana key yapısı:
device_buffer:{device_id}
Bu kayıt, yalnız son ham paketi saklayan basit bir cache değil; servisler arası ortak çalışma alanıdır. Ingest bu alanı başlatır, diğer servisler akış ilerledikçe kendi alanlarını doldurur.
Ingest sonrasinda Redis device_buffer kaydinin birebir ornek fixture dosyasi:
Tum asamalari tek sayfada gormek icin Device Buffer Flow dokumanina bakiniz.
{
"device_id": "64000000C466EF70",
"device_time": "2026-04-13T11:18:09Z",
"current_stage": "ingested",
"ingest": {
"ingest_time": "2026-04-13T11:18:10Z",
"is_valid": true,
"raw_id": 2841023,
"process_time_ms": 8
},
"buffers": {
"measurements": {
"VRMS_R": 220.006,
"VRMS_S": 223.792,
"VRMS_T": 221.671,
"IRMS_R": 3.995,
"IRMS_S": 4.073,
"IRMS_T": 3.887,
"IFUND_R": 3.999,
"IFUND_S": 4.067,
"IFUND_T": 3.863,
"IPEAK_R": 6.012,
"IPEAK_S": 6.246,
"IPEAK_T": 5.925,
"VFUND_R": 219.997,
"VFUND_S": 223.793,
"VFUND_T": 221.752,
"P_R": 746.184,
"P_S": 764.792,
"P_T": 777.262,
"Q_R": 467.521,
"Q_S": 496.76,
"Q_T": 360.724,
"S_R": 879.551,
"S_S": 911.181,
"S_T": 861.344,
"PF_R": 0.85,
"PF_S": 0.84,
"PF_T": 0.9,
"AE_R": 62,
"AE_S": 64,
"AE_T": 65,
"FQ": 49.98,
"STOP": 36702270,
"IHARM_R_3": 0.036,
"IHARM_R_5": 0.059,
"IHARM_R_7": 0.07,
"IHARM_R_9": 0.004,
"IHARM_S_3": 0.143,
"IHARM_S_5": 0.093,
"IHARM_S_7": 0.061,
"IHARM_S_9": 0.008,
"IHARM_T_3": 0.362,
"IHARM_T_5": 0.105,
"IHARM_T_7": 0.17,
"IHARM_T_9": 0.055,
"VHARM_R_3": 2.598,
"VHARM_R_5": 0.951,
"VHARM_R_7": 4.972,
"VHARM_R_9": 0.205,
"VHARM_S_3": 1.999,
"VHARM_S_5": 4.082,
"VHARM_S_7": 3.597,
"VHARM_S_9": 0.537,
"VHARM_T_3": 3.705,
"VHARM_T_5": 4.228,
"VHARM_T_7": 0.372,
"VHARM_T_9": 0.92,
"PCB_T": 22.95,
"PCB_H": 48.34,
"RSSI": 6,
"STATUS": 1073741839
},
"synthesis": {},
"window": {}
},
"device_metadata": {
"device_id": "64000000C466EF70",
"iccid": "899001190805082918",
"firmware": "05.00.05",
"device_type": "multi_function_meter"
},
"schema_version": 2,
"state_version": 1
}
Bu dosya, adim adim servis simulasyonunda bir sonraki katmanlara input olarak kullanilabilir.
Not: CPS'teki harmonic_vector alanlari (IHARM_*, VHARM_*) canonical state'te acilir.
Bu ornekte vektorler *_3, *_5, *_7, *_9 alanlarina cozulur; olmayan harmonikler yazilmaz.
Kanonik state semasi, ownership matrisi ve stage gecisleri bu sayfada tekrar edilmez. Tek referans olarak Device Buffer Redis Kaydi Sozlesmesi kullanilir.
Redis Kullanım Gerekçesi
Bu yaklaşımın nedeni tüm veriyi Kafka içinde taşımamak ve her servis için veritabanına gereksiz sorgu yükü bindirmemektir. Redis device_buffer, servisler arası ortak ve hızlı erişilebilir çalışma alanı sağlar.
Bu sayede:
- Kafka yalnız tetikleyici olur
- downstream servisler payload'ın tamamını Kafka'dan taşımak zorunda kalmaz
- işleme zinciri boyunca ortak state korunur
- son aşamada geçici buffer alanları temizlenebilir
Redis Alan Sorumlulugu
Ingest'in sahip oldugu alanlar ve downstream servis ownership sinirlari icin Device Buffer Redis Kaydi Sozlesmesi tablosu esas alinmalidir.
Kafka Mesajı
Redis güncellemesinden sonra Ingest Servis Kafka'ya minimal bir event yazar. Bu event'in amacı payload taşımak değil, downstream servisleri tetiklemektir.
Kafka içinde full payload dolaştırılmaz. Asıl veri Redis device_buffer içinden okunur.
Önerilen event sözleşmesi:
{
"event": "ingest.accepted.v1",
"data": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z",
"received_at": "2026-03-11T15:22:08Z",
"schema_version": 1
}
}
Ingest Serviste Üretilen Eventler
| Topic | Event | Consume Eden | Üretim Koşulu |
|---|---|---|---|
qapu.ingest.raw | ingest.accepted.v1 | Stream / Processor servisleri | Raw DB + Redis yazımı sonrası |
qapu.ingest.failed | ingest.failed.v1 | Ops / Replay Worker | Parse, schema ya da kabul hatası |
qapu.ingest.warning | device.ingest_duplicate_detected.v1 | Ops / Monitoring | Duplicate paket algılandığında |
qapu.ingest.warning | device.ingest_stuck_detected.v1 | Ops / Monitoring | Cihaz stuck davranışı gösterdiğinde |
qapu.ingest.warning | device.ingest_flood_detected.v1 | Ops / Monitoring | Cihaz flood davranışı gösterdiğinde |
ingest.failed.v1 Event Sözleşmesi
Parse, şema ya da kabul hatası durumunda üretilen event:
{
"event": "ingest.failed.v1",
"timestamp": "2026-03-31T21:40:02Z",
"data": {
"device_id": "400000011D081B70",
"received_at": "2026-03-31T21:40:02Z",
"error_category": "SCHEMA_VALIDATION_ERROR",
"error_code": "CPS_FIELD_MISSING",
"error_message": "Required field 'device_time' is missing in payload",
"request_size_bytes": 512,
"http_status_code": 422
}
}
Hata kategorileri:
PARSE_ERROR: JSON parse başarısız (400 Bad Request)SCHEMA_VALIDATION_ERROR: CPS şeması doğrulaması başarısız (422 Unprocessable Entity)AUTHENTICATION_ERROR: device_id CRC8 veya family code doğrulaması başarısız (422)RAW_STORAGE_ERROR: Kalıcı raw DB yazımı başarısız (500 Internal Server Error)REDIS_ERROR: device_buffer Redis yazımı başarısız (500)PAYLOAD_SIZE_ERROR: Payload boyut sınırı aşıldı (413 Payload Too Large)
Not: ingest.failed.v1 event'ini consume eden Replay Worker, stream_id olmadığı için manual device_id ile sorgu yapmalıdır.
Bu sözleşmede en kritik alanlar şunlardır:
device_iddevice_timereceived_at
Bu alanlar downstream servislerin ihtiyaç duyduğu bağlamı verir; payload'ın tamamı Redis'ten okunur.
Hata Davranışı
Ingest Servis aşağıdaki hata sınıflarını ayırır:
Parse / Şema Hatası
JSON parse edilemiyorsa ya da payload CPS sözleşmesine uymuyorsa istek reddedilir. Bu durumda raw DB, Redis ve Kafka akışı başlatılmaz.
Duplicate Paket
Paket aşağıdaki koşullarla duplicate kabul edilir:
device_idaynıdevice_timeaynı- canonical payload hash aynı
Duplicate paket normal ingest akışına tekrar sokulmaz. Raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez; ancak warning event veya log üretilir.
Raw DB Yazım Hatası
Geçerli payload kalıcı raw tabloya yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.
Redis Yazım Hatası
Normalize veri Redis device_buffer içine yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.
Kafka Yazım Hatası
Raw DB ve Redis yazımı başarılı olmasına rağmen Kafka üretimi başarısız olursa istek başarısız kabul edilmez. Bu durumda payload sistem tarafından kabul edilmiş sayılır, 200 OK dönebilir; ancak Kafka üretim hatası operasyonel hata olarak kayda alınır ve tekrar üretim mekanizmasına bırakılır.
Bu karar sayesinde kabul garantisi raw DB + Redis katmanında sağlanır; Kafka ise tetikleme katmanı olarak değerlendirilir.
Anormal Gönderim Davranışı ve Koruma Politikası
Sahada arızalı, takılmış ya da haberleşme katmanında hatalı davranan bir IoT cihazı kısa süre içinde normalden çok daha sık paket gönderebilir. Bu durum yalnız veri kalitesini bozmaz; aynı zamanda ingest hattını, Redis yazım hızını, raw log büyümesini ve downstream servisleri gereksiz yük altına sokabilir.
Bu nedenle Ingest Servis yalnız paket kabul eden bir giriş noktası değil, aynı zamanda cihaz davranışını gözleyen ilk savunma hattıdır.
Amaç
Bu katmandaki koruma politikasının iki amacı vardır:
- sistemi flood veya anormal tekrar yükünden korumak
- takılmış ya da sapıtmış cihaz davranışını operasyonel olarak görünür hale getirmek
Buradaki temel yaklaşım, mümkün olan her durumda veriyi kaybetmeden kabul etmek; ancak cihaz davranışını sınıflandırmak, işaretlemek ve gerektiğinde sınırlamaktır.
İzlenen Sinyaller
Ingest Servis cihaz bazında aşağıdaki sinyalleri izler:
- son kabul edilen paket ile yeni paket arasındaki süre (
interval_ms) - aynı
device_timeile tekrarlayan gönderimler - aynı payload içeriğinin arka arkaya yinelenmesi
- kısa zaman penceresindeki toplam paket sayısı
Bu amaçla canonical payload üzerinden kısa bir hash üretilebilir ve cihaz bazında son runtime state Redis'te tutulabilir.
Önerilen geçici runtime key yapısı:
device_runtime:{device_id}
Örnek alanlar:
{
"last_seen_at": "2026-03-31T21:40:00Z",
"last_device_time": "2026-03-31T21:39:58Z",
"last_payload_hash": "9a4b...",
"last_interval_ms": 1000,
"same_payload_repeat_count": 14,
"events_per_1m": 68,
"rate_status": "suspicious"
}
Davranış Sınıfları
Sistem cihaz gönderim davranışını aşağıdaki sınıflardan biriyle işaretleyebilir:
| Durum | Açıklama |
|---|---|
normal | Beklenen aralıkta ve doğal içerik değişimiyle gelen paketler |
suspicious | Beklenenden anlamlı derecede sık gelen veya tekrar örüntüsü gösteren paketler |
stuck | Aynı zaman / aynı payload ile uzun süre tekrar eden gönderim davranışı |
flood | Kısa zaman penceresinde sistemi zorlayacak yoğunlukta paket üretimi |
Bu sınıflama veri kabul kararından bağımsızdır. İlk hedef, davranışı görünür kılmaktır.
Duplicate ve Stuck Davranışı
Takılmış cihazların en yaygın belirtisi, aynı payload'ı ya da aynı cihaz zamanını art arda tekrar göndermesidir. Bu durum aşağıdaki sinyallerle tespit edilebilir:
device_idaynıdevice_timeaynı- canonical payload hash aynı
- paket aralığı beklenenden çok daha kısa
Bu koşullar birlikte sağlanıyorsa paket duplicate kabul edilir. Duplicate paket normal ingest akışına tekrar sokulmaz; raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez, ancak uyarı amaçlı event veya log üretilir.
Bu örüntü üst üste belirli bir eşik kadar gerçekleşirse cihaz stuck olarak işaretlenebilir.
Koruma Stratejisi
İlk aşamada önerilen politika soft protection yaklaşımıdır. Yani sistem, anormal davranışı mümkün olduğunca veri kaybetmeden kabul eder; ancak paketi işaretler, sayaçları artırır ve uyarı üretir.
Bu modelde:
- geçerli payload normal ingest akışına devam eder
- cihaz runtime state'i güncellenir
- warning veya anomaly event üretilebilir
- gözlem ve operasyon tarafı cihazı görünür şekilde izler
Daha agresif koruma yalnız gerçekten gerekli olduğunda devreye alınmalıdır.
Rate Limiting: Token Bucket Algoritması
Saha geri bildirimleri dikkate alındıktan sonra hard protection gerekirse, cihaz bazlı token bucket rate limiting uygulanırır.
Token Bucket Mekanizması:
device_bucket:{device_id} =
{
"tokens": integer (0..max_burst),
"last_refill_at": timestamp,
"refill_rate": float (tokens/second),
"max_burst": integer
}
Algoritma:
- Her gelen paket için önce
device_bucket:{device_id}okunur - Son dolum zamanından beri geçen süreden bakiye hesaplanır:
elapsed = now - last_refill_at
tokens_gained = min(elapsed * refill_rate, max_burst)
available_tokens = min(bucket.tokens + tokens_gained, max_burst) - Paket kabul için
cost=1token harcanır:if available_tokens >= 1:
bucket.tokens = available_tokens - 1
accept_packet()
else:
send_429_too_many_requests()
Önerilen Parametreler:
| Parametre | Değer | Açıklama |
|---|---|---|
refill_rate | 0.2 tokens/sec | İdeal cihaz: 12 paket/dakika (0.2 pps) |
max_burst | 5 tokens | Ani 5 paketle (burst) tolerans |
| Burst limit | 5 tokens | 5 paket = ~25 saniyelik akış |
| Throttle window | 60s | 429 yanıtından sonra retry delay |
Örnek Senaryo:
- Cihaz 2 dakika arasında işlemci hatası nedeniyle 8 paket hızlı gönderirse:
-
- paket: ✅ token=4 kalır
-
- paket: ✅ token=3 kalır
-
- paket: ✅ token=2 kalır
-
- paket: ✅ token=1 kalır
-
- paket: ✅ token=0 kalır
-
- paket: ❌
429 Too Many Requests(buffer full)
- paket: ❌
- 7–8. paketler: ❌
429 Too Many Requests - 5 saniye sonra: token refill → 6. paket kabul
-
Uyarı Üretim:
- Eğer paket
429ile reddedildiyse, cihazfloodsınıfına taşınır - Warning event:
device.ingest_flood_detected.v1üretilir - Tekrar işleme (replay) bu paketler için manuel bir hareket gerekebilir
Olası Hard Protection Adımları
Saha davranışı ve yük profili görüldükten sonra aşağıdaki ek önlemler eklenebilir:
- kısa süreli throttle (adaptive backoff)
429 Too Many Requestsyanıtı (token bucket doluluğunda)- belirli eşik sonrası kontrollü drop (mükerrer paketler)
- cihazı geçici quarantine listesine alma (repeat offender)
- per-device giriş bağlantısı timeout (3–5 saniye)
Bu önlemler ortama göre konfigüre edilebilir; ancak mimari bunları destekleyecek şekilde tasarlanmıştır.
Operasyonel Uyarılar
Ingest Servis, anormal gönderim örüntülerinde ops tarafını bilgilendirecek event veya log üretebilir.
Örnek olaylar:
device.ingest_duplicate_detected.v1device.ingest_stuck_detected.v1device.ingest_flood_detected.v1
Bu uyarılar dashboard, alarm altyapısı veya replay / inceleme iş akışına bağlanabilir.
Önerilen İlk Uygulama Seviyesi
İlk faz için aşağıdaki koruma seviyesi yeterlidir:
- cihaz bazlı son paket aralığını izlemek
- canonical payload hash ile tekrar tespiti yapmak
- Redis içinde kısa runtime state tutmak
normal / suspicious / stuck / floodsınıflaması üretmek- warning seviyesinde olay kaydı üretmek
Önerilen başlangıç eşikleri:
| Kural | Eşik | Sonuç |
|---|---|---|
Aynı device_time + aynı hash tekrarı | 3 kez üst üste | suspicious |
Aynı device_time + aynı hash tekrarı | 10 kez üst üste | stuck |
| 1 dakikadaki paket sayısı | > 12 | suspicious |
| 1 dakikadaki paket sayısı | > 30 | flood |
| Paket aralığı | beklenen aralığın < 1/4'ü | suspicious |
| Paket aralığı | beklenen aralığın < 1/10'u | flood |
Bu seviyeden sonra gerçek saha verisine göre rate limit eşikleri sertleştirilebilir.
Konfigürasyon Yönetimi
Environment Variables ve Loading Order
Tüm eşik ve rate limit parametreleri hardcoded değil, environment variables ile kontrol edilir:
# 1. Loading order (priority düşük → yüksek):
# a) Default values (hardcoded)
INGEST_DUPLICATE_REPEAT_THRESHOLD=10
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=3
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=12
INGEST_EVENTS_PER_MINUTE_FLOOD=30
INGEST_RATE_LIMIT_REFILL_RATE=0.2
INGEST_RATE_LIMIT_MAX_BURST=5
# b) Config file (.env.${ENV})
source /etc/ingest/.env.${ENVIRONMENT}
# c) Runtime overrides
# Kubernetes ConfigMap veya Helm values ile process başlatılır
docker run -e INGEST_EVENTS_PER_MINUTE_FLOOD=25 ingest-service:v1
Per-Environment Profili
Development (dev):
ENVIRONMENT=development
INGEST_DUPLICATE_REPEAT_THRESHOLD=5 # Gevşek
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=2
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=20
INGEST_EVENTS_PER_MINUTE_FLOOD=50 # Yüksek tolerans
INGEST_RATE_LIMIT_REFILL_RATE=0.5 # Hızlı refill
INGEST_RATE_LIMIT_MAX_BURST=10
INGEST_PAYLOAD_MAX_SIZE_BYTES=32000 # 32 KB (test payload)
INGEST_RETRY_MAX_ATTEMPTS=5 # Dev'de daha çok retry
Staging (staging):
ENVIRONMENT=staging
INGEST_DUPLICATE_REPEAT_THRESHOLD=8 # Orta
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=4
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=15
INGEST_EVENTS_PER_MINUTE_FLOOD=35
INGEST_RATE_LIMIT_REFILL_RATE=0.25
INGEST_RATE_LIMIT_MAX_BURST=6
INGEST_PAYLOAD_MAX_SIZE_BYTES=16000 # 16 KB
INGEST_RETRY_MAX_ATTEMPTS=3
Production (prod):
ENVIRONMENT=production
INGEST_DUPLICATE_REPEAT_THRESHOLD=10 # Kesin
INGEST_SUSPICIOUS_REPEAT_THRESHOLD=3
INGEST_EVENTS_PER_MINUTE_SUSPICIOUS=12
INGEST_EVENTS_PER_MINUTE_FLOOD=30 # Kesin koşul
INGEST_RATE_LIMIT_REFILL_RATE=0.2 # 12 pps
INGEST_RATE_LIMIT_MAX_BURST=5 # Minimal burst
INGEST_PAYLOAD_MAX_SIZE_BYTES=16000 # 16 KB (kesin limit)
INGEST_RETRY_MAX_ATTEMPTS=2
INGEST_CONNECTION_TIMEOUT_MS=5000
INGEST_REQUEST_TIMEOUT_MS=10000
Validation ve Runtime Check
Servis startup zamanında configuration validate edilir:
def validate_config():
assert REFILL_RATE > 0, "INGEST_RATE_LIMIT_REFILL_RATE > 0"
assert MAX_BURST > 0, "INGEST_RATE_LIMIT_MAX_BURST > 0"
assert PAYLOAD_MAX_SIZE > 0, "INGEST_PAYLOAD_MAX_SIZE_BYTES > 0"
assert SUSPICIOUS_REPEAT < DUPLICATE_REPEAT, \
f"suspicious({SUSPICIOUS_REPEAT}) < duplicate({DUPLICATE_REPEAT})"
assert EVENTS_SUSPICIOUS < EVENTS_FLOOD, \
f"suspicious({EVENTS_SUSPICIOUS}) < flood({EVENTS_FLOOD})"
logger.info(f"Config loaded: ENVIRONMENT={ENVIRONMENT}, "
f"RATE_LIMIT={REFILL_RATE}pps, "
f"FLOOD_THRESHOLD={EVENTS_FLOOD}/min")
Threshold Tuning Stratejisi
Aşama 1: Taban Eşikleri (Production ilk deploy):
- Konservatif ayarlar (flood threshold = 30/min)
- 1 hafta gözlem yapılır
- False positive log tahlili
Aşama 2: Telemetri Toplanması:
- Tüm cihazların paket interval dağılımı analiz edilir
- Per-device anomaly profili oluşturulur (Q1/Q3/IQR)
- Device cluster'laması (IoT vs mobile vs fixed)
Aşama 3: Dinamik Tuning:
# Örnek: Device cluster bazında farklı eşikler
IoT gateway devices → FLOOD_THRESHOLD = 20/min (düşük bw)
Mobile field sensors → FLOOD_THRESHOLD = 50/min (intermittent)
Fixed industrial meters → FLOOD_THRESHOLD = 10/min (çok stabil)
Aşama 4: Feedback Loop:
- Monthly review: flood alert count / false positive ratio
- SLI correlation: latency vs duplicate detection accuracy
- Eğer false positive > 5% → eşikleri %10 arttır
- Eğer missed flood > 2% → eşikleri %15 azalt
Her environment'in ayrı monitoring dashboard'u olmalıdır:
- Dev: Threshold tuning için real-time feedback
- Staging: Production simülasyonu (eşikler aynı)
- Prod: Alert + manual override seçenekleri
Dashboards'ta görülmesi gereken:
device_behavior_status(gauge) — per-device: normal/suspicious/stuck/floodingest_threshold_breach_count(counter) — min başına breach sayısıingest_token_bucket_fullness(histogram) — device başına %0-100 fullness
Bu seviyeden sonra gerçek saha verisine göre rate limit eşikleri sertleştirilebilir.
Tasarım Gerekçesi
Bu servis için tercih edilen model şu sırayı izler:
- şema doğrulama
- duplicate kontrolü
- raw DB yazımı
- normalize
- Redis yazımı
- Kafka üretimi
- ACK
Bu model, daha agresif asenkron kabul modeline göre biraz daha yavaştır; ancak veri kaybı, belirsiz ACK ve yarım kabul durumu risklerini azaltır. Cihazın aldığı 200 OK böylece güvenilir bir kabul sinyali olur.
Kafka'nın ACK dışında tutulması ise sistemin kabul güvenliğini bozmadan downstream tetikleme katmanını daha esnek hale getirir.
Sınırlar ve Notlar
- Raw DB, historical kabul kaydıdır; Redis ise operasyonel çalışma alanıdır.
- Normalize işlem yalnız Ingest katmanında yapılır; downstream servisler canonical yapıyı kullanır.
device_bufferiçindeki temel kimlik/zaman ve ilk canonical alanlar (device_id,device_time,ingest.ingest_time,buffers.measurements) Ingest servisinin sorumluluğundadır.
Latency ve SLA Hedefleri
Ingest Servisi, cihazın HTTP bağlantısında bekleme süresi minimize etmek ve downstream işleme hattını tetiklemek için optimize edilmiştir.
Uçtan Uca Latency (device → HTTP 200 OK):
| Aşama | P50 | P99 | Hedef |
|---|---|---|---|
| Request parse | 0.5ms | 2ms | < 5ms |
| Schema validate | 1ms | 5ms | < 10ms |
| Duplicate check (Redis) | 2ms | 8ms | < 15ms |
| Raw DB write | 10ms | 50ms | < 100ms |
| Redis device_buffer write | 3ms | 15ms | < 20ms |
| Kafka produce | 5ms | 25ms | < 50ms |
| Total (P95) | ~22ms | ~105ms | < 150ms |
SLA Hedefleri:
| Metrik | Hedef | Alarm Eşiği |
|---|---|---|
| İstek başarı oranı (2xx/4xx) | > 99.5% | < 99.0% |
| P99 latency | < 150ms | > 200ms |
| Duplicate detection accuracy | > 99.9% | < 99.5% |
| Raw DB write success rate | > 99.99% | < 99.9% |
| Redis availability | > 99.99% | < 99.9% |
| Kafka produce latency | < 50ms (P99) | > 100ms |
Monitoring SLI'ları:
ingest_request_duration_ms(histogram): Parse → OK zamanıingest_raw_db_write_duration_ms(histogram): DB write latencyingest_redis_write_duration_ms(histogram): Redis write latencyingest_kafka_produce_duration_ms(histogram): Kafka publish latencyingest_error_rate(gauge): minute başına hata sayısıdevice_duplicate_count(counter): cihaz başına duplicate işaretlemelerdevice_flood_warnings(counter): flood uyarı sayısıbuffers.synthesis,buffers.windowgibi işleme alanları downstream servisler tarafından doldurulur.- Raw log için başlangıç retention önerisi 90 gündür; yüksek hacimde günlük partition tercih edilmelidir.
- Ingest girişinde temel korumalar olarak
max_payload_size = 16 KB, kısa request timeout ve malformed JSON rate limiting önerilir. - Bu sayfa CPS standardını tekrar tanımlamaz; wire sözleşme için
/acik-kaynak/cpsreferans alınmalıdır.
Sonuç
Ingest Servis, Cınga veri işleme mimarisinin güvenli kabul sınırıdır. CPS uyumlu cihaz payload'ını alır, doğrular, duplicate kontrolünü uygular, kalıcı raw kaydını üretir, normalize edilmiş çalışma verisini Redis'e yazar, Kafka ile işleme hattını tetikler ve kabul zinciri tamamlandıktan sonra cihaza 200 OK döner.
Bu sayede sistem şu üç ihtiyacı aynı anda karşılar:
- cihaz tarafında anlaşılır ve güvenilir ACK semantiği
- backend tarafında hızlı ortak çalışma alanı
- operasyonel olarak replay ve audit için kalıcı wire raw kayıt
Operasyon sırasında sorun giderme adımları, monitoring setup ve en sık sorulan sorukular için Sık Sorulan Sorular ve Troubleshooting sayfasına bakınız.