vector/internal_events/
tcp.rs1use std::net::SocketAddr;
2
3use vector_lib::{
4 NamedInternalEvent, counter,
5 internal_event::{CounterName, InternalEvent, error_stage, error_type},
6};
7
8use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct TcpSocketConnectionEstablished {
12 pub peer_addr: Option<SocketAddr>,
13}
14
15impl InternalEvent for TcpSocketConnectionEstablished {
16 fn emit(self) {
17 if let Some(peer_addr) = self.peer_addr {
18 debug!(message = "Connected.", %peer_addr);
19 } else {
20 debug!(message = "Connected.", peer_addr = "unknown");
21 }
22 counter!(CounterName::ConnectionEstablishedTotal, "mode" => "tcp").increment(1);
23 }
24}
25
26#[derive(Debug, NamedInternalEvent)]
27pub struct TcpSocketOutgoingConnectionError<E> {
28 pub error: E,
29}
30
31impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
32 fn emit(self) {
33 emit!(SocketOutgoingConnectionError { error: self.error });
36 }
37}
38
39#[derive(Debug, NamedInternalEvent)]
40pub struct TcpSocketConnectionShutdown;
41
42impl InternalEvent for TcpSocketConnectionShutdown {
43 fn emit(self) {
44 warn!(message = "Received EOF from the server, shutdown.");
45 counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
46 }
47}
48
49#[cfg(all(unix, feature = "sources-dnstap"))]
50#[derive(Debug, NamedInternalEvent)]
51pub struct TcpSocketError<'a, E> {
52 pub(crate) error: &'a E,
53 pub peer_addr: SocketAddr,
54}
55
56#[cfg(all(unix, feature = "sources-dnstap"))]
57impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
58 fn emit(self) {
59 error!(
60 message = "TCP socket error.",
61 error = %self.error,
62 peer_addr = ?self.peer_addr,
63 error_type = error_type::CONNECTION_FAILED,
64 stage = error_stage::PROCESSING,
65 );
66 counter!(
67 CounterName::ComponentErrorsTotal,
68 "error_type" => error_type::CONNECTION_FAILED,
69 "stage" => error_stage::PROCESSING,
70 )
71 .increment(1);
72 }
73}
74
75#[derive(Debug, NamedInternalEvent)]
76pub struct TcpSocketTlsConnectionError {
77 pub error: TlsError,
78}
79
80impl InternalEvent for TcpSocketTlsConnectionError {
81 fn emit(self) {
82 match self.error {
83 TlsError::Handshake { ref source }
87 if source.code() == openssl::ssl::ErrorCode::SYSCALL
88 && source.io_error().is_none() =>
89 {
90 debug!(
91 message = "Connection error, probably a healthcheck.",
92 error = %self.error,
93 );
94 }
95 _ => {
96 error!(
97 message = "Connection error.",
98 error = %self.error,
99 error_code = "connection_failed",
100 error_type = error_type::WRITER_FAILED,
101 stage = error_stage::SENDING,
102 );
103 counter!(
104 CounterName::ComponentErrorsTotal,
105 "error_code" => "connection_failed",
106 "error_type" => error_type::WRITER_FAILED,
107 "stage" => error_stage::SENDING,
108 "mode" => "tcp",
109 )
110 .increment(1);
111 }
112 }
113 }
114}
115
116#[derive(Debug, NamedInternalEvent)]
117pub struct TcpSendAckError {
118 pub error: std::io::Error,
119}
120
121impl InternalEvent for TcpSendAckError {
122 fn emit(self) {
123 error!(
124 message = "Error writing acknowledgement, dropping connection.",
125 error = %self.error,
126 error_code = "ack_failed",
127 error_type = error_type::WRITER_FAILED,
128 stage = error_stage::SENDING,
129 );
130 counter!(
131 CounterName::ComponentErrorsTotal,
132 "error_code" => "ack_failed",
133 "error_type" => error_type::WRITER_FAILED,
134 "stage" => error_stage::SENDING,
135 "mode" => "tcp",
136 )
137 .increment(1);
138 }
139}
140
141#[derive(Debug, NamedInternalEvent)]
142pub struct TcpBytesReceived {
143 pub byte_size: usize,
144 pub peer_addr: SocketAddr,
145}
146
147impl InternalEvent for TcpBytesReceived {
148 fn emit(self) {
149 trace!(
150 message = "Bytes received.",
151 protocol = "tcp",
152 byte_size = %self.byte_size,
153 peer_addr = %self.peer_addr,
154 );
155 counter!(
156 CounterName::ComponentReceivedBytesTotal, "protocol" => "tcp"
157 )
158 .increment(self.byte_size as u64);
159 }
160}