Skip to main content

Envelope: Observer Servisi

Topic: qapu.observer
DLQ: qapu.dlq.observer
Producer: observer-service

Observer servisi, cihazlardan başlayan stream yaşam döngüsünü ingesta'dan rule tamamlanmasına kadar dış tarafından izler ve hata durumunda kontrollü şekilde müdahale eder. Buffer çakışması, retry tetiklemesi, kalıcı hata eskalasyonu ve performans kaydı oluşturması görevlidir.

Event Envanteri

EventNe zaman üretilir?ConsumerAmaç
observer.buffer.duplicate.v1Aynı device_id ve stream_id için yeni ingest.accepted alındığındaDashboardDuplicate paketi görünürlüğü
observer.buffer.replaced.v1Eski takilı buffer yeni stream ile değiştirildiğindeDashboard, LogBuffer replacement görünürlüğü
observer.retry.requested.v1Bir stage başarısız olup retry tetiklemesi gerektiğindeRetry Worker, RuleRetry orchestration
observer.escalated.v1Max retry (3) aşılıp kalıcı hata ilan edildiğindeDashboard, AlertingSystemKalıcı hata eskalasyonu
observer.completed.v1Pipeline başarıyla tamamlandığında (rule aşaması başarılı)Dashboard, AnalyticsSuccess path görünürlüğü

observer.buffer.duplicate.v1

Özet

BaşlıkDeğer
Topicqapu.observer
Producerobserver-service
ConsumerDashboard, Audit Log
Tetikleyiciingest.accepted.v1 gelirken mevcut buffer aynı stream_id'de zaten varsa
KritiklikDüşük — tekrar paket normal bir durum

Payload

{
"event": "observer.buffer.duplicate.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:22:15.100Z",
"process_ms": 5
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:15Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"duplicate_count": 2,
"first_ingest_at": "2026-03-11T15:22:10Z",
"current_buffer_status": "in_progress",
"current_stage": "calibration"
},
"error": null
}

Alan Açıklamaları

AlanTipZorunluAçıklama
data.duplicate_countintegerEvetBu stream_id için kaç duplicate paketi alındığı
data.first_ingest_atstring (ISO 8601)Evetİlk paket alış zamanı
data.current_buffer_statusstringEvetBuffer durumu: in_progress, completed, permanent_failed
data.current_stagestringEvetŞu anki pipeline aşaması: ingest, stream, calibration, raw_writer, synthesis, window, rule

observer.buffer.replaced.v1

Özet

BaşlıkDeğer
Topicqapu.observer
Producerobserver-service
ConsumerDashboard, Audit Log
TetikleyiciYeni stream_id'li paket gelip eski buffer takilı kabul edilip silindiğinde
KritiklikOrta — takilı buffer temizleme

Payload

{
"event": "observer.buffer.replaced.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i1",
"producer_service": "observer-service",
"produced_at": "2026-03-11T16:30:05.200Z",
"process_ms": 8
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_163005_001",
"stream_id": 9823413,
"device_time": "2026-03-11T16:30:00Z",
"stream_time": "2026-03-11T16:30:05Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"replaced_stream_id": 9823412,
"replaced_status": "in_progress",
"replaced_stage": "calibration",
"stale_duration_sec": 425,
"stale_threshold_sec": 420
},
"error": null
}

Alan Açıklamaları

AlanTipZorunluAçıklama
data.replaced_stream_idintegerEvetSilinen eski buffer'ın stream_id'si
data.replaced_statusstringEvetEski buffer durumu: in_progress vb
data.replaced_stagestringEvetEski buffer'ın takılı olduğu aşama
data.stale_duration_secintegerEvetEski paket alındıktan sonra geçen süre (saniye)
data.stale_threshold_secintegerEvetTakilma eşiği (ayarlanabilir, tip 420s)

observer.retry.requested.v1

Özet

BaşlıkDeğer
Topicqapu.observer
Producerobserver-service
ConsumerRetry Worker, Rule Service Input
TetikleyiciBir stage failed event ürettiğinde ve retry sayısı < 3 ise
KritiklikYüksek — retry orchestration

Payload

{
"event": "observer.retry.requested.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:22:25.100Z",
"process_ms": 6
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"retry_stage": "calibration",
"retry_count": 1,
"max_retries": 3,
"backoff_wait_ms": 5000,
"failed_error_code": "CALIB_DB_TIMEOUT",
"scheduled_retry_at": "2026-03-11T15:22:30Z"
},
"error": null
}

Alan Açıklamaları

AlanTipZorunluAçıklama
data.retry_stagestringEvetTekrar denencek aşama
data.retry_countintegerEvetŞu anki retry sayısı (1, 2 veya 3)
data.max_retriesintegerEvetMaksimum retry limit (hardcoded 3)
data.backoff_wait_msintegerEvetRetry öncesi bekleme süresi (ms) — exponential backoff
data.failed_error_codestringEvetHataya neden olan error_code
data.scheduled_retry_atstring (ISO 8601)EvetPlanlanan retry zamanı

observer.escalated.v1

Özet

BaşlıkDeğer
Topicqapu.observer
Producerobserver-service
ConsumerAlertingSystem, Dashboard, Audit Log
TetikleyiciMax retry (3) aşılıp hata kalıcı kabul edildiğinde
KritiklikYüksek — kalıcı hata eskalasyonu

Payload

{
"event": "observer.escalated.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:23:10.200Z",
"process_ms": 7
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"failed_stage": "calibration",
"failure_reason": "CALIB_DB_TIMEOUT — connection persistent",
"total_retries_attempted": 3,
"first_failed_at": "2026-03-11T15:22:15Z",
"last_failed_at": "2026-03-11T15:23:10Z",
"escalation_action": "mark_permanent_failed"
},
"error": {
"failed_stage": "calibration",
"error_code": "CALIB_DB_WRITE_FAIL",
"error_message": "Database connection timeout after 3 retries",
"retryable": false,
"failed_at": "2026-03-11T15:23:10Z"
}
}

Alan Açıklamaları

AlanTipZorunluAçıklama
data.failed_stagestringEvetKalıcı hataya sebep olan aşama
data.failure_reasonstringEvetHata sebebi özeti
data.total_retries_attemptedintegerEvetKaç retry yapıldığı
data.first_failed_atstring (ISO 8601)Evetİlk hata zamanı
data.last_failed_atstring (ISO 8601)EvetSon retry hata zamanı
data.escalation_actionstringEvetİşlem: mark_permanent_failed

observer.completed.v1

Özet

BaşlıkDeğer
Topicqapu.observer
Producerobserver-service
ConsumerDashboard, Analytics, Performance Logging
TetikleyiciRule aşaması başarıyla tamamlandığında (veya no-trigger başarı)
KritiklikDüşük — başarı path görünürlüğü

Payload

{
"event": "observer.completed.v1",
"meta": {
"trace_id": "01a2b3c4d5e6f7g8h9i0",
"producer_service": "observer-service",
"produced_at": "2026-03-11T15:23:45.300Z",
"process_ms": 9
},
"context": {
"device_id": "400000011D081B70",
"raw_id": "20260311_152206_001",
"stream_id": 9823412,
"device_time": "2026-03-11T15:22:06Z",
"stream_time": "2026-03-11T15:22:10Z",
"iccid": "8946120802005507363",
"imei": "354485417650889",
"firmware": "1.0.7"
},
"data": {
"pipeline_status": "completed",
"completion_reason": "rule_success",
"total_duration_ms": 95000,
"stage_durations_ms": {
"ingest": 100,
"stream": 150,
"calibration": 2000,
"raw_writer": 3000,
"synthesis": 15000,
"window": 8000,
"rule": 67000
},
"total_retries": 0,
"triggered_rule_count": 1
},
"error": null
}

Alan Açıklamaları

AlanTipZorunluAçıklama
data.pipeline_statusstringEvetcompleted veya no_trigger_success
data.completion_reasonstringEvetTamamlama sebebi
data.total_duration_msintegerEvetIngesta'dan rule'e kadar toplam süre (ms)
data.stage_durations_msobjectEvetHer aşamanın ayrı ayrı işlem süresi
data.total_retriesintegerEvetTüm pipeline'da yapılan toplam retry sayısı
data.triggered_rule_countintegerEvetTetiklenen kural sayısı

Hata Modeli

Observer başarısız işlemelerde error alanı doldurulur; örneğin escalated event'te:

"error": {
"failed_stage": "calibration",
"error_code": "CALIB_DB_WRITE_FAIL",
"error_message": "Database connection timeout after 3 retries",
"retryable": false,
"failed_at": "2026-03-11T15:23:10Z"
}

Consumer Stratejisi

Observer eventleri yalnızca cg.qapu.observer consumer group'u tarafından tüketilir. İşlem akışında:

  • Duplicate/replaced buffer eventleri → Audit + Dashboard
  • Retry requested → Retry Worker Queue'ya gönderilir
  • Escalated → Alert System
  • Completed → Performance Analytics

Detaylı bilgi için bkz. Observer Servisi Index