vector/sinks/http/
config.rs

1//! Configuration for the `http` sink.
2
3use std::{collections::BTreeMap, path::PathBuf};
4
5#[cfg(feature = "aws-core")]
6use aws_config::meta::region::ProvideRegion;
7#[cfg(feature = "aws-core")]
8use aws_types::region::Region;
9use http::{HeaderName, HeaderValue, Method, Request, StatusCode, header::AUTHORIZATION};
10use hyper::Body;
11use vector_lib::codecs::{
12    CharacterDelimitedEncoder,
13    encoding::{Framer, Serializer},
14};
15#[cfg(feature = "aws-core")]
16use vector_lib::config::proxy::ProxyConfig;
17
18use super::{
19    encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
20    sink::HttpSink,
21};
22#[cfg(feature = "aws-core")]
23use crate::aws::AwsAuthentication;
24#[cfg(feature = "aws-core")]
25use crate::sinks::util::http::SigV4Config;
26use crate::{
27    codecs::{EncodingConfigWithFraming, SinkType},
28    http::{Auth, HttpClient, MaybeAuth},
29    sinks::{
30        prelude::*,
31        util::{
32            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
33            http::{
34                HttpService, OrderedHeaderName, RequestConfig, RetryStrategy,
35                http_response_retry_logic,
36            },
37        },
38    },
39};
40
// Content types selected in `SinkConfig::build` from the encoder's serializer/framer pairing.

// Chosen when the serializer is raw-message or text, whatever the framer.
const CONTENT_TYPE_TEXT: &str = "text/plain";
// Chosen when the JSON serializer is combined with newline-delimited framing.
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
// Chosen when the JSON serializer is combined with comma-delimited framing.
const CONTENT_TYPE_JSON: &str = "application/json";
44
/// Configuration for the `http` sink.
// NOTE: unknown keys are rejected at deserialization time (`deny_unknown_fields`).
#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpSinkConfig {
    /// The full URI to make HTTP requests to.
    ///
    /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI.
    // Stored as a `Template` rather than a plain URI; it is handed to `HttpSink::new` unchanged.
    #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))]
    pub uri: Template,

    /// The HTTP method to use when making the request.
    // Falls back to `HttpMethod::default()` (POST) when omitted.
    #[serde(default)]
    pub method: HttpMethod,

    // Optional; when set, `build` rejects an explicit `Authorization` request header.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Flattened: the fields of `EncodingConfigWithFraming` are read from the same level as
    // this struct's own fields instead of from a nested `encoding` table.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// A string to prefix the payload with.
    ///
    /// This option is ignored if the encoding is not character delimited JSON.
    ///
    /// If specified, the `payload_suffix` must also be specified and together they must produce a valid JSON object.
    // Checked together with `payload_suffix` by `validate_payload_wrapper` during `build`.
    #[configurable(metadata(docs::examples = "{\"data\":"))]
    #[serde(default)]
    pub payload_prefix: String,

    /// A string to suffix the payload with.
    ///
    /// This option is ignored if the encoding is not character delimited JSON.
    ///
    /// If specified, the `payload_prefix` must also be specified and together they must produce a valid JSON object.
    // Checked together with `payload_prefix` by `validate_payload_wrapper` during `build`.
    #[configurable(metadata(docs::examples = "}"))]
    #[serde(default)]
    pub payload_suffix: String,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Carries the user-supplied request headers (`request.headers`) and the tower request
    // settings (`request.tower`) consumed by `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    // Optional TLS options; `crt_file` and `key_file` are reported by `files_to_watch`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // NOTE(review): presumably accepts either a bare boolean or a full table (per the
    // `bool_or_struct` deserializer name) and is skipped on serialization when it equals the
    // default — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Passed to `http_response_retry_logic` when the service stack is assembled in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub retry_strategy: RetryStrategy,
}
111
/// HTTP method.
///
/// A subset of the HTTP methods described in [RFC 9110, section 9.1][rfc9110] are supported.
///
/// [rfc9110]: https://datatracker.ietf.org/doc/html/rfc9110#section-9.1
// Variants are (de)serialized in snake_case, e.g. `get`, `post`, `delete`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum HttpMethod {
    /// GET.
    Get,

    /// HEAD.
    Head,

    /// POST.
    // Used when the `method` option is omitted from the sink configuration.
    #[default]
    Post,

    /// PUT.
    Put,

    /// DELETE.
    Delete,

    /// OPTIONS.
    Options,

    /// TRACE.
    Trace,

    /// PATCH.
    Patch,
}
146
147impl From<HttpMethod> for Method {
148    fn from(http_method: HttpMethod) -> Self {
149        match http_method {
150            HttpMethod::Head => Self::HEAD,
151            HttpMethod::Get => Self::GET,
152            HttpMethod::Post => Self::POST,
153            HttpMethod::Put => Self::PUT,
154            HttpMethod::Patch => Self::PATCH,
155            HttpMethod::Delete => Self::DELETE,
156            HttpMethod::Options => Self::OPTIONS,
157            HttpMethod::Trace => Self::TRACE,
158        }
159    }
160}
161
162impl HttpSinkConfig {
163    fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
164        let tls = TlsSettings::from_options(self.tls.as_ref())?;
165        Ok(HttpClient::new(tls, cx.proxy())?)
166    }
167
168    pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
169        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
170        Ok(Encoder::<Framer>::new(framer, serializer))
171    }
172}
173
174impl GenerateConfig for HttpSinkConfig {
175    fn generate_config() -> toml::Value {
176        toml::from_str(
177            r#"uri = "https://10.22.212.22:9000/endpoint"
178            encoding.codec = "json""#,
179        )
180        .unwrap()
181    }
182}
183
184async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
185    let auth = auth.choose_one(&uri.auth)?;
186    let uri = uri.with_default_parts();
187    let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();
188
189    if let Some(auth) = auth {
190        auth.apply(&mut request);
191    }
192
193    let response = client.send(request).await?;
194
195    match response.status() {
196        StatusCode::OK => Ok(()),
197        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
198    }
199}
200
201pub(super) fn validate_headers(
202    headers: &BTreeMap<String, String>,
203    configures_auth: bool,
204) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
205    let headers = crate::sinks::util::http::validate_headers(headers)?;
206
207    for name in headers.keys() {
208        if configures_auth && name.inner() == AUTHORIZATION {
209            return Err("Authorization header can not be used with defined auth options".into());
210        }
211    }
212
213    Ok(headers)
214}
215
216pub(super) fn validate_payload_wrapper(
217    payload_prefix: &str,
218    payload_suffix: &str,
219    encoder: &Encoder<Framer>,
220) -> crate::Result<(String, String)> {
221    let payload = [payload_prefix, "{}", payload_suffix].join("");
222    match (
223        encoder.serializer(),
224        encoder.framer(),
225        serde_json::from_str::<serde_json::Value>(&payload),
226    ) {
227        (
228            Serializer::Json(_),
229            Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
230            Err(_),
231        ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
232        _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
233    }
234}
235
236#[async_trait]
237#[typetag::serde(name = "http")]
238impl SinkConfig for HttpSinkConfig {
239    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
240        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
241
242        let encoder = self.build_encoder()?;
243        let transformer = self.encoding.transformer();
244
245        let request = self.request.clone();
246
247        validate_headers(&request.headers, self.auth.is_some())?;
248        let (static_headers, template_headers) = request.split_headers();
249
250        let (payload_prefix, payload_suffix) =
251            validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
252
253        let client = self.build_http_client(&cx)?;
254
255        let healthcheck = match cx.healthcheck.uri {
256            Some(healthcheck_uri) => {
257                healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed()
258            }
259            None => future::ok(()).boxed(),
260        };
261
262        let content_type = {
263            use Framer::*;
264            use Serializer::*;
265            match (encoder.serializer(), encoder.framer()) {
266                (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
267                (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
268                (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
269                    Some(CONTENT_TYPE_JSON.to_owned())
270                }
271                #[cfg(feature = "codecs-opentelemetry")]
272                (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
273                _ => None,
274            }
275        };
276
277        let request_builder = HttpRequestBuilder {
278            encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
279            compression: self.compression,
280        };
281
282        let content_encoding = self.compression.is_compressed().then(|| {
283            self.compression
284                .content_encoding()
285                .expect("Encoding should be specified for compression.")
286                .to_string()
287        });
288
289        let converted_static_headers = static_headers
290            .into_iter()
291            .map(|(name, value)| -> crate::Result<_> {
292                let header_name =
293                    HeaderName::from_bytes(name.as_bytes()).map(OrderedHeaderName::from)?;
294                let header_value = HeaderValue::try_from(value)?;
295                Ok((header_name, header_value))
296            })
297            .collect::<Result<BTreeMap<_, _>, _>>()?;
298
299        let http_sink_request_builder = HttpSinkRequestBuilder::new(
300            self.method,
301            self.auth.clone(),
302            converted_static_headers,
303            content_type,
304            content_encoding,
305        );
306
307        let service = match &self.auth {
308            #[cfg(feature = "aws-core")]
309            Some(Auth::Aws { auth, service }) => {
310                let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
311                    .region()
312                    .await;
313                let region = (match &auth {
314                    AwsAuthentication::AccessKey { region, .. } => region.clone(),
315                    AwsAuthentication::File { .. } => None,
316                    AwsAuthentication::Role { region, .. } => region.clone(),
317                    AwsAuthentication::Default { region, .. } => region.clone(),
318                })
319                .map_or(default_region, |r| Some(Region::new(r.to_string())))
320                .expect("Region must be specified");
321
322                HttpService::new_with_sig_v4(
323                    client,
324                    http_sink_request_builder,
325                    SigV4Config {
326                        shared_credentials_provider: auth
327                            .credentials_provider(region.clone(), &ProxyConfig::default(), None)
328                            .await?,
329                        region: region.clone(),
330                        service: service.clone(),
331                    },
332                )
333            }
334            _ => HttpService::new(client, http_sink_request_builder),
335        };
336
337        let request_limits = self.request.tower.into_settings();
338
339        let service = ServiceBuilder::new()
340            .settings(
341                request_limits,
342                http_response_retry_logic(self.retry_strategy.clone()),
343            )
344            .service(service);
345
346        let sink = HttpSink::new(
347            service,
348            self.uri.clone(),
349            template_headers,
350            batch_settings,
351            request_builder,
352        );
353
354        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
355    }
356
357    fn input(&self) -> Input {
358        Input::new(self.encoding.config().1.input_type())
359    }
360
361    fn files_to_watch(&self) -> Vec<&PathBuf> {
362        let mut files = Vec::new();
363        if let Some(tls) = &self.tls {
364            if let Some(crt_file) = &tls.crt_file {
365                files.push(crt_file)
366            }
367            if let Some(key_file) = &tls.key_file {
368                files.push(key_file)
369            }
370        };
371        files
372    }
373
374    fn acknowledgements(&self) -> &AcknowledgementsConfig {
375        &self.acknowledgements
376    }
377}
378
#[cfg(test)]
mod tests {
    use vector_lib::codecs::encoding::format::JsonSerializerOptions;

    use super::*;
    use crate::components::validation::prelude::*;

    impl ValidatableComponent for HttpSinkConfig {
        /// Describes a JSON-encoded `http` sink that pushes to a local endpoint, for the
        /// component validation framework.
        fn validation_configuration() -> ValidationConfiguration {
            use std::str::FromStr;

            use vector_lib::{
                codecs::{JsonSerializerConfig, MetricTagValues},
                config::LogNamespace,
            };

            // The same endpoint is used for the sink and for the external resource.
            const ENDPOINT: &str = "http://127.0.0.1:9000/endpoint";

            let resource_uri = UriSerde::from_str(ENDPOINT).expect("should never fail to parse");

            let json_serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding = EncodingConfigWithFraming::new(
                None,
                json_serializer.into(),
                Transformer::default(),
            );

            let sink_config = HttpSinkConfig {
                uri: Template::try_from(ENDPOINT).expect("should never fail to parse"),
                method: HttpMethod::Post,
                auth: None,
                compression: Compression::default(),
                encoding,
                payload_prefix: String::new(),
                payload_suffix: String::new(),
                batch: BatchConfig::default(),
                request: RequestConfig::default(),
                tls: None,
                acknowledgements: AcknowledgementsConfig::default(),
                retry_strategy: RetryStrategy::default(),
            };

            let resource_config =
                HttpResourceConfig::from_parts(resource_uri.uri, Some(sink_config.method.into()));
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                resource_config,
                sink_config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HttpSinkConfig);
}