vector_core/metrics/
mod.rs

1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{
19    label_filter::VectorLabelFilter,
20    recorder::{Registry, VectorRecorder},
21};
22use crate::{
23    config::metrics_expiration::PerMetricSetExpiration,
24    event::{Metric, MetricValue},
25};
26
27type Result<T> = std::result::Result<T, Error>;
28
29#[derive(Clone, Debug, PartialEq, Snafu)]
30pub enum Error {
31    #[snafu(display("Recorder already initialized."))]
32    AlreadyInitialized,
33    #[snafu(display("Metrics system was not initialized."))]
34    NotInitialized,
35    #[snafu(display("Timeout value of {} must be positive.", timeout))]
36    TimeoutMustBePositive { timeout: f64 },
37    #[snafu(display("Invalid regex pattern: {}.", pattern))]
38    InvalidRegexPattern { pattern: String },
39}
40
41static CONTROLLER: OnceLock<Controller> = OnceLock::new();
42
43// Cardinality counter parameters, expose the internal metrics registry
44// cardinality. Useful for the end users to help understand the characteristics
45// of their environment and how vectors acts in it.
46const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
47static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
48
49// Older deprecated counter key name
50const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
51static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
52
53/// Controller allows capturing metric snapshots.
54pub struct Controller {
55    recorder: VectorRecorder,
56}
57
/// Reports whether the internal metrics core should be active.
///
/// Returns `false` only when the `DISABLE_INTERNAL_METRICS_CORE` environment
/// variable is set to exactly `"true"`. An unset, unreadable, or differently
/// valued variable leaves metrics enabled.
fn metrics_enabled() -> bool {
    let flag = std::env::var("DISABLE_INTERNAL_METRICS_CORE").ok();
    flag.as_deref() != Some("true")
}
61
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// Returns `false` only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// environment variable is set to exactly `"true"`. An unset, unreadable, or
/// differently valued variable leaves the layer enabled.
fn tracing_context_layer_enabled() -> bool {
    let flag = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION").ok();
    flag.as_deref() != Some("true")
}
65
66fn init(recorder: VectorRecorder) -> Result<()> {
67    // An escape hatch to allow disabling internal metrics core. May be used for
68    // performance reasons. This is a hidden and undocumented functionality.
69    if !metrics_enabled() {
70        metrics::set_global_recorder(metrics::NoopRecorder)
71            .map_err(|_| Error::AlreadyInitialized)?;
72        info!(message = "Internal metrics core is disabled.");
73        return Ok(());
74    }
75
76    ////
77    //// Initialize the recorder.
78    ////
79
80    // The recorder is the interface between metrics-rs and our registry. In our
81    // case it doesn't _do_ much other than shepherd into the registry and
82    // update the cardinality counter, see above, as needed.
83    if tracing_context_layer_enabled() {
84        // Apply a layer to capture tracing span fields as labels.
85        metrics::set_global_recorder(
86            TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
87        )
88        .map_err(|_| Error::AlreadyInitialized)?;
89    } else {
90        metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
91    }
92
93    ////
94    //// Prepare the controller
95    ////
96
97    // The `Controller` is a safe spot in memory for us to stash a clone of the registry -- where
98    // metrics are actually kept -- so that our sub-systems interested in these metrics can grab
99    // copies. See `capture_metrics` and its callers for an example. Note that this is done last to
100    // allow `init_test` below to use the initialization state of `CONTROLLER` to wait for the above
101    // steps to complete in another thread.
102    let controller = Controller { recorder };
103    CONTROLLER
104        .set(controller)
105        .map_err(|_| Error::AlreadyInitialized)?;
106
107    Ok(())
108}
109
110/// Initialize the default metrics sub-system
111///
112/// # Errors
113///
114/// This function will error if it is called multiple times.
115pub fn init_global() -> Result<()> {
116    init(VectorRecorder::new_global())
117}
118
119/// Initialize the thread-local metrics sub-system. This function will loop until a recorder is
120/// actually set.
121pub fn init_test() {
122    if init(VectorRecorder::new_test()).is_err() {
123        // The only error case returned by `init` is `AlreadyInitialized`. A race condition is
124        // possible here: if metrics are being initialized by two (or more) test threads
125        // simultaneously, the ones that fail to set return immediately, possibly allowing
126        // subsequent code to execute before the static recorder value is actually set within the
127        // `metrics` crate. To prevent subsequent code from running with an unset recorder, loop
128        // here until a recorder is available.
129        while CONTROLLER.get().is_none() {}
130    }
131}
132
133impl Controller {
134    /// Clear all metrics from the registry.
135    pub fn reset(&self) {
136        self.recorder.with_registry(Registry::clear);
137    }
138
139    /// Get a handle to the globally registered controller, if it's initialized.
140    ///
141    /// # Errors
142    ///
143    /// This function will fail if the metrics subsystem has not been correctly
144    /// initialized.
145    pub fn get() -> Result<&'static Self> {
146        CONTROLLER.get().ok_or(Error::NotInitialized)
147    }
148
149    /// Set or clear the expiry time after which idle metrics are dropped from the set of captured
150    /// metrics. Invalid timeouts (zero or negative values) are silently remapped to no expiry.
151    ///
152    /// # Errors
153    ///
154    /// The contained timeout value must be positive.
155    pub fn set_expiry(
156        &self,
157        global_timeout: Option<f64>,
158        expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
159    ) -> Result<()> {
160        if let Some(timeout) = global_timeout
161            && timeout <= 0.0
162        {
163            return Err(Error::TimeoutMustBePositive { timeout });
164        }
165        let per_metric_expiration = expire_metrics_per_metric_set
166            .into_iter()
167            .map(TryInto::try_into)
168            .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
169
170        self.recorder.with_registry(|registry| {
171            registry.set_expiry(
172                global_timeout.map(Duration::from_secs_f64),
173                per_metric_expiration,
174            );
175        });
176        Ok(())
177    }
178
179    /// Take a snapshot of all gathered metrics and expose them as metric
180    /// [`Event`](crate::event::Event)s.
181    pub fn capture_metrics(&self) -> Vec<Metric> {
182        let timestamp = Utc::now();
183
184        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
185
186        #[allow(clippy::cast_precision_loss)]
187        let value = (metrics.len() + 2) as f64;
188        metrics.push(Metric::from_metric_kv(
189            &CARDINALITY_KEY,
190            MetricValue::Gauge { value },
191            timestamp,
192        ));
193        metrics.push(Metric::from_metric_kv(
194            &CARDINALITY_COUNTER_KEY,
195            MetricValue::Counter { value },
196            timestamp,
197        ));
198
199        metrics
200    }
201}
202
#[cfg(test)]
mod tests {
    use strum::IntoEnumIterator;
    use vector_common::{
        counter, gauge,
        internal_event::{CounterName, GaugeName},
    };

    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    /// Idle timeout, in seconds, used by the expiry tests.
    const IDLE_TIMEOUT: f64 = 0.5;

    /// Initialize the test recorder and fetch the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    /// Number of metrics in a fresh snapshot (cardinality metrics included).
    fn snapshot_len(controller: &Controller) -> usize {
        controller.capture_metrics().len()
    }

    /// Block the test thread for `periods` multiples of `IDLE_TIMEOUT`.
    fn sleep_idle(periods: f64) {
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * periods));
    }

    /// Apply only the global idle timeout, with no per-metric-set overrides.
    fn use_global_idle_timeout(controller: &Controller) {
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();
    }

    /// Build a per-set expiration matching exactly the metric called `name`.
    fn expire_by_name(name: &str, expire_secs: f64) -> PerMetricSetExpiration {
        PerMetricSetExpiration {
            name: Some(MetricNameMatcherConfig::Exact {
                value: name.to_string(),
            }),
            labels: None,
            expire_secs,
        }
    }

    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            controller.reset();

            let name = CounterName::iter().next().unwrap();
            for idx in 0..cardinality {
                counter!(name, "idx" => idx.to_string()).increment(1);
            }

            let snapshot = controller.capture_metrics();
            assert_eq!(snapshot.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = snapshot.len() as f64;
            for metric in snapshot {
                // Only the two cardinality metrics are inspected; everything else is skipped.
                let expected = match metric.name() {
                    CARDINALITY_KEY_NAME => MetricValue::Gauge { value },
                    CARDINALITY_COUNTER_KEY_NAME => MetricValue::Counter { value },
                    _ => continue,
                };
                assert_eq!(metric.value(), &expected);
                assert_eq!(metric.kind(), MetricKind::Absolute);
            }
        }
    }

    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        // Registering a handle already makes the metric visible; updating it adds nothing.
        let counter = counter!(CounterName::iter().next().unwrap());
        assert_eq!(snapshot_len(controller), 3);
        counter.increment(1);
        assert_eq!(snapshot_len(controller), 3);

        let gauge = gauge!(GaugeName::iter().next().unwrap());
        assert_eq!(snapshot_len(controller), 4);
        gauge.set(1.0);
        assert_eq!(snapshot_len(controller), 4);
    }

    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        use_global_idle_timeout(controller);

        let mut names = CounterName::iter();
        let first = names.next().unwrap();
        let second = names.next().unwrap();

        counter!(first).increment(1);
        counter!(second).increment(2);
        assert_eq!(snapshot_len(controller), 4);

        sleep_idle(2.0);
        counter!(first).increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        use_global_idle_timeout(controller);

        let name = CounterName::iter().next().unwrap();
        counter!(name, "tag" => "value1").increment(1);
        counter!(name, "tag" => "value2").increment(2);
        assert_eq!(snapshot_len(controller), 4);

        sleep_idle(2.0);
        counter!(name, "tag" => "value1").increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        use_global_idle_timeout(controller);

        let mut names = CounterName::iter();
        let held_name = names.next().unwrap();
        let transient_name = names.next().unwrap();

        let held = counter!(held_name);
        counter!(transient_name).increment(5);
        assert_eq!(snapshot_len(controller), 4);
        held.increment(1);
        assert_eq!(snapshot_len(controller), 4);

        sleep_idle(2.0);
        assert_eq!(snapshot_len(controller), 3);

        held.increment(1);
        let snapshot = controller.capture_metrics();
        assert_eq!(snapshot.len(), 3);
        let held_metric = snapshot
            .into_iter()
            .find(|metric| metric.name() == held_name.as_str())
            .expect("Test metric is not present");
        match held_metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let kept = names.next().unwrap();
        let expiring = names.next().unwrap();

        controller
            .set_expiry(None, vec![expire_by_name(expiring.as_str(), IDLE_TIMEOUT)])
            .unwrap();

        counter!(kept).increment(1);
        counter!(expiring).increment(2);
        assert_eq!(snapshot_len(controller), 4);

        sleep_idle(2.0);
        counter!(kept).increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();
        let name_c = names.next().unwrap();
        let name_d = names.next().unwrap();

        let by_name = expire_by_name(name_c.as_str(), IDLE_TIMEOUT);
        let by_tag = PerMetricSetExpiration {
            name: None,
            labels: Some(MetricLabelMatcherConfig::All {
                matchers: vec![MetricLabelMatcher::Exact {
                    key: "tag".to_string(),
                    value: "value1".to_string(),
                }],
            }),
            expire_secs: IDLE_TIMEOUT * 2.0,
        };
        controller
            .set_expiry(Some(IDLE_TIMEOUT * 3.0), vec![by_name, by_tag])
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(1);
        counter!(name_c).increment(2);
        counter!(name_d, "tag" => "value1").increment(3);
        assert_eq!(snapshot_len(controller), 6);

        sleep_idle(1.5);
        counter!(name_b).increment(3);
        assert_eq!(snapshot_len(controller), 5);

        sleep_idle(1.0);
        counter!(name_b).increment(3);
        assert_eq!(snapshot_len(controller), 4);

        sleep_idle(1.0);
        counter!(name_b).increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }
}
426}