vector/internal_events/
common.rs1use std::time::Instant;
2
3use vector_lib::NamedInternalEvent;
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::{
6 ComponentEventsDropped, CounterName, HistogramName, InternalEvent, UNINTENTIONAL, error_stage,
7 error_type,
8};
9use vector_lib::{counter, histogram};
10
11#[derive(Debug, NamedInternalEvent)]
12pub struct EndpointBytesReceived<'a> {
13 pub byte_size: usize,
14 pub protocol: &'a str,
15 pub endpoint: &'a str,
16}
17
18impl InternalEvent for EndpointBytesReceived<'_> {
19 fn emit(self) {
20 trace!(
21 message = "Bytes received.",
22 byte_size = %self.byte_size,
23 protocol = %self.protocol,
24 endpoint = %self.endpoint,
25 );
26 counter!(
27 CounterName::ComponentReceivedBytesTotal,
28 "protocol" => self.protocol.to_owned(),
29 "endpoint" => self.endpoint.to_owned(),
30 )
31 .increment(self.byte_size as u64);
32 }
33}
34
35#[derive(Debug, NamedInternalEvent)]
36pub struct EndpointBytesSent<'a> {
37 pub byte_size: usize,
38 pub protocol: &'a str,
39 pub endpoint: &'a str,
40}
41
42impl InternalEvent for EndpointBytesSent<'_> {
43 fn emit(self) {
44 trace!(
45 message = "Bytes sent.",
46 byte_size = %self.byte_size,
47 protocol = %self.protocol,
48 endpoint = %self.endpoint
49 );
50 counter!(
51 CounterName::ComponentSentBytesTotal,
52 "protocol" => self.protocol.to_string(),
53 "endpoint" => self.endpoint.to_string()
54 )
55 .increment(self.byte_size as u64);
56 }
57}
58
59#[derive(Debug, NamedInternalEvent)]
60pub struct SocketOutgoingConnectionError<E> {
61 pub error: E,
62}
63
64impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
65 fn emit(self) {
66 error!(
67 message = "Unable to connect.",
68 error = %self.error,
69 error_code = "failed_connecting",
70 error_type = error_type::CONNECTION_FAILED,
71 stage = error_stage::SENDING,
72 );
73 counter!(
74 CounterName::ComponentErrorsTotal,
75 "error_code" => "failed_connecting",
76 "error_type" => error_type::CONNECTION_FAILED,
77 "stage" => error_stage::SENDING,
78 )
79 .increment(1);
80 }
81}
82
/// `error_code` value used for both the log field and the metric label when a
/// [`StreamClosedError`] is emitted.
const STREAM_CLOSED: &str = "stream_closed";
84
85#[derive(Debug, NamedInternalEvent)]
86pub struct StreamClosedError {
87 pub count: usize,
88}
89
90impl InternalEvent for StreamClosedError {
91 fn emit(self) {
92 error!(
93 message = "Failed to forward event(s), downstream is closed.",
94 error_code = STREAM_CLOSED,
95 error_type = error_type::WRITER_FAILED,
96 stage = error_stage::SENDING,
97 );
98 counter!(
99 CounterName::ComponentErrorsTotal,
100 "error_code" => STREAM_CLOSED,
101 "error_type" => error_type::WRITER_FAILED,
102 "stage" => error_stage::SENDING,
103 )
104 .increment(1);
105 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
106 count: self.count,
107 reason: "Downstream is closed.",
108 });
109 }
110}
111
112#[derive(Debug, NamedInternalEvent)]
113pub struct CollectionCompleted {
114 pub start: Instant,
115 pub end: Instant,
116}
117
118impl InternalEvent for CollectionCompleted {
119 fn emit(self) {
120 debug!(message = "Collection completed.");
121 counter!(CounterName::CollectCompletedTotal).increment(1);
122 histogram!(HistogramName::CollectDurationSeconds).record(self.end - self.start);
123 }
124}
125
126#[derive(Debug, NamedInternalEvent)]
127pub struct SinkRequestBuildError<E> {
128 pub error: E,
129}
130
131impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
132 fn emit(self) {
133 error!(
137 message = format!("Failed to build request."),
138 error = %self.error,
139 error_type = error_type::ENCODER_FAILED,
140 stage = error_stage::PROCESSING,
141 );
142 counter!(
143 CounterName::ComponentErrorsTotal,
144 "error_type" => error_type::ENCODER_FAILED,
145 "stage" => error_stage::PROCESSING,
146 )
147 .increment(1);
148 }
149}