1use std::net::Ipv4Addr;
2
3use vector_lib::{
4 NamedInternalEvent, counter, histogram,
5 internal_event::{
6 ComponentEventsDropped, CounterName, HistogramName, InternalEvent, UNINTENTIONAL,
7 error_stage, error_type,
8 },
9 json_size::JsonSize,
10};
11
/// Kind of socket an internal event refers to.
///
/// The lowercase name returned by [`SocketMode::as_str`] is the value the
/// socket internal events in this file attach to their logs and metrics
/// (as either the `mode` or the `protocol` field).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
// NOTE(review): presumably not every variant is constructed under every
// feature combination, hence the lint suppression — confirm before removing.
#[allow(dead_code)]
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the static, lowercase name of this mode: `"tcp"`, `"udp"`,
    /// or `"unix"`.
    ///
    /// The match is exhaustive on purpose: adding a variant must fail to
    /// compile here until a name is chosen for it.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Tcp => "tcp",
            Self::Udp => "udp",
            Self::Unix => "unix",
        }
    }
}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct SocketBytesReceived {
32 pub mode: SocketMode,
33 pub byte_size: usize,
34}
35
36impl InternalEvent for SocketBytesReceived {
37 fn emit(self) {
38 let protocol = self.mode.as_str();
39 trace!(
40 message = "Bytes received.",
41 byte_size = %self.byte_size,
42 %protocol,
43 );
44 counter!(
45 CounterName::ComponentReceivedBytesTotal,
46 "protocol" => protocol,
47 )
48 .increment(self.byte_size as u64);
49 histogram!(HistogramName::ComponentReceivedBytes).record(self.byte_size as f64);
50 }
51}
52
53#[derive(Debug, NamedInternalEvent)]
54pub struct SocketEventsReceived {
55 pub mode: SocketMode,
56 pub byte_size: JsonSize,
57 pub count: usize,
58}
59
60impl InternalEvent for SocketEventsReceived {
61 fn emit(self) {
62 let mode = self.mode.as_str();
63 trace!(
64 message = "Events received.",
65 count = self.count,
66 byte_size = self.byte_size.get(),
67 %mode,
68 );
69 counter!(CounterName::ComponentReceivedEventsTotal, "mode" => mode)
70 .increment(self.count as u64);
71 counter!(CounterName::ComponentReceivedEventBytesTotal, "mode" => mode)
72 .increment(self.byte_size.get() as u64);
73 histogram!(HistogramName::ComponentReceivedBytes, "mode" => mode)
74 .record(self.byte_size.get() as f64);
75 }
76}
77
78#[derive(Debug, NamedInternalEvent)]
79pub struct SocketBytesSent {
80 pub mode: SocketMode,
81 pub byte_size: usize,
82}
83
84impl InternalEvent for SocketBytesSent {
85 fn emit(self) {
86 let protocol = self.mode.as_str();
87 trace!(
88 message = "Bytes sent.",
89 byte_size = %self.byte_size,
90 %protocol,
91 );
92 counter!(
93 CounterName::ComponentSentBytesTotal,
94 "protocol" => protocol,
95 )
96 .increment(self.byte_size as u64);
97 }
98}
99
100#[derive(Debug, NamedInternalEvent)]
101pub struct SocketEventsSent {
102 pub mode: SocketMode,
103 pub count: u64,
104 pub byte_size: JsonSize,
105}
106
107impl InternalEvent for SocketEventsSent {
108 fn emit(self) {
109 trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
110 counter!(CounterName::ComponentSentEventsTotal, "mode" => self.mode.as_str())
111 .increment(self.count);
112 counter!(CounterName::ComponentSentEventBytesTotal, "mode" => self.mode.as_str())
113 .increment(self.byte_size.get() as u64);
114 }
115}
116
117#[derive(Debug, NamedInternalEvent)]
118pub struct SocketBindError<E> {
119 pub mode: SocketMode,
120 pub error: E,
121}
122
123impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
124 fn emit(self) {
125 let mode = self.mode.as_str();
126 error!(
127 message = "Error binding socket.",
128 error = %self.error,
129 error_code = "socket_bind",
130 error_type = error_type::IO_FAILED,
131 stage = error_stage::INITIALIZING,
132 %mode,
133 );
134 counter!(
135 CounterName::ComponentErrorsTotal,
136 "error_code" => "socket_bind",
137 "error_type" => error_type::IO_FAILED,
138 "stage" => error_stage::INITIALIZING,
139 "mode" => mode,
140 )
141 .increment(1);
142 }
143}
144
145#[derive(Debug, NamedInternalEvent)]
146pub struct SocketMulticastGroupJoinError<E> {
147 pub error: E,
148 pub group_addr: Ipv4Addr,
149 pub interface: Ipv4Addr,
150}
151
152impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
153 fn emit(self) {
154 let mode = SocketMode::Udp.as_str();
156 let group_addr = self.group_addr.to_string();
157 let interface = self.interface.to_string();
158
159 error!(
160 message = "Error joining multicast group.",
161 error = %self.error,
162 error_code = "socket_multicast_group_join",
163 error_type = error_type::IO_FAILED,
164 stage = error_stage::INITIALIZING,
165 %mode,
166 %group_addr,
167 %interface,
168 );
169 counter!(
170 CounterName::ComponentErrorsTotal,
171 "error_code" => "socket_multicast_group_join",
172 "error_type" => error_type::IO_FAILED,
173 "stage" => error_stage::INITIALIZING,
174 "mode" => mode,
175 "group_addr" => group_addr,
176 "interface" => interface,
177 )
178 .increment(1);
179 }
180}
181
182#[derive(Debug, NamedInternalEvent)]
183pub struct SocketReceiveError<E> {
184 pub mode: SocketMode,
185 pub error: E,
186}
187
188impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
189 fn emit(self) {
190 let mode = self.mode.as_str();
191 error!(
192 message = "Error receiving data.",
193 error = %self.error,
194 error_code = "socket_receive",
195 error_type = error_type::READER_FAILED,
196 stage = error_stage::RECEIVING,
197 %mode,
198 );
199 counter!(
200 CounterName::ComponentErrorsTotal,
201 "error_code" => "socket_receive",
202 "error_type" => error_type::READER_FAILED,
203 "stage" => error_stage::RECEIVING,
204 "mode" => mode,
205 )
206 .increment(1);
207 }
208}
209
210#[derive(Debug, NamedInternalEvent)]
211pub struct SocketSendError<E> {
212 pub mode: SocketMode,
213 pub error: E,
214}
215
216impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
217 fn emit(self) {
218 let mode = self.mode.as_str();
219 let reason = "Error sending data.";
220 error!(
221 message = reason,
222 error = %self.error,
223 error_code = "socket_send",
224 error_type = error_type::WRITER_FAILED,
225 stage = error_stage::SENDING,
226 %mode,
227 );
228 counter!(
229 CounterName::ComponentErrorsTotal,
230 "error_code" => "socket_send",
231 "error_type" => error_type::WRITER_FAILED,
232 "stage" => error_stage::SENDING,
233 "mode" => mode,
234 )
235 .increment(1);
236
237 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
238 }
239}