1use std::{collections::BTreeMap, path::PathBuf};
4
5#[cfg(feature = "aws-core")]
6use aws_config::meta::region::ProvideRegion;
7#[cfg(feature = "aws-core")]
8use aws_types::region::Region;
9use http::{HeaderName, HeaderValue, Method, Request, StatusCode, header::AUTHORIZATION};
10use hyper::Body;
11use vector_lib::codecs::{
12 CharacterDelimitedEncoder,
13 encoding::{Framer, Serializer},
14};
15#[cfg(feature = "aws-core")]
16use vector_lib::config::proxy::ProxyConfig;
17
18use super::{
19 encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
20 sink::HttpSink,
21};
22#[cfg(feature = "aws-core")]
23use crate::aws::AwsAuthentication;
24#[cfg(feature = "aws-core")]
25use crate::sinks::util::http::SigV4Config;
26use crate::{
27 codecs::{EncodingConfigWithFraming, SinkType},
28 http::{Auth, HttpClient, MaybeAuth},
29 sinks::{
30 prelude::*,
31 util::{
32 RealtimeSizeBasedDefaultBatchSettings, UriSerde,
33 http::{
34 HttpService, OrderedHeaderName, RequestConfig, RetryStrategy,
35 http_response_retry_logic,
36 },
37 },
38 },
39};
40
// Content type values chosen in `HttpSinkConfig::build` from the configured
// serializer/framer pair and handed to the request builder.

// Selected for the raw-message and text serializers, whatever the framing.
const CONTENT_TYPE_TEXT: &str = "text/plain";
// Selected for the JSON serializer combined with newline-delimited framing.
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
// Selected for the JSON serializer combined with comma-delimited framing.
const CONTENT_TYPE_JSON: &str = "application/json";
44
/// Configuration for the `http` sink.
#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpSinkConfig {
    /// The URI that HTTP requests are sent to.
    ///
    /// The value is a template; it is handed to the sink as-is when the sink is built.
    #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))]
    pub uri: Template,

    /// The HTTP method to use when making the request.
    #[serde(default)]
    pub method: HttpMethod,

    // Optional authentication. When this is set, `request.headers` may not also carry an
    // `Authorization` header (checked by `validate_headers` during `build`).
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Compression applied by the request builder. When compression is enabled, its content
    // encoding is also passed along to the request builder in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Framing, serialization and transformer settings. Flattened, so these options sit at the
    // same level as the other sink options.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// A string to prefix the payload with.
    ///
    /// When the encoding is comma-delimited JSON, the prefix and `payload_suffix` wrapped around
    /// an empty object (`{}`) must parse as valid JSON, otherwise building the sink fails.
    #[configurable(metadata(docs::examples = "{\"data\":"))]
    #[serde(default)]
    pub payload_prefix: String,

    /// A string to suffix the payload with.
    ///
    /// When the encoding is comma-delimited JSON, `payload_prefix` and the suffix wrapped around
    /// an empty object (`{}`) must parse as valid JSON, otherwise building the sink fails.
    #[configurable(metadata(docs::examples = "}"))]
    #[serde(default)]
    pub payload_suffix: String,

    // Event batching settings; validated and converted into batcher settings in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Request settings: supplies the configured headers (split into static and templated ones)
    // and the `tower` request limits used for the service.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    // Optional TLS options used to build the HTTP client. The certificate and key files named
    // here are reported by `files_to_watch`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // End-to-end acknowledgement settings. Deserialized through `bool_or_struct` and omitted from
    // serialized output when left at the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Strategy handed to `http_response_retry_logic` to decide how responses are retried.
    #[configurable(derived)]
    #[serde(default)]
    pub retry_strategy: RetryStrategy,
}
111
/// HTTP method.
///
/// The HTTP method to use when making the request. Values are written in `snake_case` in
/// configuration, and `post` is the default.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum HttpMethod {
    /// GET.
    Get,

    /// HEAD.
    Head,

    /// POST.
    #[default]
    Post,

    /// PUT.
    Put,

    /// DELETE.
    Delete,

    /// OPTIONS.
    Options,

    /// TRACE.
    Trace,

    /// PATCH.
    Patch,
}
146
147impl From<HttpMethod> for Method {
148 fn from(http_method: HttpMethod) -> Self {
149 match http_method {
150 HttpMethod::Head => Self::HEAD,
151 HttpMethod::Get => Self::GET,
152 HttpMethod::Post => Self::POST,
153 HttpMethod::Put => Self::PUT,
154 HttpMethod::Patch => Self::PATCH,
155 HttpMethod::Delete => Self::DELETE,
156 HttpMethod::Options => Self::OPTIONS,
157 HttpMethod::Trace => Self::TRACE,
158 }
159 }
160}
161
162impl HttpSinkConfig {
163 fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
164 let tls = TlsSettings::from_options(self.tls.as_ref())?;
165 Ok(HttpClient::new(tls, cx.proxy())?)
166 }
167
168 pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
169 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
170 Ok(Encoder::<Framer>::new(framer, serializer))
171 }
172}
173
174impl GenerateConfig for HttpSinkConfig {
175 fn generate_config() -> toml::Value {
176 toml::from_str(
177 r#"uri = "https://10.22.212.22:9000/endpoint"
178 encoding.codec = "json""#,
179 )
180 .unwrap()
181 }
182}
183
184async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
185 let auth = auth.choose_one(&uri.auth)?;
186 let uri = uri.with_default_parts();
187 let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();
188
189 if let Some(auth) = auth {
190 auth.apply(&mut request);
191 }
192
193 let response = client.send(request).await?;
194
195 match response.status() {
196 StatusCode::OK => Ok(()),
197 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
198 }
199}
200
201pub(super) fn validate_headers(
202 headers: &BTreeMap<String, String>,
203 configures_auth: bool,
204) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
205 let headers = crate::sinks::util::http::validate_headers(headers)?;
206
207 for name in headers.keys() {
208 if configures_auth && name.inner() == AUTHORIZATION {
209 return Err("Authorization header can not be used with defined auth options".into());
210 }
211 }
212
213 Ok(headers)
214}
215
216pub(super) fn validate_payload_wrapper(
217 payload_prefix: &str,
218 payload_suffix: &str,
219 encoder: &Encoder<Framer>,
220) -> crate::Result<(String, String)> {
221 let payload = [payload_prefix, "{}", payload_suffix].join("");
222 match (
223 encoder.serializer(),
224 encoder.framer(),
225 serde_json::from_str::<serde_json::Value>(&payload),
226 ) {
227 (
228 Serializer::Json(_),
229 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
230 Err(_),
231 ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
232 _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
233 }
234}
235
236#[async_trait]
237#[typetag::serde(name = "http")]
238impl SinkConfig for HttpSinkConfig {
239 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
240 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
241
242 let encoder = self.build_encoder()?;
243 let transformer = self.encoding.transformer();
244
245 let request = self.request.clone();
246
247 validate_headers(&request.headers, self.auth.is_some())?;
248 let (static_headers, template_headers) = request.split_headers();
249
250 let (payload_prefix, payload_suffix) =
251 validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
252
253 let client = self.build_http_client(&cx)?;
254
255 let healthcheck = match cx.healthcheck.uri {
256 Some(healthcheck_uri) => {
257 healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed()
258 }
259 None => future::ok(()).boxed(),
260 };
261
262 let content_type = {
263 use Framer::*;
264 use Serializer::*;
265 match (encoder.serializer(), encoder.framer()) {
266 (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
267 (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
268 (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
269 Some(CONTENT_TYPE_JSON.to_owned())
270 }
271 #[cfg(feature = "codecs-opentelemetry")]
272 (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
273 _ => None,
274 }
275 };
276
277 let request_builder = HttpRequestBuilder {
278 encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
279 compression: self.compression,
280 };
281
282 let content_encoding = self.compression.is_compressed().then(|| {
283 self.compression
284 .content_encoding()
285 .expect("Encoding should be specified for compression.")
286 .to_string()
287 });
288
289 let converted_static_headers = static_headers
290 .into_iter()
291 .map(|(name, value)| -> crate::Result<_> {
292 let header_name =
293 HeaderName::from_bytes(name.as_bytes()).map(OrderedHeaderName::from)?;
294 let header_value = HeaderValue::try_from(value)?;
295 Ok((header_name, header_value))
296 })
297 .collect::<Result<BTreeMap<_, _>, _>>()?;
298
299 let http_sink_request_builder = HttpSinkRequestBuilder::new(
300 self.method,
301 self.auth.clone(),
302 converted_static_headers,
303 content_type,
304 content_encoding,
305 );
306
307 let service = match &self.auth {
308 #[cfg(feature = "aws-core")]
309 Some(Auth::Aws { auth, service }) => {
310 let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
311 .region()
312 .await;
313 let region = (match &auth {
314 AwsAuthentication::AccessKey { region, .. } => region.clone(),
315 AwsAuthentication::File { .. } => None,
316 AwsAuthentication::Role { region, .. } => region.clone(),
317 AwsAuthentication::Default { region, .. } => region.clone(),
318 })
319 .map_or(default_region, |r| Some(Region::new(r.to_string())))
320 .expect("Region must be specified");
321
322 HttpService::new_with_sig_v4(
323 client,
324 http_sink_request_builder,
325 SigV4Config {
326 shared_credentials_provider: auth
327 .credentials_provider(region.clone(), &ProxyConfig::default(), None)
328 .await?,
329 region: region.clone(),
330 service: service.clone(),
331 },
332 )
333 }
334 _ => HttpService::new(client, http_sink_request_builder),
335 };
336
337 let request_limits = self.request.tower.into_settings();
338
339 let service = ServiceBuilder::new()
340 .settings(
341 request_limits,
342 http_response_retry_logic(self.retry_strategy.clone()),
343 )
344 .service(service);
345
346 let sink = HttpSink::new(
347 service,
348 self.uri.clone(),
349 template_headers,
350 batch_settings,
351 request_builder,
352 );
353
354 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
355 }
356
357 fn input(&self) -> Input {
358 Input::new(self.encoding.config().1.input_type())
359 }
360
361 fn files_to_watch(&self) -> Vec<&PathBuf> {
362 let mut files = Vec::new();
363 if let Some(tls) = &self.tls {
364 if let Some(crt_file) = &tls.crt_file {
365 files.push(crt_file)
366 }
367 if let Some(key_file) = &tls.key_file {
368 files.push(key_file)
369 }
370 };
371 files
372 }
373
374 fn acknowledgements(&self) -> &AcknowledgementsConfig {
375 &self.acknowledgements
376 }
377}
378
#[cfg(test)]
mod tests {
    use vector_lib::codecs::encoding::format::JsonSerializerOptions;

    use super::*;
    use crate::components::validation::prelude::*;

    /// Registers the `http` sink with the component validation framework: a JSON-encoding
    /// sink that pushes to a local HTTP endpoint acting as the external resource.
    impl ValidatableComponent for HttpSinkConfig {
        fn validation_configuration() -> ValidationConfiguration {
            use std::str::FromStr;

            use vector_lib::{
                codecs::{JsonSerializerConfig, MetricTagValues},
                config::LogNamespace,
            };

            // The same endpoint is used by the sink and by the resource that receives its output.
            let target = "http://127.0.0.1:9000/endpoint";
            let parsed_target = UriSerde::from_str(target).expect("should never fail to parse");

            let sink_config = HttpSinkConfig {
                uri: Template::try_from(target).expect("should never fail to parse"),
                method: HttpMethod::Post,
                auth: None,
                compression: Compression::default(),
                encoding: EncodingConfigWithFraming::new(
                    None,
                    JsonSerializerConfig::new(
                        MetricTagValues::Full,
                        JsonSerializerOptions::default(),
                    )
                    .into(),
                    Transformer::default(),
                ),
                payload_prefix: String::new(),
                payload_suffix: String::new(),
                batch: BatchConfig::default(),
                request: RequestConfig::default(),
                tls: None,
                acknowledgements: AcknowledgementsConfig::default(),
                retry_strategy: RetryStrategy::default(),
            };

            let receiving_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(parsed_target.uri, Some(sink_config.method.into())),
                sink_config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(receiving_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HttpSinkConfig);
}