1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{Sink, future::BoxFuture};
7use headers::HeaderName;
8use http::{HeaderValue, Request, Response, StatusCode, header};
9use http_body::Body as _;
10use tracing::debug;
11
/// Newtype around [`HeaderName`] that can be used as a key in ordered collections.
///
/// Ordering is defined by the `Ord` implementation in this file, which compares the
/// string form of the wrapped header name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderedHeaderName(HeaderName);
14
15impl OrderedHeaderName {
16 pub const fn new(header_name: HeaderName) -> Self {
17 Self(header_name)
18 }
19
20 pub const fn inner(&self) -> &HeaderName {
21 &self.0
22 }
23}
24
25impl From<HeaderName> for OrderedHeaderName {
26 fn from(header_name: HeaderName) -> Self {
27 Self(header_name)
28 }
29}
30
31impl Ord for OrderedHeaderName {
32 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
33 self.0.as_str().cmp(other.0.as_str())
34 }
35}
36
37impl PartialOrd for OrderedHeaderName {
38 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
39 Some(self.cmp(other))
40 }
41}
42use std::{
43 collections::BTreeMap,
44 fmt,
45 future::Future,
46 hash::Hash,
47 marker::PhantomData,
48 pin::Pin,
49 sync::Arc,
50 task::{Context, Poll, ready},
51 time::Duration,
52};
53
54use hyper::Body;
55use pin_project::pin_project;
56use snafu::{ResultExt, Snafu};
57use tower::{Service, ServiceBuilder};
58use tower_http::decompression::DecompressionLayer;
59use vector_lib::{
60 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
61 stream::batcher::limiter::ItemBatchSize,
62};
63
64use super::{
65 Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
66 TowerRequestSettings,
67 retries::{RetryAction, RetryLogic},
68 sink::{self, Response as _},
69 uri,
70};
71#[cfg(feature = "aws-core")]
72use crate::aws::sign_request;
73use crate::{
74 event::Event,
75 http::{HttpClient, HttpError},
76 internal_events::{EndpointBytesSent, SinkRequestBuildError},
77 sinks::prelude::*,
78 template::Template,
79};
80
/// Encodes a single [`Event`] into the `Output` item type that is batched by the HTTP sinks
/// in this module.
pub trait HttpEventEncoder<Output> {
    /// Encodes `event`, returning `None` when no item is produced for it.
    ///
    /// The sinks in this module store nothing for an event whose encoding is `None`.
    fn encode_event(&mut self, event: Event) -> Option<Output>;
}
85
/// Sink-specific behavior plugged into [`BatchedHttpSink`] and [`PartitionHttpSink`]:
/// how events are encoded and how a finished batch becomes an HTTP request.
pub trait HttpSink: Send + Sync + 'static {
    /// Item type produced by the encoder for each event (the batch's input type).
    type Input;
    /// Finished batch type handed to [`HttpSink::build_request`].
    type Output;
    /// Encoder that turns events into [`HttpSink::Input`] items.
    type Encoder: HttpEventEncoder<Self::Input>;

    /// Creates the encoder; the wrapping sinks call this once at construction time.
    fn build_encoder(&self) -> Self::Encoder;
    /// Builds the HTTP request (with a `Bytes` body) for one finished batch.
    fn build_request(
        &self,
        events: Self::Output,
    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
}
97
/// A `Sink<Event>` that encodes events with an [`HttpSink`]'s encoder and forwards the
/// encoded items to an inner [`TowerBatchedSink`] whose service is an [`HttpBatchService`].
///
/// At most one encoded event is held in `slot` between `start_send` and the next flush.
#[pin_project]
pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    // The user-supplied sink; also shared with the request-building closure.
    sink: Arc<T>,
    // The batching sink that actually issues requests; pinned because it is polled.
    #[pin]
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // Holds the most recently encoded event until `inner` is ready to accept it.
    slot: Option<EncodedEvent<B::Input>>,
}
136
137impl<T, B> BatchedHttpSink<T, B>
138where
139 B: Batch,
140 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
141 T: HttpSink<Input = B::Input, Output = B::Output>,
142{
143 pub fn new(
144 sink: T,
145 batch: B,
146 request_settings: TowerRequestSettings,
147 batch_timeout: Duration,
148 client: HttpClient,
149 ) -> Self {
150 Self::with_logic(
151 sink,
152 batch,
153 HttpRetryLogic::default(),
154 request_settings,
155 batch_timeout,
156 client,
157 )
158 }
159}
160
161impl<T, B, RL> BatchedHttpSink<T, B, RL>
162where
163 B: Batch,
164 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
165 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
166 + Send
167 + 'static,
168 T: HttpSink<Input = B::Input, Output = B::Output>,
169{
170 pub fn with_logic(
171 sink: T,
172 batch: B,
173 retry_logic: RL,
174 request_settings: TowerRequestSettings,
175 batch_timeout: Duration,
176 client: HttpClient,
177 ) -> Self {
178 let sink = Arc::new(sink);
179
180 let sink1 = Arc::clone(&sink);
181 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
182 let sink = Arc::clone(&sink1);
183 Box::pin(async move { sink.build_request(b).await })
184 };
185
186 let svc = HttpBatchService::new(client, request_builder);
187 let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
188 let encoder = sink.build_encoder();
189
190 Self {
191 sink,
192 inner,
193 encoder,
194 slot: None,
195 }
196 }
197}
198
/// `Sink` implementation that stages one encoded event in `slot` and hands it to the
/// inner batching sink when that sink reports readiness.
impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    type Error = crate::Error;

    /// Ready as soon as `slot` is empty.
    ///
    /// When an event is still staged, a flush is attempted first; a pending flush only
    /// propagates `Pending` if the staged event could not be handed to the inner sink.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.slot.is_some() {
            match self.as_mut().poll_flush(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                Poll::Pending => {
                    // The inner flush may still be in progress even though the slot was
                    // already drained; in that case a new event can be accepted.
                    if self.slot.is_some() {
                        return Poll::Pending;
                    }
                }
            }
        }

        Poll::Ready(Ok(()))
    }

    /// Encodes `event` and stages the result in `slot`.
    ///
    /// Sizes are measured and finalizers are taken before the event is consumed by the
    /// encoder. If the encoder returns `None`, nothing is staged.
    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
        let byte_size = event.size_of();
        let json_byte_size = event.estimated_json_encoded_size_of();
        let finalizers = event.metadata_mut().take_finalizers();
        if let Some(item) = self.encoder.encode_event(event) {
            *self.project().slot = Some(EncodedEvent {
                item,
                finalizers,
                byte_size,
                json_byte_size,
            });
        }

        Ok(())
    }

    /// Moves any staged event into the inner sink (once it is ready) and then flushes
    /// the inner sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut this = self.project();
        if this.slot.is_some() {
            // Returns early with `Pending` (leaving the slot occupied) if the inner sink
            // cannot accept an item yet.
            ready!(this.inner.as_mut().poll_ready(cx))?;
            // The `unwrap` cannot fail: the slot was checked to be `Some` just above.
            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
        }

        this.inner.poll_flush(cx)
    }

    /// Flushes this sink completely, then closes the inner sink.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        self.project().inner.poll_close(cx)
    }
}
257
/// A `Sink<Event>` like [`BatchedHttpSink`], but backed by a [`TowerPartitionSink`] so that
/// encoded items (which implement `Partition<K>`) are batched per partition key `K`.
#[pin_project]
pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    // The user-supplied sink; also shared with the request-building closure.
    sink: Arc<T>,
    // The partitioning sink that actually issues requests; pinned because it is polled.
    #[pin]
    inner: TowerPartitionSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
        K,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // Holds the most recently encoded event until `inner` is ready to accept it.
    slot: Option<EncodedEvent<B::Input>>,
}
280
281impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
282where
283 B: Batch,
284 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
285 B::Input: Partition<K>,
286 K: Hash + Eq + Clone + Send + 'static,
287 T: HttpSink<Input = B::Input, Output = B::Output>,
288{
289 pub fn new(
290 sink: T,
291 batch: B,
292 request_settings: TowerRequestSettings,
293 batch_timeout: Duration,
294 client: HttpClient,
295 ) -> Self {
296 Self::with_retry_logic(
297 sink,
298 batch,
299 HttpRetryLogic::default(),
300 request_settings,
301 batch_timeout,
302 client,
303 )
304 }
305}
306
307impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
308where
309 B: Batch,
310 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
311 B::Input: Partition<K>,
312 K: Hash + Eq + Clone + Send + 'static,
313 T: HttpSink<Input = B::Input, Output = B::Output>,
314 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
315 + Send
316 + 'static,
317{
318 pub fn with_retry_logic(
319 sink: T,
320 batch: B,
321 retry_logic: RL,
322 request_settings: TowerRequestSettings,
323 batch_timeout: Duration,
324 client: HttpClient,
325 ) -> Self {
326 let sink = Arc::new(sink);
327
328 let sink1 = Arc::clone(&sink);
329 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
330 let sink = Arc::clone(&sink1);
331 Box::pin(async move { sink.build_request(b).await })
332 };
333
334 let svc = HttpBatchService::new(client, request_builder);
335 let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
336 let encoder = sink.build_encoder();
337
338 Self {
339 sink,
340 inner,
341 encoder,
342 slot: None,
343 }
344 }
345
346 pub fn ordered(mut self) -> Self {
348 self.inner.ordered();
349 self
350 }
351}
352
/// `Sink` implementation that stages one encoded event in `slot` and hands it to the
/// inner partition sink when that sink reports readiness.
impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    type Error = crate::Error;

    /// Ready as soon as `slot` is empty.
    ///
    /// When an event is still staged, a flush is attempted first; a pending flush only
    /// propagates `Pending` if the staged event could not be handed to the inner sink.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.slot.is_some() {
            match self.as_mut().poll_flush(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                Poll::Pending => {
                    // The inner flush may still be in progress even though the slot was
                    // already drained; in that case a new event can be accepted.
                    if self.slot.is_some() {
                        return Poll::Pending;
                    }
                }
            }
        }

        Poll::Ready(Ok(()))
    }

    /// Encodes `event` and stages the result in `slot`.
    ///
    /// Finalizers are taken first, then sizes are measured, all before the event is
    /// consumed by the encoder. If the encoder returns `None`, nothing is staged.
    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
        let finalizers = event.metadata_mut().take_finalizers();
        let byte_size = event.size_of();
        let json_byte_size = event.estimated_json_encoded_size_of();

        if let Some(item) = self.encoder.encode_event(event) {
            *self.project().slot = Some(EncodedEvent {
                item,
                finalizers,
                byte_size,
                json_byte_size,
            });
        }

        Ok(())
    }

    /// Moves any staged event into the inner sink (once it is ready) and then flushes
    /// the inner sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut this = self.project();
        if this.slot.is_some() {
            // Returns early with `Pending` (leaving the slot occupied) if the inner sink
            // cannot accept an item yet.
            ready!(this.inner.as_mut().poll_ready(cx))?;
            // The `unwrap` cannot fail: the slot was checked to be `Some` just above.
            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
        }

        this.inner.poll_flush(cx)
    }

    /// Flushes this sink completely, then closes the inner sink.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        self.project().inner.poll_close(cx)
    }
}
412
/// Inputs needed to sign outgoing requests; only available with the `aws-core` feature.
///
/// [`HttpBatchService`] passes these values to `sign_request` before sending a request.
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    /// Provider of the credentials used for signing.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    /// Region passed to the signer.
    pub(crate) region: Region,
    /// Service name passed to the signer.
    pub(crate) service: String,
}
420
/// A Tower `Service` that turns a batch of type `B` into an HTTP request via a
/// user-supplied builder, sends it with an [`HttpClient`], and returns the response with
/// its body collected into `Bytes`.
///
/// `F` is the future type returned by the request builder.
pub struct HttpBatchService<F, B = Bytes> {
    // Client used to send the built requests.
    inner: HttpClient<Body>,
    // Shared request builder; reference-counted so the service is cheap to clone.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
    // Optional signing configuration; `None` means requests are sent unsigned.
    #[cfg(feature = "aws-core")]
    sig_v4_config: Option<SigV4Config>,
}
433
434impl<F, B> HttpBatchService<F, B> {
435 pub fn new(
436 inner: HttpClient,
437 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
438 ) -> Self {
439 HttpBatchService {
440 inner,
441 request_builder: Arc::new(Box::new(request_builder)),
442 #[cfg(feature = "aws-core")]
443 sig_v4_config: None,
444 }
445 }
446
447 #[cfg(feature = "aws-core")]
448 pub fn new_with_sig_v4(
449 inner: HttpClient,
450 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
451 sig_v4_config: SigV4Config,
452 ) -> Self {
453 HttpBatchService {
454 inner,
455 request_builder: Arc::new(Box::new(request_builder)),
456 sig_v4_config: Some(sig_v4_config),
457 }
458 }
459}
460
/// Builds, optionally signs, and sends one HTTP request per call.
impl<F, B> Service<B> for HttpBatchService<F, B>
where
    F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
    B: ByteSizeOf + Send + 'static,
{
    type Response = http::Response<Bytes>;
    type Error = crate::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Always ready; this service applies no backpressure of its own.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Builds the request for `body`, signs it when a signing config is present, sends
    /// it through a decompression layer, and returns the response with its body fully
    /// collected.
    ///
    /// A request-builder failure emits `SinkRequestBuildError` and is returned as the
    /// error. `EndpointBytesSent` is emitted only for successful (2xx) responses.
    fn call(&mut self, body: B) -> Self::Future {
        // Clone everything the future needs so it is `'static` and independent of `self`.
        let request_builder = Arc::clone(&self.request_builder);
        #[cfg(feature = "aws-core")]
        let sig_v4_config = self.sig_v4_config.clone();
        let http_client = self.inner.clone();

        Box::pin(async move {
            let request = request_builder(body).await.inspect_err(|error| {
                emit!(SinkRequestBuildError { error });
            })?;

            #[cfg(feature = "aws-core")]
            let request = match sig_v4_config {
                None => request,
                Some(sig_v4_config) => {
                    let mut signed_request = request;
                    sign_request(
                        sig_v4_config.service.as_str(),
                        &mut signed_request,
                        &sig_v4_config.shared_credentials_provider,
                        Some(&sig_v4_config.region),
                        false,
                    )
                    .await?;

                    signed_request
                }
            };
            // Record the body length before the body is converted into a `hyper::Body`.
            let byte_size = request.body().len();
            let request = request.map(Body::from);
            let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());

            // Wrap the client so the response body passes through `DecompressionLayer`.
            let mut decompression_service = ServiceBuilder::new()
                .layer(DecompressionLayer::new())
                .service(http_client);

            let response = decompression_service.call(request).await?;

            if response.status().is_success() {
                emit!(EndpointBytesSent {
                    byte_size,
                    protocol: &protocol,
                    endpoint: &endpoint
                });
            }

            // Collect the whole response body into one contiguous `Bytes`.
            let (parts, body) = response.into_parts();
            let mut body = body.collect().await?.aggregate();
            Ok(hyper::Response::from_parts(
                parts,
                body.copy_to_bytes(body.remaining()),
            ))
        })
    }
}
532
533impl<F, B> Clone for HttpBatchService<F, B> {
534 fn clone(&self) -> Self {
535 Self {
536 inner: self.inner.clone(),
537 request_builder: Arc::clone(&self.request_builder),
538 #[cfg(feature = "aws-core")]
539 sig_v4_config: self.sig_v4_config.clone(),
540 }
541 }
542}
543
544impl<T: fmt::Debug> sink::Response for http::Response<T> {
545 fn is_successful(&self) -> bool {
546 self.status().is_success()
547 }
548
549 fn is_transient(&self) -> bool {
550 self.status().is_server_error()
551 }
552}
553
554mod status_code_vec {
556 use http::StatusCode;
557 use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
558
559 pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<StatusCode>, D::Error>
561 where
562 D: Deserializer<'de>,
563 {
564 Vec::<u16>::deserialize(deserializer)?
565 .into_iter()
566 .map(StatusCode::from_u16)
567 .collect::<Result<Vec<_>, _>>()
568 .map_err(Error::custom)
569 }
570
571 pub fn serialize<S>(status_codes: &[StatusCode], serializer: S) -> Result<S::Ok, S::Error>
573 where
574 S: Serializer,
575 {
576 status_codes
577 .iter()
578 .map(StatusCode::as_u16)
579 .collect::<Vec<_>>()
580 .serialize(serializer)
581 }
582}
583
/// The retry strategy applied to non-successful HTTP responses.
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
pub enum RetryStrategy {
    /// Never retry: every non-successful response is treated as not retryable.
    None,

    /// Default strategy: retry on 429, 408, and server errors (5xx) other than 501.
    #[default]
    Default,

    /// Retry on every non-successful response.
    All,

    /// Retry only on an explicit list of HTTP status codes.
    Custom {
        /// The HTTP status codes that should be retried.
        #[serde(with = "status_code_vec")]
        status_codes: Vec<StatusCode>,
    },
}
611
612impl RetryStrategy {
613 #[must_use]
615 const fn name(&self) -> &str {
616 match self {
617 Self::None => "Never retry strategy",
618 Self::Default => "Default retry strategy",
619 Self::All => "Retry all strategy",
620 Self::Custom { .. } => "Custom retry strategy",
621 }
622 }
623
624 #[must_use]
635 pub fn retry_action<Req>(&self, status: http::StatusCode) -> RetryAction<Req> {
636 if status.is_success() {
637 return RetryAction::Successful;
638 }
639
640 let reason = format!(
641 "{}: {}",
642 self.name(),
643 status.canonical_reason().unwrap_or_else(|| status.as_str())
644 )
645 .into();
646
647 match self {
648 Self::None => RetryAction::DontRetry(reason),
649 Self::Default => match status {
650 StatusCode::TOO_MANY_REQUESTS | StatusCode::REQUEST_TIMEOUT => {
651 RetryAction::Retry(reason)
652 }
653 StatusCode::NOT_IMPLEMENTED => RetryAction::DontRetry(reason),
654 _ => {
655 if status.is_server_error() {
656 RetryAction::Retry(reason)
657 } else {
658 RetryAction::DontRetry(reason)
659 }
660 }
661 },
662 Self::All => RetryAction::Retry(reason),
663 Self::Custom { status_codes } => {
664 if status_codes.contains(&status) {
665 RetryAction::Retry(reason)
666 } else {
667 RetryAction::DontRetry(reason)
668 }
669 }
670 }
671 }
672}
673
/// Retry logic for responses of type `hyper::Response<Bytes>`, driven by a
/// [`RetryStrategy`]. `Req` is the request type being retried.
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    // Marker tying this logic to its request type; no `Req` value is stored.
    request: PhantomData<Req>,
    // Strategy consulted for errors, timeouts, and response status codes.
    retry_strategy: RetryStrategy,
}
679
680impl<Req> Default for HttpRetryLogic<Req> {
681 fn default() -> Self {
682 Self {
683 request: PhantomData,
684 retry_strategy: RetryStrategy::Default,
685 }
686 }
687}
688
689impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
690 type Error = HttpError;
691 type Request = Req;
692 type Response = hyper::Response<Bytes>;
693
694 fn is_retriable_error(&self, error: &Self::Error) -> bool {
695 if self.retry_strategy == RetryStrategy::None {
696 false
697 } else {
698 error.is_retriable()
699 }
700 }
701
702 fn is_retriable_timeout(&self) -> bool {
703 self.retry_strategy != RetryStrategy::None
704 }
705
706 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
707 let status = response.status();
708 if !status.is_success() {
709 debug!(
710 message = "HTTP response.",
711 %status,
712 body = %String::from_utf8_lossy(response.body()),
713 );
714 }
715 self.retry_strategy.retry_action(status)
716 }
717}
718
/// Retry logic for arbitrary response types: `func` extracts a `StatusCode` from the
/// response, and a [`RetryStrategy`] decides what to do with it.
#[derive(Debug)]
pub struct HttpStatusRetryLogic<F, Req, Res> {
    // Extracts the status code from a response.
    func: F,
    // Markers tying this logic to its request/response types; no values are stored.
    request: PhantomData<Req>,
    response: PhantomData<Res>,
    // Strategy consulted for errors, timeouts, and extracted status codes.
    retry_strategy: RetryStrategy,
}
728
729impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
730where
731 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
732 Req: Send + Sync + 'static,
733 Res: Send + Sync + 'static,
734{
735 pub const fn new(func: F, retry_strategy: RetryStrategy) -> HttpStatusRetryLogic<F, Req, Res> {
736 HttpStatusRetryLogic {
737 func,
738 request: PhantomData,
739 response: PhantomData,
740 retry_strategy,
741 }
742 }
743}
744
745impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
746where
747 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
748 Req: Send + Sync + 'static,
749 Res: Send + Sync + 'static,
750{
751 type Error = HttpError;
752 type Request = Req;
753 type Response = Res;
754
755 fn is_retriable_error(&self, error: &Self::Error) -> bool {
756 if self.retry_strategy == RetryStrategy::None {
757 false
758 } else {
759 error.is_retriable()
760 }
761 }
762
763 fn is_retriable_timeout(&self) -> bool {
764 self.retry_strategy != RetryStrategy::None
765 }
766
767 fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
768 let status = (self.func)(response);
769 self.retry_strategy.retry_action(status)
770 }
771}
772
773impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
774where
775 F: Clone,
776{
777 fn clone(&self) -> Self {
778 Self {
779 func: self.func.clone(),
780 request: PhantomData,
781 response: PhantomData,
782 retry_strategy: self.retry_strategy.clone(),
783 }
784 }
785}
786
/// Outbound HTTP request settings.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
    // Tower request settings; flattened so their keys sit next to `headers`.
    #[serde(flatten)]
    pub tower: TowerRequestConfig,

    /// Additional HTTP headers for outgoing requests, keyed by header name.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: BTreeMap<String, String>,
}
802
/// Example header map referenced by the `docs::examples` metadata on
/// `RequestConfig::headers`; includes both literal and templated (`{{...}}`) values.
fn headers_examples() -> BTreeMap<String, String> {
    btreemap! {
        "Accept" => "text/plain",
        "X-My-Custom-Header" => "A-Value",
        "X-Event-Level" => "{{level}}",
        "X-Event-Timestamp" => "{{timestamp}}",
    }
}
811
812impl RequestConfig {
813 pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
814 let mut static_headers = BTreeMap::new();
815 let mut template_headers = BTreeMap::new();
816
817 for (name, value) in &self.headers {
818 match Template::try_from(value.as_str()) {
819 Ok(template) if !template.is_dynamic() => {
820 static_headers.insert(name.clone(), value.clone());
821 }
822 Ok(template) => {
823 template_headers.insert(name.clone(), template);
824 }
825 Err(_) => {
826 static_headers.insert(name.clone(), value.clone());
827 }
828 }
829 }
830
831 (static_headers, template_headers)
832 }
833}
834
/// Errors produced while validating configured header names and values.
///
/// Each variant is displayed as the underlying error followed by the offending input.
#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
    /// The configured header name was rejected by `HeaderName::from_bytes`.
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        name: String,
        source: header::InvalidHeaderName,
    },
    /// The configured header value was rejected by `HeaderValue::from_bytes`.
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        value: String,
        source: header::InvalidHeaderValue,
    },
}
848
849pub fn validate_headers(
850 headers: &BTreeMap<String, String>,
851) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
852 let mut validated_headers = BTreeMap::new();
853 for (name, value) in headers {
854 let name = HeaderName::from_bytes(name.as_bytes())
855 .with_context(|_| InvalidHeaderNameSnafu { name })?;
856 let value = HeaderValue::from_bytes(value.as_bytes())
857 .with_context(|_| InvalidHeaderValueSnafu { value })?;
858
859 validated_headers.insert(name.into(), value);
860 }
861
862 Ok(validated_headers)
863}
864
/// A request handed to [`HttpService`]: an encoded payload plus the bookkeeping that
/// travels with it, and sink-specific extra metadata of type `T`.
#[derive(Debug, Clone)]
pub struct HttpRequest<T: Send> {
    // Encoded request body.
    payload: Bytes,
    // Finalizers of the events contained in the payload.
    finalizers: EventFinalizers,
    // Request metadata (sizes/counts) reported after the request completes.
    request_metadata: RequestMetadata,
    // Extra, sink-defined data made available to the request builder.
    additional_metadata: T,
}
873
874impl<T: Send> HttpRequest<T> {
875 pub const fn new(
877 payload: Bytes,
878 finalizers: EventFinalizers,
879 request_metadata: RequestMetadata,
880 additional_metadata: T,
881 ) -> Self {
882 Self {
883 payload,
884 finalizers,
885 request_metadata,
886 additional_metadata,
887 }
888 }
889
890 pub const fn get_additional_metadata(&self) -> &T {
891 &self.additional_metadata
892 }
893
894 pub fn take_payload(&mut self) -> Bytes {
895 std::mem::take(&mut self.payload)
896 }
897}
898
899impl<T: Send> Finalizable for HttpRequest<T> {
900 fn take_finalizers(&mut self) -> EventFinalizers {
901 self.finalizers.take_finalizers()
902 }
903}
904
905impl<T: Send> MetaDescriptive for HttpRequest<T> {
906 fn get_metadata(&self) -> &RequestMetadata {
907 &self.request_metadata
908 }
909
910 fn metadata_mut(&mut self) -> &mut RequestMetadata {
911 &mut self.request_metadata
912 }
913}
914
915impl<T: Send> ByteSizeOf for HttpRequest<T> {
916 fn allocated_bytes(&self) -> usize {
917 self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
918 }
919}
920
/// Response type of [`HttpService`]: the HTTP response plus the size information taken
/// from the request's metadata.
pub struct HttpResponse {
    /// The HTTP response with its body collected into `Bytes`.
    pub http_response: Response<Bytes>,
    /// Estimated JSON-encoded byte size of the events in the request.
    pub events_byte_size: GroupedCountByteSize,
    /// Encoded size of the request, as recorded in the request metadata.
    pub raw_byte_size: usize,
}
927
928impl DriverResponse for HttpResponse {
929 fn event_status(&self) -> EventStatus {
930 if self.http_response.is_successful() {
931 EventStatus::Delivered
932 } else if self.http_response.is_transient() {
933 EventStatus::Errored
934 } else {
935 EventStatus::Rejected
936 }
937 }
938
939 fn events_sent(&self) -> &GroupedCountByteSize {
940 &self.events_byte_size
941 }
942
943 fn bytes_sent(&self) -> Option<usize> {
944 Some(self.raw_byte_size)
945 }
946}
947
948pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>(
950 retry_strategy: RetryStrategy,
951) -> HttpStatusRetryLogic<
952 impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
953 Request,
954 HttpResponse,
955> {
956 HttpStatusRetryLogic::new(
957 |req: &HttpResponse| req.http_response.status(),
958 retry_strategy,
959 )
960}
961
/// Batch sizer that measures events by their estimated JSON-encoded size.
#[derive(Default)]
pub struct HttpJsonBatchSizer;
965
966impl ItemBatchSize<Event> for HttpJsonBatchSizer {
967 fn size(&self, item: &Event) -> usize {
968 item.estimated_json_encoded_size_of().get()
969 }
970}
971
/// Turns an [`HttpRequest`] into the concrete `http::Request` that [`HttpService`] sends.
pub trait HttpServiceRequestBuilder<T: Send> {
    /// Builds the HTTP request for `request`, or returns an error if it cannot be built.
    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}
976
/// A Tower `Service` over [`HttpRequest`]s that delegates to an [`HttpBatchService`] and
/// wraps the result in an [`HttpResponse`].
///
/// `B` is the request builder type; it is captured by the inner service's closure and
/// only tracked here through `PhantomData`.
#[derive(Clone)]
pub struct HttpService<B, T: Send> {
    // Inner service that builds, optionally signs, and sends the request.
    batch_service:
        HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
    _phantom: PhantomData<B>,
}
984
985impl<B, T: Send + 'static> HttpService<B, T>
986where
987 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
988{
989 pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
990 let http_request_builder = Arc::new(http_request_builder);
991
992 let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
993 let request_builder = Arc::clone(&http_request_builder);
994
995 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
996 Box::pin(async move { request_builder.build(req) });
997
998 fut
999 });
1000 Self {
1001 batch_service,
1002 _phantom: PhantomData,
1003 }
1004 }
1005
1006 #[cfg(feature = "aws-core")]
1007 pub fn new_with_sig_v4(
1008 http_client: HttpClient<Body>,
1009 http_request_builder: B,
1010 sig_v4_config: SigV4Config,
1011 ) -> Self {
1012 let http_request_builder = Arc::new(http_request_builder);
1013
1014 let batch_service = HttpBatchService::new_with_sig_v4(
1015 http_client,
1016 move |req: HttpRequest<T>| {
1017 let request_builder = Arc::clone(&http_request_builder);
1018
1019 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
1020 Box::pin(async move { request_builder.build(req) });
1021
1022 fut
1023 },
1024 sig_v4_config,
1025 );
1026 Self {
1027 batch_service,
1028 _phantom: PhantomData,
1029 }
1030 }
1031}
1032
1033impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
1034where
1035 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
1036{
1037 type Response = HttpResponse;
1038 type Error = crate::Error;
1039 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
1040
1041 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1042 Poll::Ready(Ok(()))
1043 }
1044
1045 fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
1046 let mut http_service = self.batch_service.clone();
1047
1048 let metadata = std::mem::take(request.metadata_mut());
1051 let raw_byte_size = metadata.request_encoded_size();
1052 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
1053
1054 Box::pin(async move {
1055 let http_response = http_service.call(request).await?;
1056
1057 Ok(HttpResponse {
1058 http_response,
1059 events_byte_size,
1060 raw_byte_size,
1061 })
1062 })
1063 }
1064}
1065
// Unit tests for the retry strategies/logic, plus an end-to-end check that
// `HttpBatchService` really sends the request body to a local HTTP server.
#[cfg(test)]
mod test {
    // `eprintln!` is used below to report errors from the spawned test server.
    #![allow(clippy::print_stderr)]
    use futures::{StreamExt, future::ready};
    use hyper::{
        Response, Server, Uri,
        service::{make_service_fn, service_fn},
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::addr::next_addr};

    // Default strategy: 408, 429 and 500 are retried; 400 and 501 are not.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();

        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic.should_retry_response(&response_408).is_retryable());
        assert!(
            logic
                .should_retry_response(&response_400)
                .is_not_retryable()
        );
        assert!(
            logic
                .should_retry_response(&response_501)
                .is_not_retryable()
        );
    }

    // `None` still reports success for 2xx, but never retries a failure.
    #[test]
    fn retry_strategy_none_preserves_success_and_rejects_failures() {
        let strategy = RetryStrategy::None;

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_not_retryable()
        );
    }

    // `None` disables timeout retries in both retry-logic implementations.
    #[test]
    fn retry_strategy_none_disables_timeout_retries() {
        let logic = HttpRetryLogic::<()> {
            request: PhantomData,
            retry_strategy: RetryStrategy::None,
        };
        let status_logic =
            HttpStatusRetryLogic::<_, (), ()>::new(|_: &()| StatusCode::OK, RetryStrategy::None);

        assert!(!logic.is_retriable_timeout());
        assert!(!status_logic.is_retriable_timeout());
    }

    // `All` retries client errors as well as server errors.
    #[test]
    fn retry_strategy_all_preserves_success_and_retries_failures() {
        let strategy = RetryStrategy::All;

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::BAD_REQUEST)
                .is_retryable()
        );
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_retryable()
        );
    }

    // `Custom` retries only the listed codes, even when an unlisted one is a 5xx.
    #[test]
    fn retry_strategy_custom_only_retries_configured_statuses() {
        let strategy = RetryStrategy::Custom {
            status_codes: vec![StatusCode::BAD_REQUEST],
        };

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::BAD_REQUEST)
                .is_retryable()
        );
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_not_retryable()
        );
    }

    // Status codes are (de)serialized as plain numbers and survive a round trip.
    #[test]
    fn retry_strategy_custom_serde_roundtrips_status_codes() {
        let json = r#"{"type":"custom","status_codes":[400,503]}"#;
        let strategy: RetryStrategy = serde_json::from_str(json).unwrap();
        assert_eq!(
            strategy,
            RetryStrategy::Custom {
                status_codes: vec![StatusCode::BAD_REQUEST, StatusCode::SERVICE_UNAVAILABLE],
            }
        );
        let encoded = serde_json::to_string(&strategy).unwrap();
        let roundtrip: RetryStrategy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(roundtrip, strategy);
    }

    // A number that `StatusCode::from_u16` rejects must fail deserialization.
    #[test]
    fn retry_strategy_custom_serde_rejects_invalid_status_codes() {
        let json = r#"{"type":"custom","status_codes":[1000]}"#;
        let result = serde_json::from_str::<RetryStrategy>(json);
        assert!(
            result.is_err(),
            "expected invalid status code to fail deserialization"
        );
    }

    // End-to-end: the service POSTs the body to a local server, which forwards the
    // received body over a channel so it can be asserted on.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let (_guard, addr) = next_addr();

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req: http::Request<Body>| {
                let mut tx = tx.clone();

                async move {
                    let mut body = http_body::Body::collect(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?
                        .aggregate();
                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Give the spawned server a moment to start listening before sending.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}