Ingest Servis
Ingest Servis, cihazlardan gelen CPS uyumlu veri paketlerini sisteme kabul eden giriş mikroservisidir. Bu servis doğrudan internetten ya da ayrılmış bir giriş alanından (iot.xxxx.com, gate.xxxx.com gibi) HTTP üzerinden veri alır. Temel amacı, cihazı mümkün olduğunca kısa süre bekletirken veriyi güvenli biçimde kabul etmek, kalıcı raw kaydı üretmek, normalize edilmiş çalışma halini Redis'e yazmak ve işleme hattını Kafka üzerinden tetiklemektir.
Bu katman düşük gecikmeli bir kabul servisidir. Hesap yapmaz, sentez üretmez, pencere hesabı çalıştırmaz, iş kuralları uygulamaz. Gelen paketi kabul eder, kalıcı ve operasyonel kaydını oluşturur, ardından diğer servislerin işlemesi için sistemi tetikler.
Sorumluluk
Ingest Servis aşağıdaki sorumluluklara sahiptir:
- Cihazdan gelen HTTP payload'ını almak
- CPS şemasına göre parse ve doğrulama yapmak
- Duplicate paket kontrolü yapmak
- Cihazdan gelen wire format veriyi kalıcı raw tabloya yazmak
- Payload'ı backend içinde kullanılacak canonical yapıya normalize etmek
- Normalize edilmiş veriyi Redis
device_buffer:{device_id}kaydına yazmak - Kafka'ya minimal tetikleyici event üretmek
- Kabul zinciri tamamlandıktan sonra cihaza
200 OKdönmek
Bu servis aşağıdakileri yapmaz:
- paket birleştirme
- stream assembly
- sentez hesapları
- pencere veya aggregation üretimi
- rule evaluation
- kalıcı business table yazımı
Temel Tasarım Kararı
Ingest Servis her paketi bağımsız olarak işler. Önceki tasarımlarda yer alan çoklu paket bölme, BASIC / DETAIL / EXPERIMENT profilleri, Stream_End, assembly timeout ve parçalı finalize akışları bu tasarımın parçası değildir. Gelen her paket tek başına anlamlı kabul edilir ve bağımsız bir ingest olayı olarak işlenir.
Bu yaklaşımın ana gerekçeleri şunlardır:
- cihaz tarafını sadeleştirmek
- kabul katmanını deterministik hale getirmek
- assembly kaynaklı hata yüzeyini azaltmak
- güçlü ACK semantiğini korumak
- veri akışını Redis + Kafka tabanlı yalın bir pipeline'a oturtmak
İşlem Akışı
Bir ingest isteği aşağıdaki sırayla işlenir:
- Cihaz
POST /v1/ingestile CPS uyumlu payload gönderir. - Ingest servis JSON parse işlemini ve CPS şema doğrulamasını yapar.
- Duplicate kontrolü uygulanır.
- Paket duplicate değilse payload, cihazdan geldiği wire format ile kalıcı raw tabloya yazılır.
- Aynı payload canonical backend yapısına normalize edilir.
- Normalize veri Redis
device_buffer:{device_id}kaydına yazılır. - Kafka'ya minimal event üretilmeye çalışılır.
- Parse + doğrulama + duplicate kontrolü + raw DB + Redis adımları başarıyla tamamlandıysa cihaza
200 OKdöner.
Bu tasarımda 200 OK, yalnız HTTP seviyesinde isteğin görüldüğü anlamına gelmez. ACK semantiği daha güçlüdür: payload doğrulanmış, duplicate değilse raw olarak kalıcı kayda alınmış ve normalize edilmiş hali Redis'e yazılmıştır. Kafka üretimi downstream tetikleme katmanıdır; kabul garantisinin birincil koşulu değildir.
ACK Semantiği
Bu mimaride 200 OK aşağıdaki anlamı taşır:
- payload parse edilmiştir
- CPS şema kontrolünden geçmiştir
- duplicate kontrolünden geçmiştir
- duplicate değilse raw kayıt kalıcı olarak yazılmıştır
- normalize edilmiş payload Redis çalışma alanına yazılmıştır
Kafka üretimi ACK için birincil kabul koşulu değildir. Kafka başarısızlığı, veri kabulünün başarısız olduğu anlamına gelmez; operasyonel hata olarak kaydedilir ve tekrar üretim mekanizmasına bırakılır.
Buna karşılık 200 OK aşağıdaki anlamlara gelmez:
- sentez hesapları tamamlandı
- diğer servisler veriyi işledi
- kalıcı iş tabloları güncellendi
- alarm veya business workflow sonuçlandı
Ingest bu noktada yalnız kabul katmanıdır.
CPS Veri Modeli
Cihaz ile Ingest Servis arasındaki veri sözleşmesi CPS standardına dayanır. Ayrıntılı protokol tanımı ve alan yapısı için /acik-kaynak/cps referans alınmalıdır.
Ingest bu standarda göre gelen veriyi iki aşamada ele alır:
- Wire payload: cihazdan geldiği orijinal şekil
- Canonical payload: backend servislerinin ortak biçimde kullanacağı normalize edilmiş şekil
Bu ayrım özellikle önemlidir çünkü kalıcı raw kayıt wire formatta tutulur, Redis device_buffer ise normalize edilmiş canonical veriyi taşır.
Şema Doğrulama ve Normalize
Ingest Servis yalnızca parse edilebilir JSON kabul etmez; payload'ın CPS sözleşmesine uyup uymadığını da doğrular. Bu doğrulama kapsamında en azından aşağıdaki başlıklar kontrol edilir:
- üst seviye yapı
- zorunlu alanların varlığı
device_ideşdeğeri cihaz kimliği- cihaz zamanı alanı
- sayısal alanların tür tutarlılığı
- CPS içindeki beklenen blok yapısı
Doğrulama başarılı olduktan sonra payload backend içinde kullanılan canonical yapıya normalize edilir. Bu canonical yapı, downstream servislerin tekrar tekrar mapping yapmak zorunda kalmaması için Ingest katmanında üretilir.
Bu nedenle sistem içi sorumluluk sınırı nettir:
- Raw DB → wire payload
- Redis
device_buffer→ normalize edilmiş canonical payload
Kimlik Doğrulama Politikası
Ingest Servis kimlik doğrulama yapmaz. Kabul kriteri CPS şemasına uygun ve parse edilebilir payload olmaktır.
Bu kararın anlamı şudur:
- Ingest katmanı network-level giriş servisidir
- cihaz kaydı, tanıma veya iş kuralı seviyesindeki doğrulamalar downstream servislerde uygulanır
- downstream servisler sistemde tanımlı olmayan
device_idiçin işleme devam etmeyebilir
Kimlik doğrulama yapılmıyor olması, servisin tamamen kontrolsüz bırakılacağı anlamına gelmez. Operasyonel olarak aşağıdaki korumalar önerilir:
- payload boyutu sınırı
- malformed JSON rate limiting
- burst protection / throttling
- gerektiğinde IP bazlı sınırlama
Kalıcı Raw Kayıt
Ingest Servis, kabul ettiği her geçerli ve duplicate olmayan payload için kalıcı raw kayıt üretir. Bu kayıtın amacı sadece arşiv değildir; aynı zamanda audit, hata analizi, replay ve veri kaybı incelemesidir.
Buradaki temel karar şudur:
- DB'de tutulan veri: cihazdan geldiği wire raw payload
- Redis'te tutulan veri: normalize edilmiş canonical çalışma hali
Bu ayrım sayesinde sistem aynı anda hem operasyonel olarak hızlı hem de gözlemlenebilir olur.
Raw Tablo Amacı
Kalıcı raw tablo aşağıdaki amaçlara hizmet eder:
- cihaz gerçekten paket gönderdi mi sorusuna cevap vermek
- ingest kabulünden sonra downstream hata analizi yapmak
- normalize sürecindeki olası bozulmaları incelemek
- belirli bir zaman aralığı için replay zemini oluşturmak
- veri kaybı veya sıra dışı davranışlarda delil niteliğinde kayıt tutmak
Önerilen Raw Tablo Şeması
Aşağıdaki alan seti bu mimari için yeterlidir:
| Alan | Tip | Açıklama |
|---|---|---|
raw_id | bigserial / uuid | Birincil anahtar |
received_at | timestamptz | Paketin ingest tarafından alındığı zaman |
device_time | timestamptz | Cihazın payload içinde bildirdiği zaman |
device_id | varchar | Cihaz kimliği |
client_ip | varchar / inet | Paketin geldiği istemci IP bilgisi |
pack_size | integer | HTTP body boyutu |
raw_pack | text | Cihazdan geldiği haliyle wire payload |
Notlar:
raw_packalanınıntexttutulması önerilir. Böylece gelecekteki alan genişlemelerinde sabit karakter sınırına takılmaz.- Bu tablo append-only düşünülmelidir. Update odaklı değil, olay kaydı odaklıdır.
- Bu tabloda normalize edilmiş payload tutulmaz; normalize veri Redis katmanındadır.
- Duplicate paketler bu tabloya ikinci kez yazılmaz.
Örnek tablo:
Redis Çalışma Alanı
Ingest Servis, doğrulanmış ve normalize edilmiş veriyi Redis içinde cihaz bazlı çalışma durumuna yazar.
Ana key yapısı:
device_buffer:{device_id}
Bu kayıt, yalnız son ham paketi saklayan basit bir cache değil; servisler arası ortak çalışma alanıdır. Ingest bu alanı başlatır, diğer servisler akış ilerledikçe kendi alanlarını doldurur.
Örnek yapı:
{
"device_id": "400000011D081B70",
"last_seen": "2026-03-12T09:50:00Z",
"ingest": {
"device_time": "2026-03-12T09:49:58Z",
"received_at": "2026-03-12T09:50:00Z",
"schema_version": 1
},
"raw": {
"STATUS": 1073741824,
"VRMS_R": 0.063,
"VRMS_S": 0.051,
"VRMS_T": 0.055,
"IRMS_R": 0.004,
"IRMS_S": 0.004,
"IRMS_T": 0.004
},
"device": {
"B_IV": 2.97,
"RSSI": 31
},
"buffers": {
"raw_buffer": {},
"synth_buffer": {},
"window_buffer": {}
},
"state_version": 1
}
Redis Kullanım Gerekçesi
Bu yaklaşımın nedeni tüm veriyi Kafka içinde taşımamak ve her servis için veritabanına gereksiz sorgu yükü bindirmemektir. Redis device_buffer, servisler arası ortak ve hızlı erişilebilir çalışma alanı sağlar.
Bu sayede:
- Kafka yalnız tetikleyici olur
- downstream servisler payload'ın tamamını Kafka'dan taşımak zorunda kalmaz
- işleme zinciri boyunca ortak state korunur
- son aşamada geçici buffer alanları temizlenebilir
Redis Alan Sorumluluğu
İlk kabul aşamasındaki aşağıdaki alanlar Ingest servisinin sorumluluğundadır:
ingest.*rawdevicelast_seenstate_version
Aşağıdaki işleme alanları ise downstream servisler tarafından doldurulur:
buffers.raw_bufferbuffers.synth_bufferbuffers.window_buffer
Bu ayrım, servisler arası state ownership karışıklığını önlemek için önemlidir.
Kafka Mesajı
Redis güncellemesinden sonra Ingest Servis Kafka'ya minimal bir event yazar. Bu event'in amacı payload taşımak değil, downstream servisleri tetiklemektir.
Kafka içinde full payload dolaştırılmaz. Asıl veri Redis device_buffer içinden okunur.
Önerilen event sözleşmesi:
{
"event": "ingest.accepted.v1",
"data": {
"device_id": "400000011D081B70",
"device_time": "2026-03-11T15:22:06Z",
"received_at": "2026-03-11T15:22:08Z",
"buffer_key": "device_buffer:400000011D081B70",
"schema_version": 1
}
}
Ingest Serviste Üretilen Eventler
| Topic | Event | Consume Eden | Üretim Koşulu |
|---|---|---|---|
cinga.ingest.raw | ingest.accepted.v1 | Stream / Processor servisleri | Raw DB + Redis yazımı sonrası |
cinga.ingest.failed | ingest.failed.v1 | Ops / Replay Worker | Parse, schema ya da kabul hatası |
cinga.ingest.warning | device.ingest_duplicate_detected.v1 | Ops / Monitoring | Duplicate paket algılandığında |
cinga.ingest.warning | device.ingest_stuck_detected.v1 | Ops / Monitoring | Cihaz stuck davranışı gösterdiğinde |
cinga.ingest.warning | device.ingest_flood_detected.v1 | Ops / Monitoring | Cihaz flood davranışı gösterdiğinde |
Bu sözleşmede en kritik alanlar şunlardır:
device_iddevice_timereceived_atbuffer_key
Bu alanlar downstream servislerin ihtiyaç duyduğu bağlamı verir; payload'ın tamamı Redis'ten okunur.
Hata Davranışı
Ingest Servis aşağıdaki hata sınıflarını ayırır:
Parse / Şema Hatası
JSON parse edilemiyorsa ya da payload CPS sözleşmesine uymuyorsa istek reddedilir. Bu durumda raw DB, Redis ve Kafka akışı başlatılmaz.
Duplicate Paket
Paket aşağıdaki koşullarla duplicate kabul edilir:
device_idaynıdevice_timeaynı- canonical payload hash aynı
Duplicate paket normal ingest akışına tekrar sokulmaz. Raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez; ancak warning event veya log üretilir.
Raw DB Yazım Hatası
Geçerli payload kalıcı raw tabloya yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.
Redis Yazım Hatası
Normalize veri Redis device_buffer içine yazılamıyorsa istek başarısız kabul edilir. Bu durumda ACK dönülmez.
Kafka Yazım Hatası
Raw DB ve Redis yazımı başarılı olmasına rağmen Kafka üretimi başarısız olursa istek başarısız kabul edilmez. Bu durumda payload sistem tarafından kabul edilmiş sayılır, 200 OK dönebilir; ancak Kafka üretim hatası operasyonel hata olarak kayda alınır ve tekrar üretim mekanizmasına bırakılır.
Bu karar sayesinde kabul garantisi raw DB + Redis katmanında sağlanır; Kafka ise tetikleme katmanı olarak değerlendirilir.
Anormal Gönderim Davranışı ve Koruma Politikası
Sahada arızalı, takılmış ya da haberleşme katmanında hatalı davranan bir IoT cihazı kısa süre içinde normalden çok daha sık paket gönderebilir. Bu durum yalnız veri kalitesini bozmaz; aynı zamanda ingest hattını, Redis yazım hızını, raw log büyümesini ve downstream servisleri gereksiz yük altına sokabilir.
Bu nedenle Ingest Servis yalnız paket kabul eden bir giriş noktası değil, aynı zamanda cihaz davranışını gözleyen ilk savunma hattıdır.
Amaç
Bu katmandaki koruma politikasının iki amacı vardır:
- sistemi flood veya anormal tekrar yükünden korumak
- takılmış ya da sapıtmış cihaz davranışını operasyonel olarak görünür hale getirmek
Buradaki temel yaklaşım, mümkün olan her durumda veriyi kaybetmeden kabul etmek; ancak cihaz davranışını sınıflandırmak, işaretlemek ve gerektiğinde sınırlamaktır.
İzlenen Sinyaller
Ingest Servis cihaz bazında aşağıdaki sinyalleri izler:
- son kabul edilen paket ile yeni paket arasındaki süre (
interval_ms) - aynı
device_timeile tekrarlayan gönderimler - aynı payload içeriğinin arka arkaya yinelenmesi
- kısa zaman penceresindeki toplam paket sayısı
Bu amaçla canonical payload üzerinden kısa bir hash üretilebilir ve cihaz bazında son runtime state Redis'te tutulabilir.
Önerilen geçici runtime key yapısı:
device_runtime:{device_id}
Örnek alanlar:
{
"last_seen_at": "2026-03-31T21:40:00Z",
"last_device_time": "2026-03-31T21:39:58Z",
"last_payload_hash": "9a4b...",
"last_interval_ms": 1000,
"same_payload_repeat_count": 14,
"events_per_1m": 68,
"rate_status": "suspicious"
}
Davranış Sınıfları
Sistem cihaz gönderim davranışını aşağıdaki sınıflardan biriyle işaretleyebilir:
| Durum | Açıklama |
|---|---|
normal | Beklenen aralıkta ve doğal içerik değişimiyle gelen paketler |
suspicious | Beklenenden anlamlı derecede sık gelen veya tekrar örüntüsü gösteren paketler |
stuck | Aynı zaman / aynı payload ile uzun süre tekrar eden gönderim davranışı |
flood | Kısa zaman penceresinde sistemi zorlayacak yoğunlukta paket üretimi |
Bu sınıflama veri kabul kararından bağımsızdır. İlk hedef, davranışı görünür kılmaktır.
Duplicate ve Stuck Davranışı
Takılmış cihazların en yaygın belirtisi, aynı payload'ı ya da aynı cihaz zamanını art arda tekrar göndermesidir. Bu durum aşağıdaki sinyallerle tespit edilebilir:
device_idaynıdevice_timeaynı- canonical payload hash aynı
- paket aralığı beklenenden çok daha kısa
Bu koşullar birlikte sağlanıyorsa paket duplicate kabul edilir. Duplicate paket normal ingest akışına tekrar sokulmaz; raw tabloya yeniden yazılmaz, Redis tekrar güncellenmez, ancak uyarı amaçlı event veya log üretilir.
Bu örüntü üst üste belirli bir eşik kadar gerçekleşirse cihaz stuck olarak işaretlenebilir.
Koruma Stratejisi
İlk aşamada önerilen politika soft protection yaklaşımıdır. Yani sistem, anormal davranışı mümkün olduğunca veri kaybetmeden kabul eder; ancak paketi işaretler, sayaçları artırır ve uyarı üretir.
Bu modelde:
- geçerli payload normal ingest akışına devam eder
- cihaz runtime state'i güncellenir
- warning veya anomaly event üretilebilir
- gözlem ve operasyon tarafı cihazı görünür şekilde izler
Daha agresif koruma yalnız gerçekten gerekli olduğunda devreye alınmalıdır.
Olası Hard Protection Adımları
Saha davranışı ve yük profili görüldükten sonra aşağıdaki önlemler eklenebilir:
- cihaz bazlı token bucket rate limiting
- kısa süreli throttle
429 Too Many Requestsyanıtı- belirli eşik sonrası kontrollü drop
- cihazı geçici quarantine listesine alma
Bu önlemler ilk sürüm için zorunlu değildir; ancak mimari bunları destekleyecek şekilde düşünülmelidir.
Operasyonel Uyarılar
Ingest Servis, anormal gönderim örüntülerinde ops tarafını bilgilendirecek event veya log üretebilir.
Örnek olaylar:
device.ingest_rate_warning.v1device.ingest_duplicate_detected.v1device.ingest_stuck_detected.v1device.ingest_flood_detected.v1
Bu uyarılar dashboard, alarm altyapısı veya replay / inceleme iş akışına bağlanabilir.
Önerilen İlk Uygulama Seviyesi
İlk faz için aşağıdaki koruma seviyesi yeterlidir:
- cihaz bazlı son paket aralığını izlemek
- canonical payload hash ile tekrar tespiti yapmak
- Redis içinde kısa runtime state tutmak
normal / suspicious / stuck / floodsınıflaması üretmek- warning seviyesinde olay kaydı üretmek
Önerilen başlangıç eşikleri:
| Kural | Eşik | Sonuç |
|---|---|---|
Aynı device_time + aynı hash tekrarı | 3 kez üst üste | suspicious |
Aynı device_time + aynı hash tekrarı | 10 kez üst üste | stuck |
| 1 dakikadaki paket sayısı | > 12 | suspicious |
| 1 dakikadaki paket sayısı | > 30 | flood |
| Paket aralığı | beklenen aralığın < 1/4'ü | suspicious |
| Paket aralığı | beklenen aralığın < 1/10'u | flood |
Bu seviyeden sonra gerçek saha verisine göre rate limit eşikleri sertleştirilebilir.
Tasarım Gerekçesi
Bu servis için tercih edilen model şu sırayı izler:
- şema doğrulama
- duplicate kontrolü
- raw DB yazımı
- normalize
- Redis yazımı
- Kafka üretimi
- ACK
Bu model, daha agresif asenkron kabul modeline göre biraz daha yavaştır; ancak veri kaybı, belirsiz ACK ve yarım kabul durumu risklerini azaltır. Cihazın aldığı 200 OK böylece güvenilir bir kabul sinyali olur.
Kafka'nın ACK dışında tutulması ise sistemin kabul güvenliğini bozmadan downstream tetikleme katmanını daha esnek hale getirir.
Sınırlar ve Notlar
- Bu servis paket birleştirme yapmaz.
Pack,Stream_End, assembly timeout ve partial finalize mantıkları bu mimarinin parçası değildir.- Raw DB, historical kabul kaydıdır; Redis ise operasyonel çalışma alanıdır.
- Normalize işlem yalnız Ingest katmanında yapılır; downstream servisler canonical yapıyı kullanır.
device_bufferiçindekiingest,raw,device,last_seenve benzeri ilk kabul alanları Ingest servisinin sorumluluğundadır.buffers.raw_buffer,buffers.synth_buffer,buffers.window_buffergibi işleme alanları downstream servisler tarafından doldurulur.- Raw log için başlangıç retention önerisi 90 gündür; yüksek hacimde günlük partition tercih edilmelidir.
- Ingest girişinde temel korumalar olarak
max_payload_size = 16 KB, kısa request timeout ve malformed JSON rate limiting önerilir. - Bu sayfa CPS standardını tekrar tanımlamaz; wire sözleşme için
/acik-kaynak/cpsreferans alınmalıdır.
Sonuç
Ingest Servis, Cınga veri işleme mimarisinin güvenli kabul sınırıdır. CPS uyumlu cihaz payload'ını alır, doğrular, duplicate kontrolünü uygular, kalıcı raw kaydını üretir, normalize edilmiş çalışma verisini Redis'e yazar, Kafka ile işleme hattını tetikler ve kabul zinciri tamamlandıktan sonra cihaza 200 OK döner.
Bu sayede sistem şu üç ihtiyacı aynı anda karşılar:
- cihaz tarafında anlaşılır ve güvenilir ACK semantiği
- backend tarafında hızlı ortak çalışma alanı
- operasyonel olarak replay ve audit için kalıcı wire raw kayıt