Envelope: Observer Servisi
Topic: qapu.observer
DLQ: qapu.dlq.observer
Producer: observer-service
Observer servisi, cihazlardan başlayan stream yaşam döngüsünü ingesta'dan rule tamamlanmasına kadar dış tarafından izler ve hata durumunda kontrollü şekilde müdahale eder. Buffer çakışması, retry tetiklemesi, kalıcı hata eskalasyonu ve performans kaydı oluşturması görevlidir.
Event Envanteri
| Event | Ne zaman üretilir? | Consumer | Amaç |
|---|---|---|---|
observer.buffer.duplicate.v1 | Aynı device_id ve stream_id için yeni ingest.accepted alındığında | Dashboard | Duplicate paketi görünürlüğü |
observer.buffer.replaced.v1 | Eski takilı buffer yeni stream ile değiştirildiğinde | Dashboard, Log | Buffer replacement görünürlüğü |
observer.retry.requested.v1 | Bir stage başarısız olup retry tetiklemesi gerektiğinde | Retry Worker, Rule | Retry orchestration |
observer.escalated.v1 | Max retry (3) aşılıp kalıcı hata ilan edildiğinde | Dashboard, AlertingSystem | Kalıcı hata eskalasyonu |
observer.completed.v1 | Pipeline başarıyla tamamlandığında (rule aşaması başarılı) | Dashboard, Analytics | Success path görünürlüğü |
observer.buffer.duplicate.v1
Özet
| Başlık | Değer |
|---|---|
| Topic | qapu.observer |
| Producer | observer-service |
| Consumer | Dashboard, Audit Log |
| Tetikleyici | ingest.accepted.v1 gelirken mevcut buffer aynı stream_id'de zaten varsa |
| Kritiklik | Düşük — tekrar paket normal bir durum |
Payload
{
"event": "observer.buffer.duplicate.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:22:15.100Z",
"process_ms": 5
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:15Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"duplicate_count": 2,
"first_ingest_at": "2026-03-11T15:22:10Z",
"current_buffer_status": "in_progress",
"current_stage": "calibration"
},
"error": null
}
Alan Açıklamaları
| Alan | Tip | Zorunlu | Açıklama |
|---|---|---|---|
data.duplicate_count | integer | Evet | Bu stream_id için kaç duplicate paketi alındığı |
data.first_ingest_at | string (ISO 8601) | Evet | İlk paket alış zamanı |
data.current_buffer_status | string | Evet | Buffer durumu: in_progress, completed, permanent_failed |
data.current_stage | string | Evet | Şu anki pipeline aşaması: ingest, stream, calibration, raw_writer, synthesis, window, rule |
observer.buffer.replaced.v1
Özet
| Başlık | Değer |
|---|---|
| Topic | qapu.observer |
| Producer | observer-service |
| Consumer | Dashboard, Audit Log |
| Tetikleyici | Yeni stream_id'li paket gelip eski buffer takilı kabul edilip silindiğinde |
| Kritiklik | Orta — takilı buffer temizleme |
Payload
{
"event": "observer.buffer.replaced.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i1",
"producer_service": "observer-service",
"produced_at": "2026-03-11T16:30:05.200Z",
"process_ms": 8
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_163005_001",
"stream_id": 9823413,
"device_time": "2026-03-11T16:30:00Z",
"stream_time": "2026-03-11T16:30:05Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"replaced_stream_id": 9823412,
"replaced_status": "in_progress",
"replaced_stage": "calibration",
"stale_duration_sec": 425,
"stale_threshold_sec": 420
},
"error": null
}
Alan Açıklamaları
| Alan | Tip | Zorunlu | Açıklama |
|---|---|---|---|
data.replaced_stream_id | integer | Evet | Silinen eski buffer'ın stream_id'si |
data.replaced_status | string | Evet | Eski buffer durumu: in_progress vb |
data.replaced_stage | string | Evet | Eski buffer'ın takılı olduğu aşama |
data.stale_duration_sec | integer | Evet | Eski paket alındıktan sonra geçen süre (saniye) |
data.stale_threshold_sec | integer | Evet | Takilma eşiği (ayarlanabilir, tip 420s) |
observer.retry.requested.v1
Özet
| Başlık | Değer |
|---|---|
| Topic | qapu.observer |
| Producer | observer-service |
| Consumer | Retry Worker, Rule Service Input |
| Tetikleyici | Bir stage failed event ürettiğinde ve retry sayısı < 3 ise |
| Kritiklik | Yüksek — retry orchestration |
Payload
{
"event": "observer.retry.requested.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:22:25.100Z",
"process_ms": 6
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"retry_stage": "calibration",
"retry_count": 1,
"max_retries": 3,
"backoff_wait_ms": 5000,
"failed_error_code": "CALIB_DB_TIMEOUT",
"scheduled_retry_at": "2026-03-11T15:22:30Z"
},
"error": null
}
Alan Açıklamaları
| Alan | Tip | Zorunlu | Açıklama |
|---|---|---|---|
data.retry_stage | string | Evet | Tekrar denencek aşama |
data.retry_count | integer | Evet | Şu anki retry sayısı (1, 2 veya 3) |
data.max_retries | integer | Evet | Maksimum retry limit (hardcoded 3) |
data.backoff_wait_ms | integer | Evet | Retry öncesi bekleme süresi (ms) — exponential backoff |
data.failed_error_code | string | Evet | Hataya neden olan error_code |
data.scheduled_retry_at | string (ISO 8601) | Evet | Planlanan retry zamanı |
observer.escalated.v1
Özet
| Başlık | Değer |
|---|---|
| Topic | qapu.observer |
| Producer | observer-service |
| Consumer | AlertingSystem, Dashboard, Audit Log |
| Tetikleyici | Max retry (3) aşılıp hata kalıcı kabul edildiğinde |
| Kritiklik | Yüksek — kalıcı hata eskalasyonu |
Payload
{
"event": "observer.escalated.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:23:10.200Z",
"process_ms": 7
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"failed_stage": "calibration",
"failure_reason": "CALIB_DB_TIMEOUT — connection persistent",
"total_retries_attempted": 3,
"first_failed_at": "2026-03-11T15:22:15Z",
"last_failed_at": "2026-03-11T15:23:10Z",
"escalation_action": "mark_permanent_failed"
},
"error": {
"failed_stage": "calibration",
"error_code": "CALIB_DB_WRITE_FAIL",
"error_message": "Database connection timeout after 3 retries",
"retryable": false,
"failed_at": "2026-03-11T15:23:10Z"
}
}
Alan Açıklamaları
| Alan | Tip | Zorunlu | Açıklama |
|---|---|---|---|
data.failed_stage | string | Evet | Kalıcı hataya sebep olan aşama |
data.failure_reason | string | Evet | Hata sebebi özeti |
data.total_retries_attempted | integer | Evet | Kaç retry yapıldığı |
data.first_failed_at | string (ISO 8601) | Evet | İlk hata zamanı |
data.last_failed_at | string (ISO 8601) | Evet | Son retry hata zamanı |
data.escalation_action | string | Evet | İşlem: mark_permanent_failed |
observer.completed.v1
Özet
| Başlık | Değer |
|---|---|
| Topic | qapu.observer |
| Producer | observer-service |
| Consumer | Dashboard, Analytics, Performance Logging |
| Tetikleyici | Rule aşaması başarıyla tamamlandığında (veya no-trigger başarı) |
| Kritiklik | Düşük — başarı path görünürlüğü |
Payload
{
"event": "observer.completed.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:23:45.300Z",
"process_ms": 9
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"pipeline_status": "completed",
"completion_reason": "rule_success",
"total_duration_ms": 95000,
"stage_durations_ms": {
"ingest": 100,
"stream": 150,
"calibration": 2000,
"raw_writer": 3000,
"synthesis": 15000,
"window": 8000,
"rule": 67000
},
"total_retries": 0,
"triggered_rule_count": 1
},
"error": null
}
Alan Açıklamaları
| Alan | Tip | Zorunlu | Açıklama |
|---|---|---|---|
data.pipeline_status | string | Evet | completed veya no_trigger_success |
data.completion_reason | string | Evet | Tamamlama sebebi |
data.total_duration_ms | integer | Evet | Ingesta'dan rule'e kadar toplam süre (ms) |
data.stage_durations_ms | object | Evet | Her aşamanın ayrı ayrı işlem süresi |
data.total_retries | integer | Evet | Tüm pipeline'da yapılan toplam retry sayısı |
data.triggered_rule_count | integer | Evet | Tetiklenen kural sayısı |
Hata Modeli
Observer başarısız işlemelerde error alanı doldurulur; örneğin escalated event'te:
"error": {
"failed_stage": "calibration",
"error_code": "CALIB_DB_WRITE_FAIL",
"error_message": "Database connection timeout after 3 retries",
"retryable": false,
"failed_at": "2026-03-11T15:23:10Z"
}
Consumer Stratejisi
Observer eventleri yalnızca cg.qapu.observer consumer group'u tarafından tüketilir. İşlem akışında:
- Duplicate/replaced buffer eventleri → Audit + Dashboard
- Retry requested → Retry Worker Queue'ya gönderilir
- Escalated → Alert System
- Completed → Performance Analytics
Detaylı bilgi için bkz. Observer Servisi Index