
Rule Service - Data Layer Write Sequence

Read Path (Input Data Flow)

Load Order:

  1. Context: device_id, stream_id
  2. Rule Definitions: Redis/DB (rule_groups, rules)
  3. Device Assignments: Redis/DB (device_rule_assignments)
  4. Current State: Redis/DB (device_rule_state)
  5. Register History: Redis (for transitions)
  6. Input Data: device_buffer (within the window event)
  7. Publish Setting: measurement_register (publish bit)
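
The cache-first part of this load order (step 4, Redis/DB for current state) can be sketched as a small helper; `redisGet` and `dbGet` below are hypothetical injected stand-ins for the real Redis and PostgreSQL clients, not an actual API:

```javascript
// Cache-first load of device_rule_state: try Redis, fall back to the DB on a miss.
// `redisGet(key)` resolves to a JSON string or null; `dbGet(deviceId, groupId)`
// resolves to the authoritative row. Both names are illustrative assumptions.
async function loadRuleState(redisGet, dbGet, deviceId, groupId) {
  const key = `state:${deviceId}:${groupId}`;
  const cached = await redisGet(key); // fast path, typically < 5ms
  if (cached !== null) {
    return JSON.parse(cached);
  }
  // Cache miss: fall back to device_rule_state in the DB (slower path)
  return dbGet(deviceId, groupId);
}
```

The same pattern applies to the other Redis/DB pairs in the list (rule definitions, assignments): Redis is a read-through cache, the DB stays the source of truth.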

Write Path (Output Data Flow & Sequence)


State Write Sequence (Detailed)

Step 1: device_rule_state Update

-- First: Update persistent state
UPDATE device_rule_state
SET
    is_triggered = true,
    trigger_count = trigger_count + 1,
    last_trigger_time = NOW(),
    updated_at = NOW()
WHERE
    device_id = ?
    AND rule_group_id = ?;

Reasons:

  • Low latency (single-row update)
  • Atomicity guaranteed
  • Forms the basis of the audit trail

Timing: ~20-50ms


Step 2: rule_events Insert (Append-Only)

-- Second: Append to audit log
INSERT INTO rule_events (
    device_id,
    rule_group_id,
    rule_id,
    event_type,
    trigger_count,
    severity,
    details,
    created_at
)
VALUES (
    ?,            -- device_id
    ?,            -- group_id
    ?,            -- rule_id (nullable)
    'triggered',  -- event_type
    ?,            -- trigger_count from device_rule_state
    'high',       -- severity
    '{"reason": "VRMS_R > 250 for 600s", ...}'::jsonb,
    NOW()
);

Reasons:

  • Immutable history
  • Easy filtering for queries
  • Time-series analytics

Timing: ~15-40ms


Step 3: Redis State Cache

// Third: Update cache for fast lookups
const key = `state:${device_id}:${group_id}`;
const ttl = 24 * 3600; // 24 hours

redis.set(key, JSON.stringify({
  is_triggered: true,
  trigger_count: 58,
  last_trigger_time: Date.now(),
  last_reset_time: previousReset,
  duration_counters: {
    rule_114: 615, // 615 seconds of accumulated duration
    rule_115: 300
  },
  register_history: {
    device_status: "0x0101" // previous value
  }
}), {
  EX: ttl
});

Reasons:

  • Cache hit on the next window
  • Latency < 5ms
  • Automatic expiration (prevents stale data)

Timing: ~5-10ms


Step 4: Duration Counter Cache (Separate)

// For duration-based rules, maintain counters separately
const durationKey = `rule:${rule_id}:${device_id}:duration_counter`;

if (condition_true) {
  redis.incrBy(durationKey, window_interval); // +50s
  redis.expire(durationKey, 3600); // Expire in 1h (or reset on condition false)
} else {
  redis.del(durationKey); // Reset counter
}

Reasons:

  • Incremental updates (atomic)
  • Fast lookups (single key)
  • Auto-reset on condition false

Step 5: Register History Cache (Transitions)

// For register transition detection
const historyKey = `register:${device_id}:device_status`;
const previousValue = await redis.get(historyKey); // node-redis returns a Promise
const currentValue = deviceBuffer.status_register;

if (previousValue && hasBitTransition(previousValue, currentValue)) {
  // Transition detected, rule evaluates to true
}
redis.set(historyKey, currentValue); // Update for the next window either way

Reasons:

  • Transition detection needs history
  • Only store last value (memory efficient)
  • Auto-reset on window interval
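
`hasBitTransition` is referenced above but not defined. One plausible implementation, assuming register values are hex strings like "0x0101" and treating any changed bit as a transition:

```javascript
// Hypothetical sketch of hasBitTransition: XOR the previous and current
// register values; any set bit in the result marks a position that changed.
// Assumes hex-string inputs (e.g. "0x0101") as shown in the Step 3 cache example.
function hasBitTransition(previousValue, currentValue) {
  const prev = parseInt(previousValue, 16);
  const curr = parseInt(currentValue, 16);
  return (prev ^ curr) !== 0;
}
```

If only rising edges matter (0 → 1), the check would instead be `(~prev & curr) !== 0`; the rule definition decides which semantics apply.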

Write Consistency Model

Scenario: System Crash After DB Write

T1: UPDATE device_rule_state (✓ written to disk)
T2: INSERT rule_events (✓ written to disk)
T3: CRASH ← Before Redis update
T4: System restart
T5: Next evaluation

Recovery:

  • Redis state missing
  • DB state correct
  • Fallback on next eval: DB → Cache = OK
  • Risk: One window of slow queries (cache miss)

Scenario: Redis Write Succeeds, DB Fails

T1: UPDATE device_rule_state (✗ disk full)
T2: INSERT rule_events (✗ fails)
T3: CRASH
T4: System restart
T5: Next evaluation

Recovery:

  • Redis state: has stale/wrong data
  • DB state: missing
  • Inconsistency! Redis newer than DB

Prevention:

  • Transactional writes: DB first, then Redis
  • If Redis write fails: log and continue (accept cache miss)
  • If DB write fails: don't update Redis

Recommended Sequence:

  1. DB write (transactional)
  2. Wait for sync to disk
  3. Redis write (async okay)
  4. Emit event
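
This sequence can be sketched as a small wrapper; `dbWrite`, `redisWrite`, and `emit` are hypothetical injected callbacks, not a real client API:

```javascript
// DB-first write sequence: the DB write is mandatory (a failure aborts before
// the cache is touched), the Redis write is best-effort (a failure is logged
// and tolerated, costing one window of cache misses).
async function persistTrigger(dbWrite, redisWrite, emit, payload) {
  await dbWrite(payload); // steps 1-2: transactional DB write, throws on failure
  try {
    await redisWrite(payload); // step 3: cache update, async okay
  } catch (err) {
    console.warn("cache write failed, accepting next-window cache miss", err);
  }
  emit(payload); // step 4: event emission
  return true;
}
```

Because the DB write happens first and Redis failures are swallowed, the "Redis newer than DB" inconsistency from the second crash scenario cannot occur.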

Duration Counter Lifecycle
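
The counter lifecycle from Step 4 can be walked through in-memory (an illustrative sketch using the "VRMS_R > 250 for 600s" example: 50s windows, a 600s threshold; a real implementation would additionally gate repeat triggers via is_triggered/multi_trigger):

```javascript
// Simulate the duration counter across a sequence of windows.
// Each true window increments by windowInterval (the INCRBY in Step 4);
// the first false window resets the counter to zero (the DEL in Step 4).
function runWindows(conditions, windowInterval, threshold) {
  let counter = 0;
  const events = [];
  for (const conditionTrue of conditions) {
    if (conditionTrue) {
      counter += windowInterval;
      if (counter >= threshold) {
        events.push("triggered"); // duration condition satisfied
      }
    } else {
      counter = 0; // condition broke: counter resets, duration starts over
    }
  }
  return { counter, events };
}
```

Twelve consecutive true windows of 50s reach the 600s threshold; a single false window anywhere in between restarts the accumulation from zero.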


Publish Bit Sync Mechanism

Source of Truth: measurement_register.publish (PostgreSQL)

Runtime Cache: settings_buffer (Redis) — optional optimization
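
The lookup implied here can be sketched as cache-first with the DB as the authority on a miss; `cacheGet`, `dbGetPublishBit`, and the key format are illustrative assumptions, not the actual schema:

```javascript
// Read the publish bit: settings_buffer (Redis) first, falling back to
// measurement_register.publish in PostgreSQL, which stays the source of truth.
async function getPublishBit(cacheGet, dbGetPublishBit, registerId) {
  const cached = await cacheGet(`settings_buffer:${registerId}`); // assumed key shape
  if (cached !== null) {
    return cached === "1"; // Redis stores strings; "1" means publish enabled
  }
  return dbGetPublishBit(registerId); // authoritative value on cache miss
}
```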


Batch Processing Implications

High-Load Scenario

Device count: 1000
Rules per device: avg 5
Window interval: 50ms

Per window:
├─ Events processed: 1000
├─ Rules evaluated: 5000
├─ Writes (avg 10% trigger rate):
│    ├─ 500 device_rule_state updates
│    ├─ 500 rule_events inserts
│    └─ 500 redis cache updates
└─ Total latency budget: < 50ms

Optimization:
├─ Batch inserts (rule_events)
├─ Pipelined Redis writes
├─ Connection pooling (DB)
└─ Async event emission
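
The batch-insert optimization boils down to chunking the per-window rule_events rows before sending them to the DB; a minimal sketch (the chunk size of 100 is an illustrative choice, not a measured value):

```javascript
// Split an array of pending rule_events rows into fixed-size batches,
// so 500 per-window inserts become a handful of multi-row INSERT statements.
function chunkRows(rows, batchSize) {
  const batches = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize));
  }
  return batches;
}
```

The same grouping applies to the pipelined Redis writes: issue all SET/INCRBY commands for a window in one pipeline rather than one round-trip each.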

Data Validation Before Write

// Safety checks before persistence

// 1. State Consistency
if (!previousState && triggeredNow) {
// Normal: false → true transition
} else if (previousState && !triggeredNow) {
// Normal: true → false transition
} else if (previousState && triggeredNow && !multiTrigger) {
// Skip: already triggered without multi_trigger
return SKIP;
}

// 2. Trigger Count Sanity
if (newTriggerCount <= previousTriggerCount) {
// ERROR: should never decrement
throw new Error("Invalid trigger count");
}

// 3. Timestamp Ordering
if (newTimestamp < previousTimestamp) {
// ERROR: time moved backward
throw new Error("Timestamp regression");
}

// 4. Rule Hash Validation
if (currentRuleHash !== previousRuleHash) {
// WARNING: rule definition changed, reset state
resetDeviceRuleState(device_id, group_id);
}

Monitoring Points

Latency Tracking

Metric: rule_service_write_latency_ms

Breakdowns:
├─ db_device_rule_state_ms: ~30ms (1 row update)
├─ db_rule_events_insert_ms: ~20ms (1 row insert)
├─ redis_state_cache_ms: ~8ms (single SET)
├─ redis_counter_cache_ms: ~5ms (incrby)
├─ event_emission_ms: ~2ms (kafka produce)
└─ total_p95: < 200ms (SLA)

Write Volume

Metric: rule_service_writes_per_second

Breakdowns:
├─ device_rule_state: N devices × P(trigger) / window_interval
├─ rule_events: same as above
├─ redis_operations: 2-3x higher (counters, history)
└─ estimate: 1000 devices × 10% trigger rate / 0.05s window = 2000 writes/sec
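
Sanity-checking that estimate: triggering devices per window multiplied by windows per second gives the sustained write rate.

```javascript
// Reproduce the write-volume estimate: 1000 devices, 10% trigger rate,
// 50ms window interval → sustained device_rule_state writes per second.
const devices = 1000;
const triggerRatePercent = 10;  // 10% of devices trigger per window
const windowIntervalMs = 50;

const triggersPerWindow = (devices * triggerRatePercent) / 100; // 100
const windowsPerSecond = 1000 / windowIntervalMs;               // 20
const writesPerSecond = triggersPerWindow * windowsPerSecond;   // 2000 writes/sec
```

Note this counts device_rule_state updates only; rule_events inserts double it, and the 2-3x Redis multiplier pushes total operations higher still.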