1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{
19 label_filter::VectorLabelFilter,
20 recorder::{Registry, VectorRecorder},
21};
22use crate::{
23 config::metrics_expiration::PerMetricSetExpiration,
24 event::{Metric, MetricValue},
25};
26
27type Result<T> = std::result::Result<T, Error>;
28
29#[derive(Clone, Debug, PartialEq, Snafu)]
30pub enum Error {
31 #[snafu(display("Recorder already initialized."))]
32 AlreadyInitialized,
33 #[snafu(display("Metrics system was not initialized."))]
34 NotInitialized,
35 #[snafu(display("Timeout value of {} must be positive.", timeout))]
36 TimeoutMustBePositive { timeout: f64 },
37 #[snafu(display("Invalid regex pattern: {}.", pattern))]
38 InvalidRegexPattern { pattern: String },
39}
40
/// The process-wide controller. `init` sets it once, after the global recorder has been
/// installed. It is never set when internal metrics are disabled, because `init` returns
/// early in that case.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();
42
/// Name of the gauge that `Controller::capture_metrics` appends to report how many
/// metrics it returned.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
/// Key for the cardinality gauge appended by `Controller::capture_metrics`.
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);

/// Name of the counter that `Controller::capture_metrics` appends alongside the gauge,
/// carrying the same value.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
/// Key for the cardinality counter appended by `Controller::capture_metrics`.
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
52
/// Handle to the internal metrics recorder.
///
/// `init` publishes exactly one instance in the process-wide `CONTROLLER` slot; obtain it
/// with [`Controller::get`].
pub struct Controller {
    /// Recorder whose registry is read by [`Controller::capture_metrics`] and modified by
    /// [`Controller::reset`] and [`Controller::set_expiry`].
    recorder: VectorRecorder,
}
57
/// Reports whether the internal metrics core should be active.
///
/// The core is disabled only when `DISABLE_INTERNAL_METRICS_CORE` is set to exactly
/// `"true"`. Unset, any other value, or a non-Unicode value all leave it enabled.
fn metrics_enabled() -> bool {
    let disabled =
        std::env::var("DISABLE_INTERNAL_METRICS_CORE").is_ok_and(|value| value == "true");
    !disabled
}
61
/// Reports whether metrics should be wrapped in the tracing-context layer.
///
/// The integration is turned off only when `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// is set to exactly `"true"`. Every other case, including an unreadable variable,
/// keeps it on.
fn tracing_context_layer_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION") {
        Ok(flag) => flag != "true",
        Err(_) => true,
    }
}
65
66fn init(recorder: VectorRecorder) -> Result<()> {
67 if !metrics_enabled() {
70 metrics::set_global_recorder(metrics::NoopRecorder)
71 .map_err(|_| Error::AlreadyInitialized)?;
72 info!(message = "Internal metrics core is disabled.");
73 return Ok(());
74 }
75
76 if tracing_context_layer_enabled() {
84 metrics::set_global_recorder(
86 TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
87 )
88 .map_err(|_| Error::AlreadyInitialized)?;
89 } else {
90 metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
91 }
92
93 let controller = Controller { recorder };
103 CONTROLLER
104 .set(controller)
105 .map_err(|_| Error::AlreadyInitialized)?;
106
107 Ok(())
108}
109
110pub fn init_global() -> Result<()> {
116 init(VectorRecorder::new_global())
117}
118
119pub fn init_test() {
122 if init(VectorRecorder::new_test()).is_err() {
123 while CONTROLLER.get().is_none() {}
130 }
131}
132
133impl Controller {
134 pub fn reset(&self) {
136 self.recorder.with_registry(Registry::clear);
137 }
138
139 pub fn get() -> Result<&'static Self> {
146 CONTROLLER.get().ok_or(Error::NotInitialized)
147 }
148
149 pub fn set_expiry(
156 &self,
157 global_timeout: Option<f64>,
158 expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
159 ) -> Result<()> {
160 if let Some(timeout) = global_timeout
161 && timeout <= 0.0
162 {
163 return Err(Error::TimeoutMustBePositive { timeout });
164 }
165 let per_metric_expiration = expire_metrics_per_metric_set
166 .into_iter()
167 .map(TryInto::try_into)
168 .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
169
170 self.recorder.with_registry(|registry| {
171 registry.set_expiry(
172 global_timeout.map(Duration::from_secs_f64),
173 per_metric_expiration,
174 );
175 });
176 Ok(())
177 }
178
179 pub fn capture_metrics(&self) -> Vec<Metric> {
182 let timestamp = Utc::now();
183
184 let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
185
186 #[allow(clippy::cast_precision_loss)]
187 let value = (metrics.len() + 2) as f64;
188 metrics.push(Metric::from_metric_kv(
189 &CARDINALITY_KEY,
190 MetricValue::Gauge { value },
191 timestamp,
192 ));
193 metrics.push(Metric::from_metric_kv(
194 &CARDINALITY_COUNTER_KEY,
195 MetricValue::Counter { value },
196 timestamp,
197 ));
198
199 metrics
200 }
201}
202
// Tests for the internal metrics controller.
//
// Every test uses the single process-wide controller installed by `init_test`. Most of
// them assert exact metric counts without calling `Controller::reset` first.
// NOTE(review): this presumably relies on each test running in its own process (e.g.
// under cargo-nextest). Under the default threaded `cargo test` runner these counts could
// interfere with each other; confirm.
//
// Every expected count includes the two cardinality metrics that `capture_metrics`
// always appends.
#[cfg(test)]
mod tests {
    use strum::IntoEnumIterator;
    use vector_common::{
        counter, gauge,
        internal_event::{CounterName, GaugeName},
    };

    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    /// Idle timeout, in seconds, used by the expiry tests. They sleep multiples of it.
    const IDLE_TIMEOUT: f64 = 0.5;

    /// Initializes metrics for tests and returns the shared controller (without resetting it).
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    // For several cardinalities, the two appended cardinality metrics must report the
    // total number of returned metrics (themselves included) as absolute values.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            // Start each round from an empty registry.
            controller.reset();

            // One counter name with `cardinality` distinct `idx` tag values, giving
            // `cardinality` distinct series.
            let name = CounterName::iter().next().unwrap();
            for idx in 0..cardinality {
                counter!(name, "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    _ => {}
                }
            }
        }
    }

    // Registering a metric (even before any update) makes it visible to
    // `capture_metrics`; updating it afterwards does not add another entry.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = counter!(CounterName::iter().next().unwrap());
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = gauge!(GaugeName::iter().next().unwrap());
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    // With a global idle timeout, a metric not touched for longer than the timeout is
    // dropped, while a recently touched one survives.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Wait past the timeout, touching only `name_a`; `name_b` should expire.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Same as `expires_metrics`, but the two series share a name and differ only by tag
    // value, so expiry must be tracked per tag set.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let name = CounterName::iter().next().unwrap();
        counter!(name, "tag" => "value1").increment(1);
        counter!(name, "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Only the `tag=value1` series is refreshed after the timeout.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name, "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // A metric whose handle is still held (`a`) survives the idle timeout and keeps its
    // accumulated value. The metric whose handle was dropped (`name_b`) is the one that
    // disappears. (Presumably expiry skips metrics with live handles; the final
    // assertions show `a` is still present with value 2.0.)
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        let a = counter!(name_a);
        counter!(name_b).increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == name_a.as_str())
            .expect("Test metric is not present");
        // 1 + 1: the value accumulated before the sleep was not lost.
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    // With no global timeout, only metrics matched by a per-metric-set rule expire:
    // `name_b` has a timeout, `name_a` never expires.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: name_b.as_str().to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Mixed global and per-set timeouts, with `IDLE_TIMEOUT` written as T:
    //   global = 3T, `name_c` (by name) = T, series tagged `tag=value1` = 2T.
    // `name_b` is touched at every step so it never expires. Timeline:
    //   t=0     : a, b, c, d{tag=value1}      -> 4 + 2 = 6
    //   t=1.5T  : c expired (T)               -> 3 + 2 = 5
    //   t=2.5T  : d expired (2T)              -> 2 + 2 = 4
    //   t=3.5T  : a expired (global 3T)       -> 1 + 2 = 3
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();
        let name_c = names.next().unwrap();
        let name_d = names.next().unwrap();

        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: name_c.as_str().to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(1);
        counter!(name_c).increment(2);
        counter!(name_d, "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 6);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}
426}