Rule Service - Data Layer Write Order
Read Direction (Input Data Flow)
Load Order:
- Context: device_id, stream_id
- Rule Definitions: Redis/DB (rule_groups, rules)
- Device Assignments: Redis/DB (device_rule_assignments)
- Current State: Redis/DB (device_rule_state)
- Register History: Redis (for transitions)
- Input Data: device_buffer (inside the window event)
- Publish Setting: measurement_register (publish bit)
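The load order above can be sketched as a cache-aside read. The table names come from the list above; the join columns, key shape, and the node-redis v4 / `pg` client interfaces are assumptions, so the clients are injected rather than hard-coded.

```javascript
// Cache-aside loader: Redis first, PostgreSQL fallback, then repopulate the cache.
// `redis` and `db` are injected; key shape and join columns are illustrative.
async function loadRuleContext(redis, db, deviceId) {
  const cacheKey = `rules:${deviceId}`;
  const cached = await redis.get(cacheKey);
  if (cached) return JSON.parse(cached); // cache hit: no DB round trip

  // Cache miss: read definitions, assignments, and current state from the DB
  const { rows } = await db.query(
    `SELECT rg.id AS group_id, r.id AS rule_id, s.is_triggered
       FROM device_rule_assignments a
       JOIN rule_groups rg ON rg.id = a.rule_group_id
       JOIN rules r        ON r.rule_group_id = rg.id
       LEFT JOIN device_rule_state s
         ON s.device_id = a.device_id AND s.rule_group_id = rg.id
      WHERE a.device_id = $1`,
    [deviceId]
  );
  await redis.set(cacheKey, JSON.stringify(rows), { EX: 300 }); // short TTL
  return rows;
}
```

On a cold start the first window pays the DB round trip; every window after that reads the cached copy until the TTL expires.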
Write Direction (Output Data Flow & Sequence)
State Write Sequence (Detailed)
Step 1: device_rule_state Update
-- First: Update persistent state
UPDATE device_rule_state
SET
is_triggered = true,
trigger_count = trigger_count + 1,
last_trigger_time = NOW(),
updated_at = NOW()
WHERE
device_id = ?
AND rule_group_id = ?
Reasons:
- Low latency (single-row update)
- Atomicity guaranteed
- Foundation of the audit trail
Timing: ~20-50ms
Step 2: rule_events Insert (Append-Only)
-- Second: Append to audit log
INSERT INTO rule_events (
device_id,
rule_group_id,
rule_id,
event_type,
trigger_count,
severity,
details,
created_at
)
VALUES (
?, -- device_id
?, -- group_id
?, -- rule_id (nullable)
'triggered', -- event_type
?, -- trigger_count from device_rule_state
'high', -- severity
'{"reason": "VRMS_R > 250 for 600s", ...}'::jsonb,
NOW()
)
Reasons:
- Immutable history
- Easy filtering for queries
- Time-series analytics
Timing: ~15-40ms
Step 3: Redis State Cache
// Third: Update cache for fast lookups
const key = `state:${device_id}:${group_id}`;
const ttl = 24 * 3600; // 24 hours
await redis.set(key, JSON.stringify({
  is_triggered: true,
  trigger_count: 58,
  last_trigger_time: Date.now(),
  last_reset_time: previousReset,
  duration_counters: {
    rule_114: 615, // 615 seconds of accumulated duration
    rule_115: 300
  },
  register_history: {
    device_status: "0x0101" // previous value
  }
}), {
  EX: ttl
});
Reasons:
- Cache hit on the next window
- Latency < 5ms
- Automatic expiration (prevents stale data)
Timing: ~5-10ms
Step 4: Duration Counter Cache (Separate)
// For duration-based rules, maintain counters separately
const durationKey = `rule:${rule_id}:${device_id}:duration_counter`;
if (condition_true) {
  await redis.incrBy(durationKey, window_interval); // +50s
  await redis.expire(durationKey, 3600); // expire in 1h (or reset when the condition turns false)
} else {
  await redis.del(durationKey); // reset counter
}
Reasons:
- Incremental updates (atomic)
- Fast lookups (single key)
- Auto-reset on condition false
Step 5: Register History Cache (Transitions)
// For register transition detection
const historyKey = `register:${device_id}:device_status`;
const previousValue = await redis.get(historyKey);
const currentValue = deviceBuffer.status_register;
if (previousValue && hasBitTransition(previousValue, currentValue)) {
  // Transition detected, rule evaluates to true
}
await redis.set(historyKey, currentValue); // always store the latest value for the next window
Reasons:
- Transition detection needs history
- Only store last value (memory efficient)
- Auto-reset on window interval
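`hasBitTransition` is referenced above but not defined anywhere in this document; a minimal sketch, assuming register values are hex strings like `"0x0101"` and that "transition" means any bit changed (a rising-edge variant is included for rules that only care about a bit being set):

```javascript
// Returns true when any bit differs between the previous and current register
// values. Hex-string inputs ("0x0101") are an assumption -- the document does
// not define hasBitTransition's contract.
function hasBitTransition(previousValue, currentValue) {
  const prev = parseInt(previousValue, 16);
  const curr = parseInt(currentValue, 16);
  return (prev ^ curr) !== 0; // XOR: set bits mark positions that changed
}

// Rising-edge variant (0 -> 1 only), useful for "fault bit set" style rules
function hasRisingEdge(previousValue, currentValue) {
  const prev = parseInt(previousValue, 16);
  const curr = parseInt(currentValue, 16);
  return (~prev & curr) !== 0; // bits set now that were clear before
}
```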
Write Consistency Model
Scenario: System Crash After DB Write
T1: UPDATE device_rule_state (✓ written to disk)
T2: INSERT rule_events (✓ written to disk)
T3: CRASH ← Before Redis update
T4: System restart
T5: Next evaluation
Recovery:
- Redis state missing
- DB state correct
- Fallback on next eval: DB → Cache = OK
- Risk: One window of slow queries (cache miss)
Scenario: Redis Write Succeeds, DB Fails (wrong order)
T1: SET Redis state (✓ cache updated first — wrong order)
T2: UPDATE device_rule_state (✗ disk full)
T3: INSERT rule_events (✗ fails)
T4: CRASH
T5: System restart
T6: Next evaluation
Recovery:
- Redis state: holds a trigger the DB never recorded
- DB state: missing
- Inconsistency! Redis newer than DB
Prevention:
- Transactional writes: DB first, then Redis
- If Redis write fails: log and continue (accept cache miss)
- If DB write fails: don't update Redis
Recommended Sequence:
- DB write (transactional)
- Wait for sync to disk
- Redis write (async okay)
- Emit event
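The recommended sequence can be sketched as follows, assuming a `pg` pool and a node-redis v4 client (both injected). A DB failure aborts before the cache is touched; a Redis failure is logged and tolerated as a cache miss; event emission happens last.

```javascript
// DB first (transactional), then best-effort Redis. If the DB write fails we
// never touch the cache; if the Redis write fails we accept a cache miss.
// Client objects are injected; payload shapes are illustrative.
async function persistTrigger(db, redis, emit, { deviceId, groupId, state, event }) {
  const client = await db.connect();
  try {
    await client.query('BEGIN');
    await client.query(
      `UPDATE device_rule_state
          SET is_triggered = true,
              trigger_count = trigger_count + 1,
              last_trigger_time = NOW(),
              updated_at = NOW()
        WHERE device_id = $1 AND rule_group_id = $2`,
      [deviceId, groupId]
    );
    await client.query(
      `INSERT INTO rule_events (device_id, rule_group_id, event_type, details, created_at)
       VALUES ($1, $2, 'triggered', $3, NOW())`,
      [deviceId, groupId, JSON.stringify(event)]
    );
    await client.query('COMMIT'); // synced before any cache write
  } catch (err) {
    await client.query('ROLLBACK');
    throw err; // DB failed: do not update Redis
  } finally {
    client.release();
  }

  try {
    await redis.set(`state:${deviceId}:${groupId}`, JSON.stringify(state), { EX: 24 * 3600 });
  } catch (err) {
    console.error('redis cache update failed, continuing', err); // accept cache miss
  }

  emit({ deviceId, groupId, ...event }); // async event emission last
}
```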
Duration Counter Lifecycle
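The counter operations in Step 4 imply the following lifecycle; a minimal pure-function sketch, with the accumulate-while-true / reset-on-false / fire-at-threshold semantics assumed from Step 4:

```javascript
// Pure lifecycle step for a duration counter: accumulate window_interval while
// the condition holds, reset to zero the moment it fails, and report whether
// the configured duration threshold has been reached.
function stepDurationCounter(counterSeconds, conditionTrue, windowIntervalSeconds, thresholdSeconds) {
  const next = conditionTrue ? counterSeconds + windowIntervalSeconds : 0;
  return { counterSeconds: next, thresholdReached: next >= thresholdSeconds };
}
```

Against Redis this maps onto Step 4 directly: INCRBY while the condition holds, DEL when it turns false.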
Publish Bit Sync Mechanism
Source of Truth: measurement_register.publish (PostgreSQL)
Runtime Cache: settings_buffer (Redis) — optional optimization
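A minimal sync sketch, assuming `measurement_register.publish` in PostgreSQL is authoritative and the Redis `settings_buffer` entries are the optional runtime copy. The key shape (`settings_buffer:<device>:<register>`) and the `register_name` column are assumptions, not taken from this document.

```javascript
// Push the authoritative publish bits from PostgreSQL into the Redis
// settings_buffer cache. Key shape and column names are illustrative.
async function syncPublishBits(db, redis, deviceId) {
  const { rows } = await db.query(
    'SELECT register_name, publish FROM measurement_register WHERE device_id = $1',
    [deviceId]
  );
  for (const { register_name, publish } of rows) {
    await redis.set(
      `settings_buffer:${deviceId}:${register_name}`,
      publish ? '1' : '0',
      { EX: 3600 } // stale copies expire; PostgreSQL remains the source of truth
    );
  }
  return rows.length;
}
```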
Batch Processing Implications
High-Load Scenario
Device count: 1000
Rules per device: avg 5
Window interval: 50ms
Per window:
├─ Events processed: 1000
├─ Rules evaluated: 5000
├─ Writes (avg 10% trigger rate): 500 device_rule_state updates
│ 500 rule_events inserts
│ 500 redis cache updates
└─ Total latency budget: < 50ms
Optimization:
├─ Batch inserts (rule_events)
├─ Pipelined Redis writes
├─ Connection pooling (DB)
└─ Async event emission
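The first two optimizations can be sketched as a single multi-row INSERT plus one pipelined cache round trip (node-redis v4 `multi()`); the shape of the `triggers` items is illustrative.

```javascript
// Collapse N rule_events inserts into one multi-row statement and N Redis
// SETs into one pipeline round trip. `triggers` item shape is an assumption.
async function flushTriggers(db, redis, triggers) {
  if (triggers.length === 0) return;

  // One INSERT with a VALUES list instead of N single-row inserts
  const values = [];
  const params = [];
  triggers.forEach((t, i) => {
    const base = i * 3;
    values.push(`($${base + 1}, $${base + 2}, 'triggered', $${base + 3}, NOW())`);
    params.push(t.deviceId, t.groupId, JSON.stringify(t.details));
  });
  await db.query(
    `INSERT INTO rule_events (device_id, rule_group_id, event_type, details, created_at)
     VALUES ${values.join(', ')}`,
    params
  );

  // One pipelined round trip for all cache updates
  const pipeline = redis.multi();
  for (const t of triggers) {
    pipeline.set(`state:${t.deviceId}:${t.groupId}`, JSON.stringify(t.state), { EX: 24 * 3600 });
  }
  await pipeline.exec();
}
```

At 500 triggers per window this turns roughly 1000 round trips into 2, which is what makes the 50ms budget plausible.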
Data Validation Before Write
// Safety checks before persistence
// 1. State Consistency
if (!previousState && triggeredNow) {
// Normal: false → true transition
} else if (previousState && !triggeredNow) {
// Normal: true → false transition
} else if (previousState && triggeredNow && !multiTrigger) {
// Skip: already triggered without multi_trigger
return SKIP;
}
// 2. Trigger Count Sanity
if (newTriggerCount <= previousTriggerCount) {
// ERROR: should never decrement
throw new Error("Invalid trigger count");
}
// 3. Timestamp Ordering
if (newTimestamp < previousTimestamp) {
// ERROR: time moved backward
throw new Error("Timestamp regression");
}
// 4. Rule Hash Validation
if (currentRuleHash !== previousRuleHash) {
// WARNING: rule definition changed, reset state
resetDeviceRuleState(device_id, group_id);
}
Monitoring Points
Latency Tracking
Metric: rule_service_write_latency_ms
Breakdowns:
├─ db_device_rule_state_ms: ~30ms (1 row update)
├─ db_rule_events_insert_ms: ~20ms (1 row insert)
├─ redis_state_cache_ms: ~8ms (single SET)
├─ redis_counter_cache_ms: ~5ms (incrby)
├─ event_emission_ms: ~2ms (kafka produce)
└─ total_p95: < 200ms (SLA)
Write Volume
Metric: rule_service_writes_per_second
Breakdowns:
├─ device_rule_state: N devices × P(trigger) / window_interval
├─ rule_events: same as above
├─ redis_operations: 2-3x higher (counters, history)
└─ estimate: 1000 devices × 10% trigger rate / 0.05s window = 2000 writes/sec