vector_core/
latency.rs

1use std::time::Instant;
2
3use metrics::Histogram;
4use vector_common::stats::EwmaGauge;
5use vector_common::{
6    gauge, histogram,
7    internal_event::{GaugeName, HistogramName},
8};
9
10use crate::event::EventArray;
11
/// EWMA alpha passed to the latency gauge when the caller of [`LatencyRecorder::new`] does not
/// supply one.
const DEFAULT_LATENCY_EWMA_ALPHA: f64 = 0.9;
13
/// Records per-event latency, measured in seconds, into both a histogram and an EWMA-backed
/// gauge.
#[derive(Debug)]
pub struct LatencyRecorder {
    /// Receives one sample per event that carries a previous transform timestamp.
    histogram: Histogram,
    /// Receives one sample per event array: the mean latency across that array's events.
    gauge: EwmaGauge,
}
19
20impl LatencyRecorder {
21    pub fn new(ewma_alpha: Option<f64>) -> Self {
22        Self {
23            histogram: histogram!(HistogramName::ComponentLatencySeconds),
24            gauge: EwmaGauge::new(
25                gauge!(GaugeName::ComponentLatencyMeanSeconds),
26                ewma_alpha.or(Some(DEFAULT_LATENCY_EWMA_ALPHA)),
27            ),
28        }
29    }
30
31    pub fn on_send(&self, events: &mut EventArray, now: Instant) {
32        let mut sum = 0.0;
33        let mut count = 0usize;
34
35        // Since all of the events in the array will most likely have entered and exited the
36        // component at close to the same time, we average all the latencies over the entire array
37        // and record it just once in the EWMA-backed gauge. If we were to record each latency
38        // individually, the gauge would effectively just reflect the latest array's latency,
39        // eliminating the utility of the EWMA averaging. However, we record the individual
40        // latencies in the histogram to get a more granular view of the latency distribution.
41        for mut event in events.iter_events_mut() {
42            let metadata = event.metadata_mut();
43            if let Some(previous) = metadata.last_transform_timestamp() {
44                let latency = now.saturating_duration_since(previous).as_secs_f64();
45                sum += latency;
46                count += 1;
47                self.histogram.record(latency);
48            }
49
50            metadata.set_last_transform_timestamp(now);
51        }
52        if count > 0 {
53            #[expect(
54                clippy::cast_precision_loss,
55                reason = "losing precision is acceptable here"
56            )]
57            let mean = sum / count as f64;
58            self.gauge.record(mean);
59        }
60    }
61}