streams
streams, ingest tarafından accepted edilen her telemetry olayının kayıt tablosudur. Bu tabloda yalnız gerekli tüm kontrollerden geçmiş, valid ve non-duplicate paketler yer alır.
Ingest bir paketi kabul ettiğinde streams tablosuna insert yapar ve otomatik artan id alanından bir stream ID üretilir. Bu ID, ilerleyen tüm measurement, state ve ledger kayıtları için temel referans olur; downstream servisler raw_data'yı değil doğrudan stream zincirini kullanır.
Bir stream kaydının oluşabilmesi için şu koşulların tamamı sağlanmış olmalıdır:
raw_datajournal kaydı mevcut- JSON parse başarılı
- CPS schema validation başarılı
- Paket duplicate değil
Bu tablo wire payload içermez, ham birincil kopyası raw_data'da kalır. Measurement sonuçları, state ve ledger çıktıları ayrı tablolarda tutulur.
Tablo Kurallari
Partition/Retention Kurali
streamstablosustream_timebazli aylik partition modeli (yyyy_mm) ile calisir ve migration ile zorunlu kilinir.- Retention suresi 24 aydir. 24 aydan eski
streamspartitionlari ve bagli telemetri kayitlari kontrollu operasyonla silinir.
Degistirilemezlik ve Kontrollu Silme Kurali
INSERTserbesttir.UPDATEgenel olarak yasaktir.DELETEsadece kontrollu prosedur ile serbesttir.
Bu kuralin amaci accepted kayitlarin sonradan degistirilmesini engellemek ve cihaz yeniden atama surecinde veri karismasini onlemektir.
Istisna: operasyonel izleme icin yalniz current_stage ve stage_update_time alanlari, onayli stage fonksiyonu uzerinden guncellenebilir. Ledger sureci icin ledger_status alani da yalniz onayli ledger fonksiyonu uzerinden guncellenebilir.
Stage Izleme Kurali
streams tablosu lineer servis akisinin son durumunu izlemek icin iki alan tutar:
current_stagestage_update_time
Bu alanlar sadece akisin hangi basamakta oldugunu izlemek icindir; stream icerigi, kaynak baglami ve olcum kimlikleri degistirilmez.
Not: current_stage ledger sonucunu temsil etmez; ledger sonucu yalniz ledger_status ile izlenir.
Cihaz Yeniden Atama Kurali
Bir cihaz musteriden geri gelip yeni araziye takilacaksa, secilen cihaza ait tum streams ve bagli telemetri zinciri birlikte silinir.
- Silme once arsiv/snapshot alma (opsiyonel ama onerilir).
- Silme sonrasi yeniden hesaplama/islem tetigi (materialized view, ozet tablo, cache temizligi).
Not: streams.device_id -> devices.id iliskisi ON DELETE RESTRICT oldugu icin telemetri gecmisi olan cihazin fiziksel silinmesi engellenir. Buradaki silme, devices satirini degil, streams ve bagli olcum zincirini hedefler.
Uygulama Sozlesmesi
Bu kurallar DB seviyesinde enforce edilir:
BEFORE UPDATEtrigger ile dogrudan guncelleme reddedilir.- Dogrudan
DELETEuygulama rolunde yasaktir. - Silme yalniz onayli DB fonksiyonu uzerinden calisir.
- Stage guncellemesi yalniz onayli DB fonksiyonu uzerinden calisir.
ledger_statusguncellemesi yalniz onayli ledger fonksiyonu uzerinden calisir.
Zorunlu fonksiyon sozlesmesi:
ops.delete_device_streams_full(p_device_id, p_reason, p_actor)- Fonksiyon, silinen satir sayisini ve islem kimligini (
operation_id) dondurur. - Fonksiyon cagrisi stream_audit tablosuna append-only kayit yazar (kim, neden, ne zaman, hangi cihaz).
Zorunlu stage fonksiyonu sozlesmesi:
ops.set_stream_stage(p_stream_id, p_stage, p_service, p_actor, p_error default null)- Fonksiyon sadece
current_stagevestage_update_timealanlarini gunceller. - Fonksiyon idempotent calisir: ayni stage tekrar geldiginde gereksiz update yapmaz.
- Fonksiyon optimistic kosulla gunceller (ornek:
WHERE id = p_stream_id AND current_stage <> p_stage) ve etkilenen satir sayisini dondurur. - Fonksiyon cagrisi opsiyonel olarak
stream_auditveya operasyonel loga olay dusurur.
Zorunlu ledger fonksiyonu sozlesmesi:
ops.set_stream_ledger_status(p_stream_id, p_ledger_status, p_service, p_actor, p_error default null)- Fonksiyon sadece
ledger_statusalanini gunceller. - Fonksiyon idempotent calisir: ayni
ledger_statustekrar geldiginde gereksiz update yapmaz. - Fonksiyon optimistic kosulla gunceller (ornek:
WHERE id = p_stream_id AND ledger_status <> p_ledger_status) ve etkilenen satir sayisini dondurur. - Fonksiyon cagrisi opsiyonel olarak
stream_auditveya operasyonel loga olay dusurur.
Eszamanlilik kurali:
- Fonksiyonlar tek satir atomik update yapar ve gereksiz tekrar update'i engeller.
- Servisler fonksiyon donus sonucuna gore retry/no-op karari verir.
Kanit ve Dogrulama
- Partition olusum migrationi ve retention calistirma kaydi (drill/evidence) tutulur.
- Dogrudan
UPDATEveDELETEdenemelerinin hata verdigi kanitlanir. - Stage fonksiyonu disinda
UPDATEyapilamadigi kanitlanir. - Yetkili fonksiyon cagrisi ile silme yapilabildigi kanitlanir.
- Yetkili stage fonksiyonu ile
current_stagegecislerinin calistigi kanitlanir. - Yetkili ledger fonksiyonu ile
ledger_statusgecislerinin calistigi kanitlanir. - Sonuclar migration playbook ve evidence kayitlarina referanslanir.
FK Davranis Notlari
| FK | ON DELETE | ON UPDATE | Not |
|---|---|---|---|
streams.device_id -> devices.id | RESTRICT | CASCADE | Telemetri gecmisi olan cihazin fiziksel silinmesi engellenir. |
streams.raw_id -> raw_data.id | SET NULL | CASCADE | Raw kaydi temizlenirse stream tarihcesi korunur. |
| Bu tabloyu referanslayan FK | ON DELETE | ON UPDATE | Not |
|---|---|---|---|
measurements.stream_id -> streams.id | CASCADE | CASCADE | Generic olcum kayitlari stream ile birlikte temizlenir. |
measurements_*.stream_id -> streams.id | CASCADE | CASCADE | Typed olcum tablolari stream ile atomik yasar. |
rule_events.measurement_stream_id -> streams.id | SET NULL | CASCADE | Olay kaydi korunur, tetikleyici stream referansi bosaltilir. |
logs.stream_id -> streams.id | SET NULL | CASCADE | Log denetim izi korunur. |
Indeksler
| Indeks | Tip | Aciklama |
|---|---|---|
id | Primary Key | Tablo birincil anahtari |
(device_id, stream_time) | B-Tree | Cihazin zaman bazli stream gecmisi sorgulari |
(stream_time) | B-Tree | Partition pruning ile birlikte zaman penceresi sorgulari |
(current_stage, stage_update_time) | Partial B-Tree | Sadece takip gereken stage'ler icin operasyonel kuyruk sorgulari |
(ledger_status, stream_time) | Partial B-Tree | Ledger pending/failed durumlarinin taranmasi |
Not: current_stage ve ledger_status indeksleri tum degerlere degil, operasyonel olarak izlenen alt kume degerlere partial index olarak uygulanir.
Kolonlar
| Kolon | Tip | Null | Anlamı |
|---|---|---|---|
id | int | hayır | Accepted stream birincil anahtarı; tüm downstream referanslar buraya bağlanır |
device_id | varchar(21) | hayır | Accepted paketin cihaz kimliği |
message_type_id | int | evet | Paket tipi (timed, interrupt, alarm vb.) |
sim_id | int | evet | Paketin geldiği SIM hattı |
ip_address | varchar(45) | evet | Accepted isteğin kaynak IP bilgisi |
payload_size | int | evet | HTTP body boyutu |
process_time_ms | float | evet | Ingest'in paketi aldığı andan tüm kontrolleri tamamlayıp DB'ye yazana kadar geçen süre |
device_time | timestamp | hayır | Cihazın payload içinde bildirdiği zaman |
stream_time | timestamp | hayır | Sistemin accepted event zamanı |
raw_id | int | evet | Stream'in türetildiği raw_data kaydı; traceability için |
current_stage | stream_stage | hayır | Stream'in lineer servis akisindaki mevcut asamasi |
stage_update_time | timestamp | evet | Son stage gecisinin yazildigi zaman |
ledger_status | stream_ledger_status | hayır | Ledger yazım sonucu |
current_stage enum değerleri:
| Değer | Anlamı |
|---|---|
accepted | Stream accepted ve kaydedildi |
calibrated | Kalibrasyon basamagi tamamlandi |
raw_written | Ham/yardimci yazim basamagi tamamlandi |
synthesized | Sentez basamagi tamamlandi |
failed | Akis basamaklarindan biri hata ile sonlandi |
ledger_status enum değerleri:
| Değer | Anlamı |
|---|---|
pending | Ledger henüz denenmedi |
written | Ledger başarıyla yazıldı |
failed | Ledger yazımı başarısız oldu |
skipped | Bilinçli olarak atlandı |
Örnek Kayıtlar
- Timed Paket
- Interrupt Paket
- İkinci Timed Paket
{
"id": 1,
"device_id": "46000000C47CA670",
"message_type_id": 1,
"sim_id": 1,
"ip_address": "10.10.1.25",
"payload_size": 512,
"process_time_ms": 18.4,
"device_time": "2026-04-03T10:29:50Z",
"stream_time": "2026-04-03T10:30:00Z",
"raw_id": 1,
"current_stage": "synthesized",
"stage_update_time": "2026-04-03T10:30:07Z",
"ledger_status": "pending"
}
Özet: Timed telemetry paketi kabul zincirinden geçmiş; downstream measurement ve state kayıtları bu stream ID üzerinden bağlanır.
{
"id": 2,
"device_id": "46000000C47CA671",
"message_type_id": 2,
"sim_id": 2,
"ip_address": "10.10.1.26",
"payload_size": 420,
"process_time_ms": 12.7,
"device_time": "2026-04-03T10:24:55Z",
"stream_time": "2026-04-03T10:25:00Z",
"raw_id": 2,
"current_stage": "calibrated",
"stage_update_time": "2026-04-03T10:25:03Z",
"ledger_status": "pending"
}
Özet: Farklı cihaz ve SIM hattından gelen interrupt tipi paket accepted olmuş; process süresi timed paketten kısadır.
{
"id": 3,
"device_id": "46000000C47CA672",
"message_type_id": 3,
"sim_id": 1,
"ip_address": "10.10.1.27",
"payload_size": 300,
"process_time_ms": 9.8,
"device_time": "2026-04-03T10:19:57Z",
"stream_time": "2026-04-03T10:20:00Z",
"raw_id": 3,
"current_stage": "failed",
"stage_update_time": "2026-04-03T10:20:04Z",
"ledger_status": "failed"
}
Özet: Üçüncü cihazdan gelen paket; daha küçük payload boyutu daha kısa process süresiyle doğrudan ilişkilendirilmiş.