Ana içeriğe geç

streams

streams, ingest tarafından accepted edilen her telemetry olayının kayıt tablosudur. Bu tabloda yalnız gerekli tüm kontrollerden geçmiş, valid ve non-duplicate paketler yer alır.

Ingest bir paketi kabul ettiğinde streams tablosuna insert yapar ve otomatik artan id alanından bir stream ID üretilir. Bu ID, ilerleyen tüm measurement, state ve ledger kayıtları için temel referans olur; downstream servisler raw_data'yı değil doğrudan stream zincirini kullanır.

Bir stream kaydının oluşabilmesi için şu koşulların tamamı sağlanmış olmalıdır:

  • raw_data journal kaydı mevcut
  • JSON parse başarılı
  • CPS schema validation başarılı
  • Paket duplicate değil

Bu tablo wire payload içermez, ham birincil kopyası raw_data'da kalır. Measurement sonuçları, state ve ledger çıktıları ayrı tablolarda tutulur.

Tablo Kurallari

Partition/Retention Kurali

  • streams tablosu stream_time bazli aylik partition modeli (yyyy_mm) ile calisir ve migration ile zorunlu kilinir.
  • Retention suresi 24 aydir. 24 aydan eski streams partitionlari ve bagli telemetri kayitlari kontrollu operasyonla silinir.

Degistirilemezlik ve Kontrollu Silme Kurali

  • INSERT serbesttir.
  • UPDATE genel olarak yasaktir.
  • DELETE sadece kontrollu prosedur ile serbesttir.

Bu kuralin amaci accepted kayitlarin sonradan degistirilmesini engellemek ve cihaz yeniden atama surecinde veri karismasini onlemektir.

Istisna: operasyonel izleme icin yalniz current_stage ve stage_update_time alanlari, onayli stage fonksiyonu uzerinden guncellenebilir. Ledger sureci icin ledger_status alani da yalniz onayli ledger fonksiyonu uzerinden guncellenebilir.

Stage Izleme Kurali

streams tablosu lineer servis akisinin son durumunu izlemek icin iki alan tutar:

  • current_stage
  • stage_update_time

Bu alanlar sadece akisin hangi basamakta oldugunu izlemek icindir; stream icerigi, kaynak baglami ve olcum kimlikleri degistirilmez.

Not: current_stage ledger sonucunu temsil etmez; ledger sonucu yalniz ledger_status ile izlenir.

Cihaz Yeniden Atama Kurali

Bir cihaz musteriden geri gelip yeni araziye takilacaksa, secilen cihaza ait tum streams ve bagli telemetri zinciri birlikte silinir.

  • Silme once arsiv/snapshot alma (opsiyonel ama onerilir).
  • Silme sonrasi yeniden hesaplama/islem tetigi (materialized view, ozet tablo, cache temizligi).

Not: streams.device_id -> devices.id iliskisi ON DELETE RESTRICT oldugu icin telemetri gecmisi olan cihazin fiziksel silinmesi engellenir. Buradaki silme, devices satirini degil, streams ve bagli olcum zincirini hedefler.

Uygulama Sozlesmesi

Bu kurallar DB seviyesinde enforce edilir:

  1. BEFORE UPDATE trigger ile dogrudan guncelleme reddedilir.
  2. Dogrudan DELETE uygulama rolunde yasaktir.
  3. Silme yalniz onayli DB fonksiyonu uzerinden calisir.
  4. Stage guncellemesi yalniz onayli DB fonksiyonu uzerinden calisir.
  5. ledger_status guncellemesi yalniz onayli ledger fonksiyonu uzerinden calisir.

Zorunlu fonksiyon sozlesmesi:

  • ops.delete_device_streams_full(p_device_id, p_reason, p_actor)
  • Fonksiyon, silinen satir sayisini ve islem kimligini (operation_id) dondurur.
  • Fonksiyon cagrisi stream_audit tablosuna append-only kayit yazar (kim, neden, ne zaman, hangi cihaz).

Zorunlu stage fonksiyonu sozlesmesi:

  • ops.set_stream_stage(p_stream_id, p_stage, p_service, p_actor, p_error default null)
  • Fonksiyon sadece current_stage ve stage_update_time alanlarini gunceller.
  • Fonksiyon idempotent calisir: ayni stage tekrar geldiginde gereksiz update yapmaz.
  • Fonksiyon optimistic kosulla gunceller (ornek: WHERE id = p_stream_id AND current_stage <> p_stage) ve etkilenen satir sayisini dondurur.
  • Fonksiyon cagrisi opsiyonel olarak stream_audit veya operasyonel loga olay dusurur.

Zorunlu ledger fonksiyonu sozlesmesi:

  • ops.set_stream_ledger_status(p_stream_id, p_ledger_status, p_service, p_actor, p_error default null)
  • Fonksiyon sadece ledger_status alanini gunceller.
  • Fonksiyon idempotent calisir: ayni ledger_status tekrar geldiginde gereksiz update yapmaz.
  • Fonksiyon optimistic kosulla gunceller (ornek: WHERE id = p_stream_id AND ledger_status <> p_ledger_status) ve etkilenen satir sayisini dondurur.
  • Fonksiyon cagrisi opsiyonel olarak stream_audit veya operasyonel loga olay dusurur.

Eszamanlilik kurali:

  • Fonksiyonlar tek satir atomik update yapar ve gereksiz tekrar update'i engeller.
  • Servisler fonksiyon donus sonucuna gore retry/no-op karari verir.

Kanit ve Dogrulama

  • Partition olusum migrationi ve retention calistirma kaydi (drill/evidence) tutulur.
  • Dogrudan UPDATE ve DELETE denemelerinin hata verdigi kanitlanir.
  • Stage fonksiyonu disinda UPDATE yapilamadigi kanitlanir.
  • Yetkili fonksiyon cagrisi ile silme yapilabildigi kanitlanir.
  • Yetkili stage fonksiyonu ile current_stage gecislerinin calistigi kanitlanir.
  • Yetkili ledger fonksiyonu ile ledger_status gecislerinin calistigi kanitlanir.
  • Sonuclar migration playbook ve evidence kayitlarina referanslanir.

FK Davranis Notlari

FKON DELETEON UPDATENot
streams.device_id -> devices.idRESTRICTCASCADETelemetri gecmisi olan cihazin fiziksel silinmesi engellenir.
streams.raw_id -> raw_data.idSET NULLCASCADERaw kaydi temizlenirse stream tarihcesi korunur.
Bu tabloyu referanslayan FKON DELETEON UPDATENot
measurements.stream_id -> streams.idCASCADECASCADEGeneric olcum kayitlari stream ile birlikte temizlenir.
measurements_*.stream_id -> streams.idCASCADECASCADETyped olcum tablolari stream ile atomik yasar.
rule_events.measurement_stream_id -> streams.idSET NULLCASCADEOlay kaydi korunur, tetikleyici stream referansi bosaltilir.
logs.stream_id -> streams.idSET NULLCASCADELog denetim izi korunur.

Indeksler

IndeksTipAciklama
idPrimary KeyTablo birincil anahtari
(device_id, stream_time)B-TreeCihazin zaman bazli stream gecmisi sorgulari
(stream_time)B-TreePartition pruning ile birlikte zaman penceresi sorgulari
(current_stage, stage_update_time)Partial B-TreeSadece takip gereken stage'ler icin operasyonel kuyruk sorgulari
(ledger_status, stream_time)Partial B-TreeLedger pending/failed durumlarinin taranmasi

Not: current_stage ve ledger_status indeksleri tum degerlere degil, operasyonel olarak izlenen alt kume degerlere partial index olarak uygulanir.

Kolonlar

KolonTipNullAnlamı
idinthayırAccepted stream birincil anahtarı; tüm downstream referanslar buraya bağlanır
device_idvarchar(21)hayırAccepted paketin cihaz kimliği
message_type_idintevetPaket tipi (timed, interrupt, alarm vb.)
sim_idintevetPaketin geldiği SIM hattı
ip_addressvarchar(45)evetAccepted isteğin kaynak IP bilgisi
payload_sizeintevetHTTP body boyutu
process_time_msfloatevetIngest'in paketi aldığı andan tüm kontrolleri tamamlayıp DB'ye yazana kadar geçen süre
device_timetimestamphayırCihazın payload içinde bildirdiği zaman
stream_timetimestamphayırSistemin accepted event zamanı
raw_idintevetStream'in türetildiği raw_data kaydı; traceability için
current_stagestream_stagehayırStream'in lineer servis akisindaki mevcut asamasi
stage_update_timetimestampevetSon stage gecisinin yazildigi zaman
ledger_statusstream_ledger_statushayırLedger yazım sonucu

current_stage enum değerleri:

DeğerAnlamı
acceptedStream accepted ve kaydedildi
calibratedKalibrasyon basamagi tamamlandi
raw_writtenHam/yardimci yazim basamagi tamamlandi
synthesizedSentez basamagi tamamlandi
failedAkis basamaklarindan biri hata ile sonlandi

ledger_status enum değerleri:

DeğerAnlamı
pendingLedger henüz denenmedi
writtenLedger başarıyla yazıldı
failedLedger yazımı başarısız oldu
skippedBilinçli olarak atlandı

Örnek Kayıtlar

{
"id": 1,
"device_id": "46000000C47CA670",
"message_type_id": 1,
"sim_id": 1,
"ip_address": "10.10.1.25",
"payload_size": 512,
"process_time_ms": 18.4,
"device_time": "2026-04-03T10:29:50Z",
"stream_time": "2026-04-03T10:30:00Z",
"raw_id": 1,
"current_stage": "synthesized",
"stage_update_time": "2026-04-03T10:30:07Z",
"ledger_status": "pending"
}

Özet: Timed telemetry paketi kabul zincirinden geçmiş; downstream measurement ve state kayıtları bu stream ID üzerinden bağlanır.