1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
7use vector_lib::{
8 TimeZone,
9 codecs::{
10 EncoderKind, TextSerializerConfig,
11 encoding::{Framer, FramingConfig},
12 },
13 configurable::configurable_component,
14 sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19 aws::{AwsAuthentication, RegionOrEndpoint},
20 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22 sinks::{
23 Healthcheck,
24 s3_common::{
25 self,
26 config::{RetryStrategy, S3Options},
27 partitioner::S3KeyPartitioner,
28 service::S3Service,
29 sink::S3Sink,
30 },
31 util::{
32 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33 TowerRequestConfig, timezone_to_offset,
34 },
35 },
36 template::Template,
37 tls::TlsConfig,
38};
39
/// Batch encoding configuration for the `aws_s3` sink.
///
/// The codec is selected with the `codec` key.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum S3BatchEncoding {
    /// Encodes events in the Apache Parquet format.
    Parquet(ParquetSerializerConfig),
}
54
55#[configurable_component(sink(
57 "aws_s3",
58 "Store observability events in the AWS S3 object storage system."
59))]
60#[derive(Clone, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct S3SinkConfig {
63 #[configurable(metadata(docs::examples = "my-bucket"))]
67 pub bucket: String,
68
69 #[serde(default = "default_key_prefix")]
75 #[configurable(metadata(docs::templateable))]
76 #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
77 #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
78 #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
79 pub key_prefix: String,
80
81 #[serde(default = "default_filename_time_format")]
98 pub filename_time_format: String,
99
100 #[serde(default = "crate::serde::default_true")]
109 #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
110 pub filename_append_uuid: bool,
111
112 #[configurable(metadata(docs::examples = "json"))]
116 pub filename_extension: Option<String>,
117
118 #[serde(flatten)]
119 pub options: S3Options,
120
121 #[serde(flatten)]
122 pub region: RegionOrEndpoint,
123
124 #[serde(flatten)]
125 pub encoding: EncodingConfigWithFraming,
126
127 #[cfg(feature = "codecs-parquet")]
133 #[configurable(derived)]
134 #[serde(default)]
135 pub batch_encoding: Option<S3BatchEncoding>,
136
137 #[configurable(derived)]
144 #[serde(default = "Compression::gzip_default")]
145 pub compression: Compression,
146
147 #[configurable(derived)]
148 #[serde(default)]
149 pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
150
151 #[configurable(derived)]
152 #[serde(default)]
153 pub request: TowerRequestConfig,
154
155 #[configurable(derived)]
156 pub tls: Option<TlsConfig>,
157
158 #[configurable(derived)]
159 #[serde(default)]
160 pub auth: AwsAuthentication,
161
162 #[configurable(derived)]
163 #[serde(
164 default,
165 deserialize_with = "crate::serde::bool_or_struct",
166 skip_serializing_if = "crate::serde::is_default"
167 )]
168 pub acknowledgements: AcknowledgementsConfig,
169
170 #[configurable(derived)]
171 #[serde(default)]
172 pub timezone: Option<TimeZone>,
173
174 #[serde(default = "crate::serde::default_true")]
178 pub force_path_style: bool,
179
180 #[configurable(derived)]
185 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
186 pub retry_strategy: RetryStrategy,
187}
188
189pub(super) fn default_key_prefix() -> String {
190 "date=%F".to_string()
191}
192
193pub(super) fn default_filename_time_format() -> String {
194 "%s".to_string()
195}
196
197impl GenerateConfig for S3SinkConfig {
198 fn generate_config() -> toml::Value {
199 toml::Value::try_from(Self {
200 bucket: "".to_owned(),
201 key_prefix: default_key_prefix(),
202 filename_time_format: default_filename_time_format(),
203 filename_append_uuid: true,
204 filename_extension: None,
205 options: S3Options::default(),
206 region: RegionOrEndpoint::default(),
207 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
208 #[cfg(feature = "codecs-parquet")]
209 batch_encoding: None,
210 compression: Compression::gzip_default(),
211 batch: BatchConfig::default(),
212 request: TowerRequestConfig::default(),
213 tls: Some(TlsConfig::default()),
214 auth: AwsAuthentication::default(),
215 acknowledgements: Default::default(),
216 timezone: Default::default(),
217 force_path_style: Default::default(),
218 retry_strategy: Default::default(),
219 })
220 .unwrap()
221 }
222}
223
224#[async_trait::async_trait]
225#[typetag::serde(name = "aws_s3")]
226impl SinkConfig for S3SinkConfig {
227 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
228 let service = self.create_service(&cx.proxy).await?;
229 let healthcheck = self.build_healthcheck(service.client())?;
230 let sink = self.build_processor(service, cx)?;
231 Ok((sink, healthcheck))
232 }
233
234 fn input(&self) -> Input {
235 #[cfg(feature = "codecs-parquet")]
236 if let Some(batch_encoding) = &self.batch_encoding {
237 let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
238 let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
239 return Input::new(resolved.input_type());
240 }
241 Input::new(self.encoding.config().1.input_type())
242 }
243
244 fn acknowledgements(&self) -> &AcknowledgementsConfig {
245 &self.acknowledgements
246 }
247}
248
249impl S3SinkConfig {
250 pub fn build_processor(
251 &self,
252 service: S3Service,
253 cx: SinkContext,
254 ) -> crate::Result<VectorSink> {
255 let request_limits = self.request.into_settings();
260 let retry_strategy = self.retry_strategy.clone();
261 let service = ServiceBuilder::new()
262 .settings(request_limits, retry_strategy)
263 .service(service);
264
265 let offset = self
266 .timezone
267 .or(cx.globals.timezone)
268 .and_then(timezone_to_offset);
269
270 let batch_settings = self.batch.into_batcher_settings()?;
272
273 let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
274
275 let ssekms_key_id = self
276 .options
277 .ssekms_key_id
278 .as_ref()
279 .cloned()
280 .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
281 .transpose()?;
282
283 let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
284
285 let transformer = self.encoding.transformer();
286
287 #[cfg(feature = "codecs-parquet")]
290 if let Some(batch_encoding) = &self.batch_encoding {
291 let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
292 let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());
293
294 let batch_serializer = resolved_batch_config.build_batch_serializer()?;
295 let batch_encoder = BatchEncoder::new(batch_serializer);
296
297 let mut api_options = self.options.clone();
300 if api_options.content_type.is_none() {
301 api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
302 }
303
304 let encoder = EncoderKind::Batch(batch_encoder);
305
306 let filename_extension = self.filename_extension.clone().or_else(|| {
307 Some(
308 match batch_encoding {
309 S3BatchEncoding::Parquet(_) => "parquet",
310 }
311 .to_string(),
312 )
313 });
314
315 if self.compression != Compression::None {
316 warn!("Top level compression setting ignored when batch_encoding set to parquet.")
317 }
318
319 let request_options = S3RequestOptions {
320 bucket: self.bucket.clone(),
321 api_options,
322 filename_extension,
323 filename_time_format: self.filename_time_format.clone(),
324 filename_append_uuid: self.filename_append_uuid,
325 encoder: (transformer, encoder),
326 compression: Compression::None,
328 filename_tz_offset: offset,
329 };
330
331 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
332 return Ok(VectorSink::from_event_streamsink(sink));
333 }
334
335 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
336 let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
337
338 let request_options = S3RequestOptions {
339 bucket: self.bucket.clone(),
340 api_options: self.options.clone(),
341 filename_extension: self.filename_extension.clone(),
342 filename_time_format: self.filename_time_format.clone(),
343 filename_append_uuid: self.filename_append_uuid,
344 encoder: (transformer, encoder),
345 compression: self.compression,
346 filename_tz_offset: offset,
347 };
348
349 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
350
351 Ok(VectorSink::from_event_streamsink(sink))
352 }
353
354 pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
355 s3_common::config::build_healthcheck(self.bucket.clone(), client)
356 }
357
358 pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
359 s3_common::config::create_service(
360 &self.region,
361 &self.auth,
362 proxy,
363 self.tls.as_ref(),
364 self.force_path_style,
365 )
366 .await
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    /// Mirrors the Content-Type resolution of `build_processor` for a batch-encoded sink:
    /// a user-provided `content_type` is kept, otherwise the batch encoder's one is used.
    #[cfg(feature = "codecs-parquet")]
    fn resolved_content_type(config: &S3SinkConfig) -> Option<String> {
        use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig};

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
        let batch_config = BatchSerializerConfig::Parquet(p.clone());
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = BatchEncoder::new(batch_serializer);

        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }
        api_options.content_type
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    /// `batch_encoding` is an internally tagged table; key order inside it must not matter.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            schema_mode = "auto_infer"
            codec = "parquet"

            [batch_encoding.compression]
            algorithm = "snappy"

        "#,
        )
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
        assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
        assert_eq!(p.compression, ParquetCompression::Snappy);
    }

    /// Without an explicit `content_type`, the Parquet encoder's Content-Type is used.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::FramingConfig;
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        let content_type = resolved_content_type(&config);

        assert_eq!(
            content_type.as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    /// An explicit `content_type` must survive the batch-encoding defaulting.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            content_type = "application/octet-stream"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"

            [batch_encoding.compression]
            algorithm = "gzip"
            level = 9
        "#,
        )
        .unwrap();

        let content_type = resolved_content_type(&config);

        assert_eq!(
            content_type.as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    /// Codecs other than `parquet` are rejected at deserialization time.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_rejects_unsupported_codec() {
        // The `codec` keys must be nested under `encoding` / `batch_encoding`; YAML nesting is
        // expressed through indentation.
        let err = serde_yaml::from_str::<S3SinkConfig>(
            r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: arrow_stream
        "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("arrow_stream"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    /// A user-provided `filename_extension` is accepted alongside `batch_encoding`.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            filename_extension = "pq"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
        "#,
        )
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    /// Omitting `schema_mode` yields the relaxed mode.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
        "#,
        )
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
    }

    /// `schema_mode = "strict"` together with a `schema_file` parses as the strict mode.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "strict"
            schema_file = "tmp/something.schema"
        "#,
        )
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
    }
}