vector/internal_events/
socket.rs

1use std::net::Ipv4Addr;
2
3use vector_lib::{
4    NamedInternalEvent, counter, histogram,
5    internal_event::{
6        ComponentEventsDropped, CounterName, HistogramName, InternalEvent, UNINTENTIONAL,
7        error_stage, error_type,
8    },
9    json_size::JsonSize,
10};
11
/// Kind of socket a component is operating on.
///
/// The string form returned by [`SocketMode::as_str`] is what the socket internal events
/// attach to their logs and metrics.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[allow(dead_code)] // some features only use some variants
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the static, lowercase name of this mode (`"tcp"`, `"udp"` or `"unix"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            SocketMode::Tcp => "tcp",
            SocketMode::Udp => "udp",
            SocketMode::Unix => "unix",
        }
    }
}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct SocketBytesReceived {
32    pub mode: SocketMode,
33    pub byte_size: usize,
34}
35
36impl InternalEvent for SocketBytesReceived {
37    fn emit(self) {
38        let protocol = self.mode.as_str();
39        trace!(
40            message = "Bytes received.",
41            byte_size = %self.byte_size,
42            %protocol,
43        );
44        counter!(
45            CounterName::ComponentReceivedBytesTotal,
46            "protocol" => protocol,
47        )
48        .increment(self.byte_size as u64);
49        histogram!(HistogramName::ComponentReceivedBytes).record(self.byte_size as f64);
50    }
51}
52
53#[derive(Debug, NamedInternalEvent)]
54pub struct SocketEventsReceived {
55    pub mode: SocketMode,
56    pub byte_size: JsonSize,
57    pub count: usize,
58}
59
60impl InternalEvent for SocketEventsReceived {
61    fn emit(self) {
62        let mode = self.mode.as_str();
63        trace!(
64            message = "Events received.",
65            count = self.count,
66            byte_size = self.byte_size.get(),
67            %mode,
68        );
69        counter!(CounterName::ComponentReceivedEventsTotal, "mode" => mode)
70            .increment(self.count as u64);
71        counter!(CounterName::ComponentReceivedEventBytesTotal, "mode" => mode)
72            .increment(self.byte_size.get() as u64);
73        histogram!(HistogramName::ComponentReceivedBytes, "mode" => mode)
74            .record(self.byte_size.get() as f64);
75    }
76}
77
78#[derive(Debug, NamedInternalEvent)]
79pub struct SocketBytesSent {
80    pub mode: SocketMode,
81    pub byte_size: usize,
82}
83
84impl InternalEvent for SocketBytesSent {
85    fn emit(self) {
86        let protocol = self.mode.as_str();
87        trace!(
88            message = "Bytes sent.",
89            byte_size = %self.byte_size,
90            %protocol,
91        );
92        counter!(
93            CounterName::ComponentSentBytesTotal,
94            "protocol" => protocol,
95        )
96        .increment(self.byte_size as u64);
97    }
98}
99
100#[derive(Debug, NamedInternalEvent)]
101pub struct SocketEventsSent {
102    pub mode: SocketMode,
103    pub count: u64,
104    pub byte_size: JsonSize,
105}
106
107impl InternalEvent for SocketEventsSent {
108    fn emit(self) {
109        trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
110        counter!(CounterName::ComponentSentEventsTotal, "mode" => self.mode.as_str())
111            .increment(self.count);
112        counter!(CounterName::ComponentSentEventBytesTotal, "mode" => self.mode.as_str())
113            .increment(self.byte_size.get() as u64);
114    }
115}
116
117#[derive(Debug, NamedInternalEvent)]
118pub struct SocketBindError<E> {
119    pub mode: SocketMode,
120    pub error: E,
121}
122
123impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
124    fn emit(self) {
125        let mode = self.mode.as_str();
126        error!(
127            message = "Error binding socket.",
128            error = %self.error,
129            error_code = "socket_bind",
130            error_type = error_type::IO_FAILED,
131            stage = error_stage::INITIALIZING,
132            %mode,
133        );
134        counter!(
135            CounterName::ComponentErrorsTotal,
136            "error_code" => "socket_bind",
137            "error_type" => error_type::IO_FAILED,
138            "stage" => error_stage::INITIALIZING,
139            "mode" => mode,
140        )
141        .increment(1);
142    }
143}
144
145#[derive(Debug, NamedInternalEvent)]
146pub struct SocketMulticastGroupJoinError<E> {
147    pub error: E,
148    pub group_addr: Ipv4Addr,
149    pub interface: Ipv4Addr,
150}
151
152impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
153    fn emit(self) {
154        // Multicast groups are only used in UDP mode
155        let mode = SocketMode::Udp.as_str();
156        let group_addr = self.group_addr.to_string();
157        let interface = self.interface.to_string();
158
159        error!(
160            message = "Error joining multicast group.",
161            error = %self.error,
162            error_code = "socket_multicast_group_join",
163            error_type = error_type::IO_FAILED,
164            stage = error_stage::INITIALIZING,
165            %mode,
166            %group_addr,
167            %interface,
168        );
169        counter!(
170            CounterName::ComponentErrorsTotal,
171            "error_code" => "socket_multicast_group_join",
172            "error_type" => error_type::IO_FAILED,
173            "stage" => error_stage::INITIALIZING,
174            "mode" => mode,
175            "group_addr" => group_addr,
176            "interface" => interface,
177        )
178        .increment(1);
179    }
180}
181
182#[derive(Debug, NamedInternalEvent)]
183pub struct SocketReceiveError<E> {
184    pub mode: SocketMode,
185    pub error: E,
186}
187
188impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
189    fn emit(self) {
190        let mode = self.mode.as_str();
191        error!(
192            message = "Error receiving data.",
193            error = %self.error,
194            error_code = "socket_receive",
195            error_type = error_type::READER_FAILED,
196            stage = error_stage::RECEIVING,
197            %mode,
198        );
199        counter!(
200            CounterName::ComponentErrorsTotal,
201            "error_code" => "socket_receive",
202            "error_type" => error_type::READER_FAILED,
203            "stage" => error_stage::RECEIVING,
204            "mode" => mode,
205        )
206        .increment(1);
207    }
208}
209
210#[derive(Debug, NamedInternalEvent)]
211pub struct SocketSendError<E> {
212    pub mode: SocketMode,
213    pub error: E,
214}
215
216impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
217    fn emit(self) {
218        let mode = self.mode.as_str();
219        let reason = "Error sending data.";
220        error!(
221            message = reason,
222            error = %self.error,
223            error_code = "socket_send",
224            error_type = error_type::WRITER_FAILED,
225            stage = error_stage::SENDING,
226            %mode,
227        );
228        counter!(
229            CounterName::ComponentErrorsTotal,
230            "error_code" => "socket_send",
231            "error_type" => error_type::WRITER_FAILED,
232            "stage" => error_stage::SENDING,
233            "mode" => mode,
234        )
235        .increment(1);
236
237        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
238    }
239}