vector/internal_events/
tcp.rs

1use std::net::SocketAddr;
2
3use vector_lib::{
4    NamedInternalEvent, counter,
5    internal_event::{CounterName, InternalEvent, error_stage, error_type},
6};
7
8use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct TcpSocketConnectionEstablished {
12    pub peer_addr: Option<SocketAddr>,
13}
14
15impl InternalEvent for TcpSocketConnectionEstablished {
16    fn emit(self) {
17        if let Some(peer_addr) = self.peer_addr {
18            debug!(message = "Connected.", %peer_addr);
19        } else {
20            debug!(message = "Connected.", peer_addr = "unknown");
21        }
22        counter!(CounterName::ConnectionEstablishedTotal, "mode" => "tcp").increment(1);
23    }
24}
25
26#[derive(Debug, NamedInternalEvent)]
27pub struct TcpSocketOutgoingConnectionError<E> {
28    pub error: E,
29}
30
31impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
32    fn emit(self) {
33        // ## skip check-duplicate-events ##
34        // ## skip check-validity-events ##
35        emit!(SocketOutgoingConnectionError { error: self.error });
36    }
37}
38
39#[derive(Debug, NamedInternalEvent)]
40pub struct TcpSocketConnectionShutdown;
41
42impl InternalEvent for TcpSocketConnectionShutdown {
43    fn emit(self) {
44        warn!(message = "Received EOF from the server, shutdown.");
45        counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
46    }
47}
48
49#[cfg(all(unix, feature = "sources-dnstap"))]
50#[derive(Debug, NamedInternalEvent)]
51pub struct TcpSocketError<'a, E> {
52    pub(crate) error: &'a E,
53    pub peer_addr: SocketAddr,
54}
55
56#[cfg(all(unix, feature = "sources-dnstap"))]
57impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
58    fn emit(self) {
59        error!(
60            message = "TCP socket error.",
61            error = %self.error,
62            peer_addr = ?self.peer_addr,
63            error_type = error_type::CONNECTION_FAILED,
64            stage = error_stage::PROCESSING,
65        );
66        counter!(
67            CounterName::ComponentErrorsTotal,
68            "error_type" => error_type::CONNECTION_FAILED,
69            "stage" => error_stage::PROCESSING,
70        )
71        .increment(1);
72    }
73}
74
75#[derive(Debug, NamedInternalEvent)]
76pub struct TcpSocketTlsConnectionError {
77    pub error: TlsError,
78}
79
80impl InternalEvent for TcpSocketTlsConnectionError {
81    fn emit(self) {
82        match self.error {
83            // Specific error that occurs when the other side is only
84            // doing SYN/SYN-ACK connections for healthcheck.
85            // https://github.com/vectordotdev/vector/issues/7318
86            TlsError::Handshake { ref source }
87                if source.code() == openssl::ssl::ErrorCode::SYSCALL
88                    && source.io_error().is_none() =>
89            {
90                debug!(
91                    message = "Connection error, probably a healthcheck.",
92                    error = %self.error,
93                );
94            }
95            _ => {
96                error!(
97                    message = "Connection error.",
98                    error = %self.error,
99                    error_code = "connection_failed",
100                    error_type = error_type::WRITER_FAILED,
101                    stage = error_stage::SENDING,
102                );
103                counter!(
104                    CounterName::ComponentErrorsTotal,
105                    "error_code" => "connection_failed",
106                    "error_type" => error_type::WRITER_FAILED,
107                    "stage" => error_stage::SENDING,
108                    "mode" => "tcp",
109                )
110                .increment(1);
111            }
112        }
113    }
114}
115
116#[derive(Debug, NamedInternalEvent)]
117pub struct TcpSendAckError {
118    pub error: std::io::Error,
119}
120
121impl InternalEvent for TcpSendAckError {
122    fn emit(self) {
123        error!(
124            message = "Error writing acknowledgement, dropping connection.",
125            error = %self.error,
126            error_code = "ack_failed",
127            error_type = error_type::WRITER_FAILED,
128            stage = error_stage::SENDING,
129        );
130        counter!(
131            CounterName::ComponentErrorsTotal,
132            "error_code" => "ack_failed",
133            "error_type" => error_type::WRITER_FAILED,
134            "stage" => error_stage::SENDING,
135            "mode" => "tcp",
136        )
137        .increment(1);
138    }
139}
140
141#[derive(Debug, NamedInternalEvent)]
142pub struct TcpBytesReceived {
143    pub byte_size: usize,
144    pub peer_addr: SocketAddr,
145}
146
147impl InternalEvent for TcpBytesReceived {
148    fn emit(self) {
149        trace!(
150            message = "Bytes received.",
151            protocol = "tcp",
152            byte_size = %self.byte_size,
153            peer_addr = %self.peer_addr,
154        );
155        counter!(
156            CounterName::ComponentReceivedBytesTotal, "protocol" => "tcp"
157        )
158        .increment(self.byte_size as u64);
159    }
160}