vector/internal_events/
common.rs

1use std::time::Instant;
2
3use vector_lib::NamedInternalEvent;
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::{
6    ComponentEventsDropped, CounterName, HistogramName, InternalEvent, UNINTENTIONAL, error_stage,
7    error_type,
8};
9use vector_lib::{counter, histogram};
10
11#[derive(Debug, NamedInternalEvent)]
12pub struct EndpointBytesReceived<'a> {
13    pub byte_size: usize,
14    pub protocol: &'a str,
15    pub endpoint: &'a str,
16}
17
18impl InternalEvent for EndpointBytesReceived<'_> {
19    fn emit(self) {
20        trace!(
21            message = "Bytes received.",
22            byte_size = %self.byte_size,
23            protocol = %self.protocol,
24            endpoint = %self.endpoint,
25        );
26        counter!(
27            CounterName::ComponentReceivedBytesTotal,
28            "protocol" => self.protocol.to_owned(),
29            "endpoint" => self.endpoint.to_owned(),
30        )
31        .increment(self.byte_size as u64);
32    }
33}
34
35#[derive(Debug, NamedInternalEvent)]
36pub struct EndpointBytesSent<'a> {
37    pub byte_size: usize,
38    pub protocol: &'a str,
39    pub endpoint: &'a str,
40}
41
42impl InternalEvent for EndpointBytesSent<'_> {
43    fn emit(self) {
44        trace!(
45            message = "Bytes sent.",
46            byte_size = %self.byte_size,
47            protocol = %self.protocol,
48            endpoint = %self.endpoint
49        );
50        counter!(
51            CounterName::ComponentSentBytesTotal,
52            "protocol" => self.protocol.to_string(),
53            "endpoint" => self.endpoint.to_string()
54        )
55        .increment(self.byte_size as u64);
56    }
57}
58
59#[derive(Debug, NamedInternalEvent)]
60pub struct SocketOutgoingConnectionError<E> {
61    pub error: E,
62}
63
64impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
65    fn emit(self) {
66        error!(
67            message = "Unable to connect.",
68            error = %self.error,
69            error_code = "failed_connecting",
70            error_type = error_type::CONNECTION_FAILED,
71            stage = error_stage::SENDING,
72        );
73        counter!(
74            CounterName::ComponentErrorsTotal,
75            "error_code" => "failed_connecting",
76            "error_type" => error_type::CONNECTION_FAILED,
77            "stage" => error_stage::SENDING,
78        )
79        .increment(1);
80    }
81}
82
83const STREAM_CLOSED: &str = "stream_closed";
84
85#[derive(Debug, NamedInternalEvent)]
86pub struct StreamClosedError {
87    pub count: usize,
88}
89
90impl InternalEvent for StreamClosedError {
91    fn emit(self) {
92        error!(
93            message = "Failed to forward event(s), downstream is closed.",
94            error_code = STREAM_CLOSED,
95            error_type = error_type::WRITER_FAILED,
96            stage = error_stage::SENDING,
97        );
98        counter!(
99            CounterName::ComponentErrorsTotal,
100            "error_code" => STREAM_CLOSED,
101            "error_type" => error_type::WRITER_FAILED,
102            "stage" => error_stage::SENDING,
103        )
104        .increment(1);
105        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
106            count: self.count,
107            reason: "Downstream is closed.",
108        });
109    }
110}
111
112#[derive(Debug, NamedInternalEvent)]
113pub struct CollectionCompleted {
114    pub start: Instant,
115    pub end: Instant,
116}
117
118impl InternalEvent for CollectionCompleted {
119    fn emit(self) {
120        debug!(message = "Collection completed.");
121        counter!(CounterName::CollectCompletedTotal).increment(1);
122        histogram!(HistogramName::CollectDurationSeconds).record(self.end - self.start);
123    }
124}
125
126#[derive(Debug, NamedInternalEvent)]
127pub struct SinkRequestBuildError<E> {
128    pub error: E,
129}
130
131impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
132    fn emit(self) {
133        // Providing the name of the sink with the build error is not necessary because the emitted log
134        // message contains the sink name in `component_type` field thanks to `tracing` spans. For example:
135        // "<timestamp> ERROR sink{component_kind="sink" component_id=sink0 component_type=aws_s3 component_name=sink0}: vector::internal_events::common: Failed to build request."
136        error!(
137            message = format!("Failed to build request."),
138            error = %self.error,
139            error_type = error_type::ENCODER_FAILED,
140            stage = error_stage::PROCESSING,
141        );
142        counter!(
143            CounterName::ComponentErrorsTotal,
144            "error_type" => error_type::ENCODER_FAILED,
145            "stage" => error_stage::PROCESSING,
146        )
147        .increment(1);
148    }
149}