vector/internal_events/
http_client.rs1use std::time::Duration;
2
3use http::{
4 Request, Response,
5 header::{self, HeaderMap, HeaderValue},
6};
7use hyper::{Error, body::HttpBody};
8use vector_lib::{
9 NamedInternalEvent, counter, histogram,
10 internal_event::{CounterName, HistogramName, InternalEvent, error_stage, error_type},
11};
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct AboutToSendHttpRequest<'a, T> {
15 pub request: &'a Request<T>,
16}
17
18fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
19 let mut headers = headers.clone();
20 for name in &[
21 header::AUTHORIZATION,
22 header::PROXY_AUTHORIZATION,
23 header::COOKIE,
24 header::SET_COOKIE,
25 ] {
26 if let Some(value) = headers.get_mut(name) {
27 value.set_sensitive(true);
28 }
29 }
30 headers
31}
32
33impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
34 fn emit(self) {
35 debug!(
36 message = "Sending HTTP request.",
37 uri = %self.request.uri(),
38 method = %self.request.method(),
39 version = ?self.request.version(),
40 headers = ?remove_sensitive(self.request.headers()),
41 body = %FormatBody(self.request.body()),
42 );
43 counter!(CounterName::HttpClientRequestsSentTotal, "method" => self.request.method().to_string())
44 .increment(1);
45 }
46}
47
48#[derive(Debug, NamedInternalEvent)]
49pub struct GotHttpResponse<'a, T> {
50 pub response: &'a Response<T>,
51 pub roundtrip: Duration,
52}
53
54impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
55 fn emit(self) {
56 debug!(
57 message = "HTTP response.",
58 status = %self.response.status(),
59 version = ?self.response.version(),
60 headers = ?remove_sensitive(self.response.headers()),
61 body = %FormatBody(self.response.body()),
62 );
63 counter!(
64 CounterName::HttpClientResponsesTotal,
65 "status" => self.response.status().as_u16().to_string(),
66 )
67 .increment(1);
68 histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
69 histogram!(
70 HistogramName::HttpClientResponseRttSeconds,
71 "status" => self.response.status().as_u16().to_string(),
72 )
73 .record(self.roundtrip);
74 }
75}
76
77#[derive(Debug, NamedInternalEvent)]
78pub struct GotHttpWarning<'a> {
79 pub error: &'a Error,
80 pub roundtrip: Duration,
81}
82
83impl InternalEvent for GotHttpWarning<'_> {
84 fn emit(self) {
85 warn!(
86 message = "HTTP error.",
87 error = %self.error,
88 error_type = error_type::REQUEST_FAILED,
89 stage = error_stage::PROCESSING,
90 );
91 counter!(CounterName::HttpClientErrorsTotal, "error_kind" => self.error.to_string())
92 .increment(1);
93 histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
94 histogram!(HistogramName::HttpClientErrorRttSeconds, "error_kind" => self.error.to_string())
95 .record(self.roundtrip);
96 }
97}
98
99struct FormatBody<'a, B>(&'a B);
101
102impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
103 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
104 let size = self.0.size_hint();
105 match (size.lower(), size.upper()) {
106 (0, None) => write!(fmt, "[unknown]"),
107 (lower, None) => write!(fmt, "[>={lower} bytes]"),
108
109 (0, Some(0)) => write!(fmt, "[empty]"),
110 (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
111
112 (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
113 (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
114 }
115 }
116}