vector/internal_events/
udp.rs

1use vector_lib::{
2    NamedInternalEvent, counter,
3    internal_event::{
4        ComponentEventsDropped, CounterName, InternalEvent, UNINTENTIONAL, error_stage, error_type,
5    },
6};
7
8use crate::internal_events::SocketOutgoingConnectionError;
9
10// TODO: Get rid of this. UDP is connectionless, so there's no "successful" connect event, only
11// successfully binding a socket that can be used for receiving.
12#[derive(Debug, NamedInternalEvent)]
13pub struct UdpSocketConnectionEstablished;
14
15impl InternalEvent for UdpSocketConnectionEstablished {
16    fn emit(self) {
17        debug!(message = "Connected.");
18        counter!(CounterName::ConnectionEstablishedTotal, "mode" => "udp").increment(1);
19    }
20}
21
22// TODO: Get rid of this. UDP is connectionless, so there's no "unsuccessful" connect event, only
23// unsuccessfully binding a socket that can be used for receiving.
24#[derive(NamedInternalEvent)]
25pub struct UdpSocketOutgoingConnectionError<E> {
26    pub error: E,
27}
28
29impl<E: std::error::Error> InternalEvent for UdpSocketOutgoingConnectionError<E> {
30    fn emit(self) {
31        // ## skip check-duplicate-events ##
32        // ## skip check-validity-events ##
33        emit!(SocketOutgoingConnectionError { error: self.error });
34    }
35}
36
37#[derive(Debug, NamedInternalEvent)]
38pub struct UdpSendIncompleteError {
39    pub data_size: usize,
40    pub sent: usize,
41}
42
43impl InternalEvent for UdpSendIncompleteError {
44    fn emit(self) {
45        let reason = "Could not send all data in one UDP datagram.";
46        error!(
47            message = reason,
48            data_size = self.data_size,
49            sent = self.sent,
50            dropped = self.data_size - self.sent,
51            error_type = error_type::WRITER_FAILED,
52            stage = error_stage::SENDING,
53        );
54        counter!(
55            CounterName::ComponentErrorsTotal,
56            "error_type" => error_type::WRITER_FAILED,
57            "stage" => error_stage::SENDING,
58        )
59        .increment(1);
60        // deprecated
61        counter!(CounterName::ConnectionSendErrorsTotal, "mode" => "udp").increment(1);
62
63        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
64    }
65}
66
67#[derive(Debug, NamedInternalEvent)]
68pub struct UdpChunkingError {
69    pub error: vector_common::Error,
70    pub data_size: usize,
71}
72
73impl InternalEvent for UdpChunkingError {
74    fn emit(self) {
75        let reason = "Could not chunk UDP datagram.";
76        error!(
77            message = reason,
78            data_size = self.data_size,
79            error = self.error,
80            error_type = error_type::WRITER_FAILED,
81            stage = error_stage::SENDING,
82        );
83        counter!(
84            CounterName::ComponentErrorsTotal,
85            "error_type" => error_type::WRITER_FAILED,
86            "stage" => error_stage::SENDING,
87        )
88        .increment(1);
89
90        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
91    }
92}