vector/sources/internal_metrics.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8    ByteSizeOf, EstimatedJsonEncodedSizeOf,
9    config::LogNamespace,
10    configurable::configurable_component,
11    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
12    lookup::lookup_v2::OptionalValuePath,
13};
14
15use crate::{
16    SourceSender,
17    config::{SourceConfig, SourceContext, SourceOutput, log_schema},
18    internal_events::{EventsReceived, StreamClosedError},
19    metrics::Controller,
20    shutdown::ShutdownSignal,
21};
22
/// Configuration for the `internal_metrics` source.
//
// NOTE(review): serde_with requires `#[serde_as]` to appear before the serde derive whose field
// annotations it rewrites; presumably `configurable_component` emits that derive, so keep this
// attribute order.
#[serde_as]
#[configurable_component(source(
    "internal_metrics",
    "Expose internal metrics emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
// `default`: fields missing from the user's config fall back to `InternalMetricsConfig::default()`.
// `deny_unknown_fields`: unrecognized keys are rejected when deserializing.
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
    /// The interval between metric gathering, in seconds.
    // Written in config as a (possibly fractional) number of seconds, e.g. `0.5`.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    // Tag-key overrides for emitted metrics; `derived` presumably reuses `TagsConfig`'s own docs.
    #[configurable(derived)]
    pub tags: TagsConfig,

    /// Overrides the default namespace for the metrics emitted by the source.
    #[serde(default = "default_namespace")]
    pub namespace: String,
}
45
46impl Default for InternalMetricsConfig {
47    fn default() -> Self {
48        Self {
49            scrape_interval_secs: default_scrape_interval(),
50            tags: TagsConfig::default(),
51            namespace: default_namespace(),
52        }
53    }
54}
55
56/// Tag configuration for the `internal_metrics` source.
57#[configurable_component]
58#[derive(Clone, Debug, Default)]
59#[serde(deny_unknown_fields, default)]
60pub struct TagsConfig {
61    /// Overrides the name of the tag used to add the peer host to each metric.
62    ///
63    /// The value is the peer host's address, including the port. For example, `1.2.3.4:9000`.
64    ///
65    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
66    ///
67    /// Set to `""` to suppress this key.
68    ///
69    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
70    pub host_key: Option<OptionalValuePath>,
71
72    /// Sets the name of the tag to use to add the current process ID to each metric.
73    ///
74    ///
75    /// By default, this is not set and the tag is not automatically added.
76    #[configurable(metadata(docs::examples = "pid"))]
77    pub pid_key: Option<String>,
78}
79
/// Default period between metric scrapes: one whole second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
83
/// Default namespace stamped on the metrics emitted by this source.
fn default_namespace() -> String {
    String::from("vector")
}
87
// Provides example-config generation for this source from `InternalMetricsConfig::default()`
// (inferred from the macro's name; exercised by the `generate_config` test below).
impl_generate_config_from_default!(InternalMetricsConfig);
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "internal_metrics")]
92impl SourceConfig for InternalMetricsConfig {
93    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
94        if self.scrape_interval_secs.is_zero() {
95            warn!(
96                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
97            );
98        }
99        let interval = self.scrape_interval_secs;
100
101        // namespace for created metrics is already "vector" by default.
102        let namespace = self.namespace.clone();
103
104        let host_key = self
105            .tags
106            .host_key
107            .clone()
108            .unwrap_or(log_schema().host_key().cloned().into());
109
110        let pid_key = self
111            .tags
112            .pid_key
113            .as_deref()
114            .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
115
116        Ok(Box::pin(
117            InternalMetrics {
118                namespace,
119                host_key,
120                pid_key,
121                controller: Controller::get()?,
122                interval,
123                out: cx.out,
124                shutdown: cx.shutdown,
125            }
126            .run(),
127        ))
128    }
129
130    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
131        vec![SourceOutput::new_metrics()]
132    }
133
134    fn can_acknowledge(&self) -> bool {
135        false
136    }
137}
138
/// Runtime state of a built `internal_metrics` source; consumed by `InternalMetrics::run`.
struct InternalMetrics<'a> {
    /// Namespace applied to each emitted metric; only applied when it differs from `"vector"`.
    namespace: String,
    /// Tag key for the hostname; when its `path` is `None`, no host tag is added.
    host_key: OptionalValuePath,
    /// Tag key for the process ID; `None` means no pid tag is added.
    pid_key: Option<String>,
    /// Metrics controller whose current metrics are captured on every tick.
    controller: &'a Controller,
    /// Period between captures.
    interval: time::Duration,
    /// Downstream sender the captured batches are forwarded to.
    out: SourceSender,
    /// Signal that ends the capture loop.
    shutdown: ShutdownSignal,
}
148
149impl InternalMetrics<'_> {
150    async fn run(mut self) -> Result<(), ()> {
151        let events_received = register!(EventsReceived);
152        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
153        let mut interval =
154            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
155        while interval.next().await.is_some() {
156            let hostname = crate::get_hostname();
157            let pid = std::process::id().to_string();
158
159            let metrics = self.controller.capture_metrics();
160            let count = metrics.len();
161            let byte_size = metrics.size_of();
162            let json_size = metrics.estimated_json_encoded_size_of();
163
164            bytes_received.emit(ByteSize(byte_size));
165            events_received.emit(CountByteSize(count, json_size));
166
167            let batch = metrics.into_iter().map(|mut metric| {
168                // A metric starts out with a default "vector" namespace, but will be overridden
169                // if an explicit namespace is provided to this source.
170                if self.namespace != "vector" {
171                    metric = metric.with_namespace(Some(self.namespace.clone()));
172                }
173
174                if let Some(host_key) = &self.host_key.path
175                    && let Ok(hostname) = &hostname
176                {
177                    metric.replace_tag(host_key.to_string(), hostname.to_owned());
178                }
179                if let Some(pid_key) = &self.pid_key {
180                    metric.replace_tag(pid_key.to_owned(), pid.clone());
181                }
182                metric
183            });
184
185            if (self.out.send_batch(batch).await).is_err() {
186                emit!(StreamClosedError { count });
187                return Err(());
188            }
189        }
190
191        Ok(())
192    }
193}
194
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use strum::IntoEnumIterator;
    use vector_lib::{
        counter, gauge, histogram,
        internal_event::{CounterName, GaugeName, HistogramName},
        metric_tags,
        metrics::Controller,
    };

    use super::*;
    use crate::{
        event::{
            Event,
            metric::{Metric, MetricValue},
        },
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    // Checks that an example config can be generated for `InternalMetricsConfig`.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    // Records known values via the `gauge!`/`counter!`/`histogram!` macros, then checks that a
    // direct `Controller::capture_metrics` snapshot reports the expected aggregated values.
    // This exercises the controller directly rather than running the source.
    #[test]
    fn captures_internal_metrics() {
        test_util::trace_init();

        // There *seems* to be a race condition here (CI was flaky), so add a slight delay.
        std::thread::sleep(std::time::Duration::from_millis(300));

        // Use the first variants of each name enum as concrete metric names. The histogram
        // checks need two distinct names: one untagged and one tagged.
        let gauge_name = GaugeName::iter().next().unwrap();
        let counter_name = CounterName::iter().next().unwrap();
        let mut histogram_iter = HistogramName::iter();
        let histogram_name = histogram_iter.next().unwrap();
        let histogram_tagged_name = histogram_iter.next().unwrap();

        // Expected results:
        // - the gauge reports its last value (2.0);
        // - the counter accumulates (3 + 4 = 7);
        // - the untagged histogram holds 5.0 and 6.0;
        // - the tagged histogram holds 8.0 and 8.1.
        gauge!(gauge_name).set(1.0);
        gauge!(gauge_name).set(2.0);
        counter!(counter_name).increment(3);
        counter!(counter_name).increment(4);
        histogram!(histogram_name).record(5.0);
        histogram!(histogram_name).record(6.0);
        histogram!(histogram_tagged_name, "host" => "foo").record(8.0);
        histogram!(histogram_tagged_name, "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");

        // There *seems* to be a race condition here (CI was flaky), so add a slight delay.
        std::thread::sleep(std::time::Duration::from_millis(300));

        // Index the snapshot by metric name for the lookups below.
        let output = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect::<BTreeMap<String, Metric>>();

        assert_eq!(
            &MetricValue::Gauge { value: 2.0 },
            output[gauge_name.as_str()].value()
        );
        assert_eq!(
            &MetricValue::Counter { value: 7.0 },
            output[counter_name.as_str()].value()
        );

        match &output[histogram_name.as_str()].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // This index is _only_ stable so long as the offsets in
                // [`metrics::handle::Histogram::new`] are hard-coded. If this
                // check fails you might look there and see if we've allowed
                // users to set their own bucket widths.
                assert_eq!(buckets[15].count, 2);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 11.0);
            }
            _ => panic!("wrong type"),
        }

        match &output[histogram_tagged_name.as_str()].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // This index is _only_ stable so long as the offsets in
                // [`metrics::handle::Histogram::new`] are hard-coded. If this
                // check fails you might look there and see if we've allowed
                // users to set their own bucket widths.
                assert_eq!(buckets[15].count, 1);
                assert_eq!(buckets[16].count, 1);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 16.1);
            }
            _ => panic!("wrong type"),
        }

        // The tag supplied at record time must survive into the captured metric.
        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output[histogram_tagged_name.as_str()].tags());
    }

    // Runs the source with `config` for 100ms under the source-compliance assertions
    // (`SOURCE_TAGS`) and returns the first event it produced. Panics if none were produced.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let mut events = run_and_assert_source_compliance(
            config,
            time::Duration::from_millis(100),
            &SOURCE_TAGS,
        )
        .await;

        assert!(!events.is_empty());
        events.remove(0)
    }

    // NOTE: inside this module, this test fn shadows the `default_namespace` helper that
    // `use super::*` would otherwise bring in.
    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }

    // Custom host/pid tag keys are both applied to emitted metrics.
    #[tokio::test]
    async fn sets_tags() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: Some(OptionalValuePath::new("my_host_key")),
                pid_key: Some(String::from("my_pid_key")),
            },
            ..Default::default()
        })
        .await;

        let metric = event.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }

    // With the default config, only the host tag is added (no pid tag). The "host" key
    // presumably comes from the default global `log_schema.host_key`.
    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        let metric = event.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    // A configured non-default namespace replaces "vector" on emitted metrics.
    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..InternalMetricsConfig::default()
        };

        let event = event_from_config(config).await;

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}
364}