1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8 ByteSizeOf, EstimatedJsonEncodedSizeOf,
9 config::LogNamespace,
10 configurable::configurable_component,
11 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
12 lookup::lookup_v2::OptionalValuePath,
13};
14
15use crate::{
16 SourceSender,
17 config::{SourceConfig, SourceContext, SourceOutput, log_schema},
18 internal_events::{EventsReceived, StreamClosedError},
19 metrics::Controller,
20 shutdown::ShutdownSignal,
21};
22
#[serde_as]
/// Configuration for the `internal_metrics` source.
#[configurable_component(source(
    "internal_metrics",
    "Expose internal metrics emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    /// The interval between metric gathering, in seconds.
    ///
    /// Fractional values are accepted. Defaults to one second. A value of zero
    /// is accepted but logs a warning, because it can cause high CPU usage.
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    // NOTE(review): description is presumably derived from `TagsConfig`'s own
    // documentation via `configurable(derived)` — confirm.
    #[configurable(derived)]
    pub tags: TagsConfig,

    #[serde(default = "default_namespace")]
    /// Overrides the default namespace for the metrics emitted by the source.
    ///
    /// By default, `vector` is used.
    pub namespace: String,
}
45
46impl Default for InternalMetricsConfig {
47 fn default() -> Self {
48 Self {
49 scrape_interval_secs: default_scrape_interval(),
50 tags: TagsConfig::default(),
51 namespace: default_namespace(),
52 }
53 }
54}
55
#[configurable_component]
/// Tag configuration for the `internal_metrics` source.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub struct TagsConfig {
    /// Overrides the name of the tag used to add the host name to each metric.
    ///
    /// When unset, the global log schema `host_key` is used. The tag is only
    /// added when the resolved path is set and the hostname lookup succeeds.
    pub host_key: Option<OptionalValuePath>,

    /// Sets the name of the tag used to add the current process ID to each metric.
    ///
    /// By default, this is not set and the tag is not added. An empty string is
    /// treated the same as not setting it.
    #[configurable(metadata(docs::examples = "pid"))]
    pub pid_key: Option<String>,
}
79
/// Default period between two scrapes of the internal metrics: one whole second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
83
/// Default namespace for the emitted metrics.
fn default_namespace() -> String {
    String::from("vector")
}
87
// NOTE(review): by its name, this macro presumably implements example-config
// generation for `InternalMetricsConfig` from its `Default` impl — confirm
// against the macro definition.
impl_generate_config_from_default!(InternalMetricsConfig);
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "internal_metrics")]
92impl SourceConfig for InternalMetricsConfig {
93 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
94 if self.scrape_interval_secs.is_zero() {
95 warn!(
96 "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
97 );
98 }
99 let interval = self.scrape_interval_secs;
100
101 let namespace = self.namespace.clone();
103
104 let host_key = self
105 .tags
106 .host_key
107 .clone()
108 .unwrap_or(log_schema().host_key().cloned().into());
109
110 let pid_key = self
111 .tags
112 .pid_key
113 .as_deref()
114 .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
115
116 Ok(Box::pin(
117 InternalMetrics {
118 namespace,
119 host_key,
120 pid_key,
121 controller: Controller::get()?,
122 interval,
123 out: cx.out,
124 shutdown: cx.shutdown,
125 }
126 .run(),
127 ))
128 }
129
130 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
131 vec![SourceOutput::new_metrics()]
132 }
133
134 fn can_acknowledge(&self) -> bool {
135 false
136 }
137}
138
139struct InternalMetrics<'a> {
140 namespace: String,
141 host_key: OptionalValuePath,
142 pid_key: Option<String>,
143 controller: &'a Controller,
144 interval: time::Duration,
145 out: SourceSender,
146 shutdown: ShutdownSignal,
147}
148
149impl InternalMetrics<'_> {
150 async fn run(mut self) -> Result<(), ()> {
151 let events_received = register!(EventsReceived);
152 let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
153 let mut interval =
154 IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
155 while interval.next().await.is_some() {
156 let hostname = crate::get_hostname();
157 let pid = std::process::id().to_string();
158
159 let metrics = self.controller.capture_metrics();
160 let count = metrics.len();
161 let byte_size = metrics.size_of();
162 let json_size = metrics.estimated_json_encoded_size_of();
163
164 bytes_received.emit(ByteSize(byte_size));
165 events_received.emit(CountByteSize(count, json_size));
166
167 let batch = metrics.into_iter().map(|mut metric| {
168 if self.namespace != "vector" {
171 metric = metric.with_namespace(Some(self.namespace.clone()));
172 }
173
174 if let Some(host_key) = &self.host_key.path
175 && let Ok(hostname) = &hostname
176 {
177 metric.replace_tag(host_key.to_string(), hostname.to_owned());
178 }
179 if let Some(pid_key) = &self.pid_key {
180 metric.replace_tag(pid_key.to_owned(), pid.clone());
181 }
182 metric
183 });
184
185 if (self.out.send_batch(batch).await).is_err() {
186 emit!(StreamClosedError { count });
187 return Err(());
188 }
189 }
190
191 Ok(())
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use strum::IntoEnumIterator;
    use vector_lib::{
        counter, gauge, histogram,
        internal_event::{CounterName, GaugeName, HistogramName},
        metric_tags,
        metrics::Controller,
    };

    use super::*;
    use crate::{
        event::{
            Event,
            metric::{Metric, MetricValue},
        },
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    /// Checks that a config can be generated for `InternalMetricsConfig`.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    /// Records a gauge, a counter and two histograms through the metrics macros,
    /// then checks the values reported by `Controller::capture_metrics`.
    #[test]
    fn captures_internal_metrics() {
        test_util::trace_init();

        // NOTE(review): fixed 300 ms pause before recording; the reason is not
        // visible here (presumably lets global metrics/tracing setup settle) —
        // TODO confirm.
        std::thread::sleep(std::time::Duration::from_millis(300));

        // Use the first declared gauge and counter names and the first two
        // histogram names.
        let gauge_name = GaugeName::iter().next().unwrap();
        let counter_name = CounterName::iter().next().unwrap();
        let mut histogram_iter = HistogramName::iter();
        let histogram_name = histogram_iter.next().unwrap();
        let histogram_tagged_name = histogram_iter.next().unwrap();

        gauge!(gauge_name).set(1.0);
        gauge!(gauge_name).set(2.0);
        counter!(counter_name).increment(3);
        counter!(counter_name).increment(4);
        histogram!(histogram_name).record(5.0);
        histogram!(histogram_name).record(6.0);
        histogram!(histogram_tagged_name, "host" => "foo").record(8.0);
        histogram!(histogram_tagged_name, "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");

        // NOTE(review): second fixed pause before capturing — TODO confirm why.
        std::thread::sleep(std::time::Duration::from_millis(300));

        // Index the captured metrics by name for the assertions below.
        let output = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect::<BTreeMap<String, Metric>>();

        // The gauge reports the last value set; the counter reports the sum (3 + 4).
        assert_eq!(
            &MetricValue::Gauge { value: 2.0 },
            output[gauge_name.as_str()].value()
        );
        assert_eq!(
            &MetricValue::Counter { value: 7.0 },
            output[counter_name.as_str()].value()
        );

        // The assertions below expect 5.0 and 6.0 to land in bucket 15.
        match &output[histogram_name.as_str()].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[15].count, 2);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 11.0);
            }
            _ => panic!("wrong type"),
        }

        // The assertions below expect 8.0 in bucket 15 and 8.1 in bucket 16.
        match &output[histogram_tagged_name.as_str()].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[15].count, 1);
                assert_eq!(buckets[16].count, 1);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 16.1);
            }
            _ => panic!("wrong type"),
        }

        // Tags passed to `histogram!` are preserved on the captured metric.
        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output[histogram_tagged_name.as_str()].tags());
    }

    /// Runs the source with `config` for 100 ms under the source compliance
    /// checks and returns the first event it produced.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let mut events = run_and_assert_source_compliance(
            config,
            time::Duration::from_millis(100),
            &SOURCE_TAGS,
        )
        .await;

        assert!(!events.is_empty());
        events.remove(0)
    }

    /// With the default config, emitted metrics use the `vector` namespace.
    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }

    /// Custom host and pid tag names are both written onto emitted metrics.
    #[tokio::test]
    async fn sets_tags() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: Some(OptionalValuePath::new("my_host_key")),
                pid_key: Some(String::from("my_pid_key")),
            },
            ..Default::default()
        })
        .await;

        let metric = event.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }

    /// By default only the host tag is added; no pid tag is written.
    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        let metric = event.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    /// A custom namespace replaces the default one on emitted metrics.
    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..InternalMetricsConfig::default()
        };

        let event = event_from_config(config).await;

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}