vector_common/internal_event/
events_sent.rs

1use std::sync::Arc;
2
3use metrics::Counter;
4
5use crate::counter;
6use tracing::trace;
7
8use super::{CountByteSize, CounterName, OptionalTag, Output, SharedString};
9use crate::config::ComponentKey;
10
11pub const DEFAULT_OUTPUT: &str = "_default";
12
13crate::registered_event!(
14    EventsSent {
15        output: Option<SharedString>,
16    } => {
17        events: Counter = if let Some(output) = &self.output {
18            counter!(CounterName::ComponentSentEventsTotal, "output" => output.clone())
19        } else {
20            counter!(CounterName::ComponentSentEventsTotal)
21        },
22        event_bytes: Counter = if let Some(output) = &self.output {
23            counter!(CounterName::ComponentSentEventBytesTotal, "output" => output.clone())
24        } else {
25            counter!(CounterName::ComponentSentEventBytesTotal)
26        },
27        output: Option<SharedString> = self.output,
28    }
29
30    fn emit(&self, data: CountByteSize) {
31        let CountByteSize(count, byte_size) = data;
32
33        match &self.output {
34            Some(output) => {
35                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
36            }
37            None => {
38                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
39            }
40        }
41
42        self.events.increment(count as u64);
43        self.event_bytes.increment(byte_size.get() as u64);
44    }
45);
46
47impl From<Output> for EventsSent {
48    fn from(output: Output) -> Self {
49        Self { output: output.0 }
50    }
51}
52
53/// Makes a list of the tags to use with the events sent event.
54fn make_tags(
55    source: &OptionalTag<Arc<ComponentKey>>,
56    service: &OptionalTag<String>,
57) -> Vec<(&'static str, String)> {
58    let mut tags = Vec::new();
59    if let OptionalTag::Specified(tag) = source {
60        tags.push((
61            "source",
62            tag.as_ref()
63                .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
64        ));
65    }
66
67    if let OptionalTag::Specified(tag) = service {
68        tags.push(("service", tag.clone().unwrap_or("-".to_string())));
69    }
70
71    tags
72}
73
74crate::registered_event!(
75    TaggedEventsSent {
76        source: OptionalTag<Arc<ComponentKey>>,
77        service: OptionalTag<String>,
78    } => {
79        events: Counter = {
80            counter!(CounterName::ComponentSentEventsTotal, &make_tags(&self.source, &self.service))
81        },
82        event_bytes: Counter = {
83            counter!(CounterName::ComponentSentEventBytesTotal, &make_tags(&self.source, &self.service))
84        },
85    }
86
87    fn emit(&self, data: CountByteSize) {
88        let CountByteSize(count, byte_size) = data;
89        trace!(message = "Events sent.", %count, %byte_size);
90
91        self.events.increment(count as u64);
92        self.event_bytes.increment(byte_size.get() as u64);
93    }
94
95    fn register(_fixed: (), tags: TaggedEventsSent) {
96        super::register(tags)
97    }
98);
99
100impl TaggedEventsSent {
101    #[must_use]
102    pub fn new_empty() -> Self {
103        Self {
104            source: OptionalTag::Specified(None),
105            service: OptionalTag::Specified(None),
106        }
107    }
108
109    #[must_use]
110    pub fn new_unspecified() -> Self {
111        Self {
112            source: OptionalTag::Ignored,
113            service: OptionalTag::Ignored,
114        }
115    }
116}