1use std::collections::HashSet;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
12use arrow::error::ArrowError;
13use arrow::json::reader::infer_json_schema_from_iterator;
14use arrow::record_batch::RecordBatch;
15use bytes::{BufMut, BytesMut};
16use derivative::Derivative;
17use parquet::arrow::ArrowWriter;
18use parquet::basic::ZstdLevel;
19use parquet::basic::{Compression as ParquetCodecCompression, GzipLevel};
20use parquet::file::properties::WriterProperties;
21use std::io::{Error, ErrorKind};
22use tracing::warn;
23use vector_common::internal_event::{
24 ComponentEventsDropped, Count, InternalEventHandle, Registered, UNINTENTIONAL, emit, register,
25};
26use vector_config::configurable_component;
27use vector_core::event::Event;
28
29use super::arrow::{ArrowEncodingError, build_record_batch};
30use crate::encoding::format::arrow::vector_log_events_to_json_values;
31use crate::internal_events::{ArrowWriterError, JsonSerializationError, SchemaGenerationError};
32
/// Internal event used to count events that the Parquet encoder drops
/// unintentionally (for example, non-log events handed to `encode`).
type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>;
34
#[configurable_component]
/// Compression codec applied to the column pages of the generated Parquet file.
///
/// Defaults to Snappy.
#[derive(Default, Copy, Clone, Debug, PartialEq)]
#[configurable(metadata(
    docs::enum_tag_description = "Compression codec applied per column page inside the Parquet file."
))]
#[serde(tag = "algorithm", rename_all = "snake_case")]
pub enum ParquetCompression {
    /// Zstandard compression.
    Zstd {
        /// Zstandard compression level.
        ///
        /// The configuration schema accepts values from 1 to 21. The level is
        /// checked again by the `parquet` crate when the serializer is built.
        #[configurable(validation(range(min = 1, max = 21)))]
        level: u8,
    },
    /// Gzip compression.
    Gzip {
        /// Gzip compression level.
        ///
        /// The configuration schema accepts values from 1 to 9. The level is
        /// checked again by the `parquet` crate when the serializer is built.
        #[configurable(validation(range(min = 1, max = 9)))]
        level: u8,
    },

    /// Snappy compression. This is the default.
    #[default]
    Snappy,

    /// LZ4 compression, written with Parquet's `LZ4_RAW` codec.
    Lz4,

    /// No compression.
    None,
}
66
67impl TryFrom<ParquetCompression> for ParquetCodecCompression {
68 type Error = parquet::errors::ParquetError;
69 fn try_from(
70 value: ParquetCompression,
71 ) -> Result<ParquetCodecCompression, parquet::errors::ParquetError> {
72 match value {
73 ParquetCompression::None => Ok(ParquetCodecCompression::UNCOMPRESSED),
74 ParquetCompression::Snappy => Ok(ParquetCodecCompression::SNAPPY),
75 ParquetCompression::Zstd { level } => Ok(ParquetCodecCompression::ZSTD(
76 ZstdLevel::try_new(level.into())?,
77 )),
78 ParquetCompression::Gzip { level } => Ok(ParquetCodecCompression::GZIP(
79 GzipLevel::try_new(level.into())?,
80 )),
81 ParquetCompression::Lz4 => Ok(ParquetCodecCompression::LZ4_RAW),
82 }
83 }
84}
85
#[configurable_component]
/// Controls how incoming log events are reconciled with the Parquet schema.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParquetSchemaMode {
    /// Encode events against the schema loaded from `schema_file`.
    ///
    /// Event fields that are not declared in the schema are not written. This is
    /// the default.
    #[default]
    Relaxed,
    /// Like `relaxed`, but fail the whole batch if any log event has a
    /// top-level field that is not declared in the schema.
    Strict,
    /// Ignore `schema_file` and infer the schema from each batch of events.
    ///
    /// Top-level fields whose non-null values are all timestamps are written as
    /// `Timestamp(Microsecond, UTC)` columns.
    AutoInfer,
}
99
#[configurable_component]
/// Configuration for the Parquet batch serializer.
///
/// Unless `schema_mode` is `auto_infer`, a Parquet message-type schema must be
/// provided through `schema_file`.
#[derive(Clone, Debug, Default)]
pub struct ParquetSerializerConfig {
    /// Path to a file containing a Parquet schema in message-type syntax, for
    /// example `message logs { required binary name (STRING); }`.
    ///
    /// Required unless `schema_mode` is `auto_infer`, in which case it is ignored.
    /// The file may be at most 10 MiB. Columns that map to Arrow binary types
    /// (such as `binary` without a `STRING` annotation) are rejected.
    #[serde(default)]
    pub schema_file: Option<PathBuf>,

    /// Compression codec applied to the Parquet column pages.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: ParquetCompression,

    /// How events are validated against the schema.
    #[serde(default)]
    #[configurable(derived)]
    pub schema_mode: ParquetSchemaMode,
}
126
127impl ParquetSerializerConfig {
128 fn resolve_schema(&self) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
130 if self.schema_mode == ParquetSchemaMode::AutoInfer {
131 return Ok(Schema::empty());
132 }
133
134 let path = self
135 .schema_file
136 .as_ref()
137 .ok_or("schema_file is required unless schema_mode is auto_infer")?;
138
139 let content = read_schema_file(path, "schema_file")?;
140 let parquet_type = parquet::schema::parser::parse_message_type(&content)
141 .map_err(|e| format!("Failed to parse Parquet schema: {e}"))?;
142 let schema_desc = parquet::schema::types::SchemaDescriptor::new(Arc::new(parquet_type));
143 let arrow_schema = parquet::arrow::parquet_to_arrow_schema(&schema_desc, None)
144 .map_err(|e| format!("Failed to convert Parquet schema to Arrow: {e}"))?;
145 Ok(arrow_schema)
146 }
147
148 pub fn input_type(&self) -> vector_core::config::DataType {
150 vector_core::config::DataType::Log
151 }
152
153 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
155 vector_core::schema::Requirement::empty()
156 }
157}
158
/// Reads the UTF-8 schema file configured through the option named `field_name`.
///
/// Files larger than 10 MiB are rejected. The limit is checked against the file
/// metadata first, so the common case reports the exact size. It is then enforced
/// again while reading, because the metadata length can be stale (the file may
/// grow after the check) or meaningless (special files such as FIFOs report 0).
///
/// # Errors
///
/// Returns an error mentioning `field_name` and `path` when the file cannot be
/// opened or read, is not valid UTF-8, or exceeds the size limit.
fn read_schema_file(
    path: &std::path::Path,
    field_name: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;

    const MAX_SCHEMA_FILE_SIZE: u64 = 10 * 1024 * 1024;

    let display = path.display();
    let read_error = |e: std::io::Error| format!("Failed to read {field_name} '{display}': {e}");

    let metadata = std::fs::metadata(path).map_err(read_error)?;
    if metadata.len() > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large ({} bytes, max {MAX_SCHEMA_FILE_SIZE})",
            metadata.len()
        )
        .into());
    }

    let file = std::fs::File::open(path).map_err(read_error)?;
    // `metadata.len()` is at most MAX_SCHEMA_FILE_SIZE here, so the cast cannot truncate.
    let mut bytes = Vec::with_capacity(metadata.len() as usize);
    // Read at most one byte past the limit: enough to detect an oversized file
    // without ever buffering more than the limit allows.
    file.take(MAX_SCHEMA_FILE_SIZE + 1)
        .read_to_end(&mut bytes)
        .map_err(read_error)?;
    if bytes.len() as u64 > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large (exceeds max {MAX_SCHEMA_FILE_SIZE} bytes)"
        )
        .into());
    }

    String::from_utf8(bytes)
        .map_err(|e| format!("Failed to read {field_name} '{display}': {e}").into())
}
177
178fn reject_unsupported_arrow_types(
183 schema: &Schema,
184) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
185 fn check_field(field: &Field, path: &str, bad: &mut Vec<String>) {
186 let name = if path.is_empty() {
187 field.name().to_string()
188 } else {
189 format!("{path}.{}", field.name())
190 };
191 match field.data_type() {
192 DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
193 bad.push(format!("'{name}' ({:?})", field.data_type()));
194 }
195 DataType::Struct(fields) => {
196 for f in fields {
197 check_field(f, &name, bad);
198 }
199 }
200 DataType::List(inner) | DataType::LargeList(inner) => {
201 check_field(inner, &name, bad);
202 }
203 DataType::Map(entries_field, _) => {
204 if let DataType::Struct(kv) = entries_field.data_type() {
205 for f in kv {
206 check_field(f, &name, bad);
207 }
208 }
209 }
210 _ => {}
211 }
212 }
213
214 let mut bad = Vec::new();
215 for field in schema.fields() {
216 check_field(field, "", &mut bad);
217 }
218 if !bad.is_empty() {
219 return Err(format!(
220 "Schema contains binary field(s) unsupported by the JSON-based Arrow encoder: {}. \
221 Use Utf8 for base64/hex-encoded data instead.",
222 bad.join(", ")
223 )
224 .into());
225 }
226 Ok(())
227}
228
/// Batch serializer that writes each batch of log events as one complete Parquet
/// file (header, row group and footer) into the output buffer.
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct ParquetSerializer {
    /// Arrow schema used to build record batches. In `auto_infer` mode it is
    /// replaced by the schema inferred from each encoded batch.
    schema: SchemaRef,
    /// Writer properties (compression codec), shared between clones.
    writer_props: Arc<WriterProperties>,
    /// How events are validated against `schema`.
    schema_mode: ParquetSchemaMode,
    /// Top-level field names of the configured schema, consulted by `strict` mode.
    schema_field_names: HashSet<String>,

    /// Handle for reporting events dropped by the encoder (e.g. non-log events).
    #[derivative(Debug = "ignore")]
    events_dropped_handle: Registered<EventsDroppedError>,
}
242
243impl ParquetSerializer {
244 pub fn new(
246 config: ParquetSerializerConfig,
247 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
248 let schema = config.resolve_schema()?;
249 reject_unsupported_arrow_types(&schema)?;
250 let schema_ref = SchemaRef::new(schema);
251
252 let schema_field_names = schema_ref
253 .fields()
254 .iter()
255 .map(|f| f.name().clone())
256 .collect::<HashSet<_>>();
257
258 let writer_props = Arc::new(
259 WriterProperties::builder()
260 .set_compression(config.compression.try_into()?)
261 .build(),
262 );
263
264 Ok(Self {
265 schema: schema_ref,
266 writer_props,
267 schema_mode: config.schema_mode,
268 schema_field_names,
269 events_dropped_handle: register(EventsDroppedError::from(
270 "Events could not be serialized to parquet",
271 )),
272 })
273 }
274
275 pub const fn content_type(&self) -> &'static str {
277 "application/vnd.apache.parquet"
278 }
279
280 fn write_record_batch(
286 record_batch: &RecordBatch,
287 buffer: &mut BytesMut,
288 writer_props: &WriterProperties,
289 ) -> Result<(), parquet::errors::ParquetError> {
290 let mut writer = ArrowWriter::try_new(
291 buffer.writer(),
292 Arc::clone(record_batch.schema_ref()),
293 Some(writer_props.clone()),
294 )
295 .inspect_err(|e| {
296 emit(ArrowWriterError { error: e });
297 })?;
298
299 writer.write(record_batch).inspect_err(|e| {
300 emit(ArrowWriterError { error: e });
301 })?;
302
303 writer.close().inspect_err(|e| {
304 emit(ArrowWriterError { error: e });
305 })?;
306
307 Ok(())
308 }
309}
310
311impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
312 type Error = vector_common::Error;
313
314 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
315 if events.is_empty() {
316 return Ok(());
317 }
318
319 let json_values = match vector_log_events_to_json_values(&events) {
320 Ok(values) => values,
321 Err(e) => {
322 emit(JsonSerializationError { error: &e });
323 return Err(Box::new(e));
324 }
325 };
326
327 let non_log_count = events.len() - json_values.len();
328
329 if non_log_count > 0 {
330 warn!(
331 message = "Non-log events dropped by Parquet encoder ",
332 %non_log_count,
333 internal_log_rate_secs = 10,
334 );
335 self.events_dropped_handle.emit(Count(non_log_count))
336 }
337
338 if json_values.is_empty() {
339 return Ok(());
340 }
341
342 match self.schema_mode {
343 ParquetSchemaMode::Strict => {
345 for event in &events {
346 if let Some(log) = event.maybe_as_log()
347 && let Some(object_map) = log.as_map()
348 {
349 for top_level in object_map.keys() {
350 if !self.schema_field_names.contains(top_level.as_str()) {
351 return Err(Box::new(ArrowEncodingError::SchemaFetchError {
352 message: format!(
353 "Strict schema mode: event contains field '{top_level}' not in schema",
354 ),
355 }));
356 }
357 }
358 }
359 }
360 }
361 ParquetSchemaMode::AutoInfer => {
362 let schema = ParquetSchemaGenerator::infer_schema(&json_values)?;
363 self.schema = Arc::new(ParquetSchemaGenerator::try_normalize_schema(
364 &events, schema,
365 ));
366 }
367 ParquetSchemaMode::Relaxed => {}
368 }
369
370 let record_batch =
371 build_record_batch(Arc::clone(&self.schema), &json_values).map_err(Box::new)?;
372
373 Self::write_record_batch(&record_batch, buffer, &self.writer_props).map_err(Box::new)?;
374
375 Ok(())
376 }
377}
378
379pub struct ParquetSchemaGenerator {}
380
381impl ParquetSchemaGenerator {
382 pub fn infer_schema(events: &[serde_json::Value]) -> Result<Schema, Error> {
383 let schema = infer_json_schema_from_iterator(events.iter().map(Ok::<_, ArrowError>))
384 .map_err(|e| {
385 emit(SchemaGenerationError { error: &e });
386 Error::new(ErrorKind::InvalidData, e.to_string())
387 })?;
388
389 Ok(schema)
390 }
391
392 fn try_normalize_schema(events: &[Event], schema: Schema) -> Schema {
395 let mut ts_seen: HashSet<String> = HashSet::new();
396 let mut non_ts_seen: HashSet<String> = HashSet::new();
397
398 for event in events.iter().filter_map(Event::maybe_as_log) {
399 if let Some(object_map) = event.as_map() {
400 for (path, value) in object_map {
401 if value.is_timestamp() {
402 ts_seen.insert(path.to_string());
403 } else if !value.is_null() {
404 non_ts_seen.insert(path.to_string());
405 }
406 }
407 }
408 }
409
410 let new_fields: Vec<Field> = schema
411 .fields()
412 .iter()
413 .map(|f| {
414 if ts_seen.contains(f.name()) && !non_ts_seen.contains(f.name()) {
415 Field::new(
416 f.name(),
417 DataType::Timestamp(
418 arrow::datatypes::TimeUnit::Microsecond,
419 Some("UTC".into()),
420 ),
421 f.is_nullable(),
422 )
423 } else {
424 f.as_ref().clone()
425 }
426 })
427 .collect();
428
429 Schema::new_with_metadata(new_fields, schema.metadata().clone())
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::reader::RowIter;
    use tokio_util::codec::Encoder;
    use vector_core::event::LogEvent;

    /// Builds a log event whose top-level fields are the given key/value pairs.
    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
    where
        V: Into<vector_core::event::Value>,
    {
        let mut log = LogEvent::default();
        for (key, value) in fields {
            log.insert(key, value.into());
        }
        Event::Log(log)
    }

    /// Asserts that `data` starts with the Parquet magic bytes `PAR1`.
    fn assert_parquet_magic(data: &[u8]) {
        assert!(data.len() >= 4, "Output too short to be valid Parquet");
        assert_eq!(&data[..4], b"PAR1", "Missing Parquet magic bytes");
    }

    /// Parses `data` as a Parquet file and returns the number of rows it contains.
    fn parquet_row_count(data: &[u8]) -> usize {
        let reader =
            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
        let iter = RowIter::from_file_into(Box::new(reader));
        iter.count()
    }

    /// Parses `data` as a Parquet file and returns its column names in schema order.
    fn parquet_column_names(data: &[u8]) -> Vec<String> {
        let reader =
            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
        let schema = reader.metadata().file_metadata().schema_descr();
        schema
            .columns()
            .iter()
            .map(|c| c.name().to_string())
            .collect()
    }

    /// Parses an RFC 3339 test timestamp into UTC; panics on malformed input.
    fn parse_timestamp(s: &str) -> chrono::DateTime<chrono::Utc> {
        chrono::DateTime::parse_from_rfc3339(s)
            .expect("invalid test timestamp")
            .with_timezone(&chrono::Utc)
    }

    /// Builds a log event shaped like `demo_logs` output.
    ///
    /// It has string fields, two timestamp fields (`timestamp`, `random_time`), an
    /// integer `status_code` and a float `response_time_secs`.
    fn demo_log_event(
        message: &str,
        timestamp: chrono::DateTime<chrono::Utc>,
        status_code: i64,
        response_time_secs: f64,
    ) -> Event {
        use vector_core::event::Value;
        let mut log = LogEvent::default();
        log.insert("host", "localhost");
        log.insert("message", message);
        log.insert("service", "vector");
        log.insert("source_type", "demo_logs");
        log.insert("timestamp", Value::Timestamp(timestamp));
        log.insert("random_time", Value::Timestamp(timestamp));
        log.insert("status_code", Value::Integer(status_code));
        log.insert("response_time_secs", response_time_secs);
        Event::Log(log)
    }

    /// Five fixed demo-log events used by the schema-inference tests.
    fn sample_events() -> Vec<Event> {
        const EVENTS: [(&str, &str, i64, f64); 5] = [
            (
                "GET /api/v1/health HTTP/1.1",
                "2026-03-05T20:49:08.037194Z",
                200,
                0.037,
            ),
            (
                "POST /api/v1/ingest HTTP/1.1",
                "2026-03-05T20:49:09.038051Z",
                201,
                0.013,
            ),
            (
                "GET /metrics HTTP/1.1",
                "2026-03-05T20:49:10.036612Z",
                200,
                0.022,
            ),
            (
                "DELETE /api/v1/resource HTTP/1.1",
                "2026-03-05T20:49:11.537131Z",
                404,
                0.005,
            ),
            (
                "PATCH /api/v1/config HTTP/1.1",
                "2026-03-05T20:49:12.037491Z",
                500,
                0.091,
            ),
        ];
        EVENTS
            .iter()
            .map(|(msg, ts, status, rt)| demo_log_event(msg, parse_timestamp(ts), *status, *rt))
            .collect()
    }

    /// Encodes `events` with an `auto_infer` serializer, reads the output back,
    /// and returns the Arrow schema stored in the file with the total row count.
    fn encode_autoinfer_and_read_schema(
        events: Vec<Event>,
    ) -> (arrow::datatypes::SchemaRef, usize) {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should be created without a static schema");

        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("encoding should succeed");

        let data = buffer.freeze();
        assert_parquet_magic(&data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(data)
            .expect("should build ParquetRecordBatchReaderBuilder");
        let schema = builder.schema().clone();
        let num_rows: usize = builder
            .build()
            .expect("should build reader")
            .map(|b| b.expect("batch read error").num_rows())
            .sum();
        (schema, num_rows)
    }

    /// Writes `content` to a temp file whose name includes the process id and
    /// `name`, and returns its path.
    ///
    /// NOTE(review): the file is not deleted after the test.
    fn write_temp_schema(name: &str, content: &str) -> std::path::PathBuf {
        use std::io::Write;
        let path = std::env::temp_dir().join(format!(
            "vector_parquet_test_{}_{}.schema",
            std::process::id(),
            name,
        ));
        let mut f = std::fs::File::create(&path).expect("Failed to create schema file");
        write!(f, "{content}").expect("Failed to write schema");
        path
    }

    // auto_infer end to end: timestamp fields become Timestamp(Microsecond, UTC),
    // integers Int64, floats Float64, strings Utf8, and every event becomes one row.
    #[test]
    fn encode_input_produces_parquet_output() {
        let events = sample_events();
        let n_events = events.len();
        let (schema, num_rows) = encode_autoinfer_and_read_schema(events);

        assert_eq!(num_rows, n_events, "row count should match event count");

        for field_name in &["timestamp", "random_time"] {
            let field = schema
                .field_with_name(field_name)
                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
            assert!(
                matches!(
                    field.data_type(),
                    DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
                ),
                "'{field_name}' should be Timestamp(Microsecond, UTC), got {:?}",
                field.data_type()
            );
        }

        let status_field = schema
            .field_with_name("status_code")
            .expect("status_code field should exist");
        assert_eq!(status_field.data_type(), &DataType::Int64);

        let rt_field = schema
            .field_with_name("response_time_secs")
            .expect("response_time_secs field should exist");
        assert_eq!(rt_field.data_type(), &DataType::Float64);

        for field_name in &["host", "message", "service", "source_type"] {
            let field = schema
                .field_with_name(field_name)
                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
            assert_eq!(field.data_type(), &DataType::Utf8);
        }
    }

    // An empty batch succeeds and writes nothing.
    #[test]
    fn test_parquet_empty_events() {
        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should succeed");

        let events: Vec<Event> = vec![];
        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Empty events should succeed");

        assert!(buffer.is_empty(), "Buffer should be empty for empty events");
    }

    // Every compression variant produces a readable one-row Parquet file.
    #[test]
    fn test_parquet_compression_variants() {
        let events = vec![create_event(vec![("msg", "hello world")])];

        let compressions = vec![
            ParquetCompression::None,
            ParquetCompression::Snappy,
            ParquetCompression::Zstd { level: 1 },
            ParquetCompression::Gzip { level: 1 },
            ParquetCompression::Lz4,
        ];

        for compression in compressions {
            let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
                schema_mode: ParquetSchemaMode::AutoInfer,
                compression,
                ..Default::default()
            })
            .expect("Failed to create serializer");

            let mut buffer = BytesMut::new();
            serializer
                .encode(events.clone(), &mut buffer)
                .unwrap_or_else(|e| panic!("Encoding with {:?} failed: {}", compression, e));

            let data = buffer.freeze();
            assert_parquet_magic(&data);
            assert_eq!(
                parquet_row_count(&data),
                1,
                "Wrong row count for {:?}",
                compression
            );
        }
    }

    // The output ends with the trailing `PAR1` magic, i.e. the file footer was written.
    #[test]
    fn test_parquet_output_has_footer() {
        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should succeed");

        let events = vec![create_event(vec![("msg", "test")])];
        let mut buffer = BytesMut::new();
        serializer.encode(events, &mut buffer).unwrap();

        let data = buffer.freeze();
        let len = data.len();
        assert!(len >= 8, "Parquet output too short");
        assert_eq!(
            &data[len - 4..],
            b"PAR1",
            "Parquet footer magic bytes missing"
        );
    }

    // Cloning the serializer shares (not copies) the writer properties.
    #[test]
    fn test_writer_props_arc_shared() {
        let serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should succeed");
        let cloned = serializer.clone();

        assert_eq!(Arc::strong_count(&serializer.writer_props), 2);
        drop(cloned);
        assert_eq!(Arc::strong_count(&serializer.writer_props), 1);
    }

    // Metric events in a batch are dropped; only the log events become rows.
    #[test]
    fn test_mixed_log_and_non_log_events() {
        use vector_core::event::{Metric, MetricKind, MetricValue};

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should succeed");

        let metric = Metric::new(
            "cpu.usage",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let events = vec![
            create_event(vec![("msg", "hello")]),
            Event::Metric(metric),
            create_event(vec![("msg", "world")]),
        ];

        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Mixed batch should succeed (non-log events dropped)");

        assert_parquet_magic(&buffer);
        assert_eq!(parquet_row_count(&buffer), 2);
    }

    // A schema loaded from `schema_file` defines the output columns, including
    // optional columns that are absent from the event.
    #[test]
    fn test_parquet_schema_file() {
        let schema_path = write_temp_schema(
            "schema_file",
            "message logs {\n  required binary name (STRING);\n  optional int64 age;\n}",
        );

        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
            "schema_file": schema_path.to_str().unwrap()
        }))
        .expect("Config should deserialize");

        let mut serializer =
            ParquetSerializer::new(config).expect("Should create serializer from schema file");

        let mut log = LogEvent::default();
        log.insert("name", "alice");

        let mut buffer = BytesMut::new();
        serializer
            .encode(vec![Event::Log(log)], &mut buffer)
            .expect("Encoding with schema file should succeed");

        let data = buffer.freeze();
        assert_parquet_magic(&data);
        assert_eq!(parquet_row_count(&data), 1);

        let columns = parquet_column_names(&data);
        assert_eq!(columns, vec!["name", "age"]);
    }

    // A missing schema file fails construction with a "Failed to read" error.
    #[test]
    fn test_parquet_schema_file_not_found_error() {
        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
            "schema_file": "/nonexistent/path/schema.parquet"
        }))
        .expect("Config should deserialize");

        let result = ParquetSerializer::new(config);
        assert!(result.is_err(), "Missing schema file should error");
        assert!(
            result.unwrap_err().to_string().contains("Failed to read"),
            "Error should mention file read failure"
        );
    }

    // Unparseable schema text fails construction with a parse error.
    #[test]
    fn test_parquet_schema_file_invalid_syntax_error() {
        let schema_path = write_temp_schema(
            "invalid_syntax",
            "this is not valid parquet schema syntax !!!",
        );

        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
            "schema_file": schema_path.to_str().unwrap()
        }))
        .expect("Config should deserialize");

        let result = ParquetSerializer::new(config);
        assert!(result.is_err(), "Invalid Parquet schema should error");
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Failed to parse Parquet schema"),
            "Error should mention parsing failure"
        );
    }

    // The default config (relaxed mode, no schema_file) is rejected.
    #[test]
    fn test_parquet_no_schema_error() {
        let config = ParquetSerializerConfig::default();
        let result = ParquetSerializer::new(config);
        assert!(
            result.is_err(),
            "Should fail without schema_file or auto_infer"
        );
    }

    // Strict mode fails the batch and names the unknown top-level field.
    #[test]
    fn test_parquet_strict_mode_rejects_extra_fields() {
        let schema_path = write_temp_schema(
            "strict_rejects",
            "message logs {\n  required binary name (STRING);\n}",
        );

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_file: Some(schema_path),
            schema_mode: ParquetSchemaMode::Strict,
            ..Default::default()
        })
        .expect("Failed to create strict serializer");

        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
        let mut buffer = BytesMut::new();
        let result = serializer.encode(events, &mut buffer);
        assert!(result.is_err(), "Strict mode should reject extra fields");
        assert!(result.unwrap_err().to_string().contains("city"));
    }

    // Strict mode accepts events whose fields are all declared in the schema.
    #[test]
    fn test_parquet_strict_mode_allows_schema_fields() {
        let schema_path = write_temp_schema(
            "strict_allows",
            "message logs {\n  required binary name (STRING);\n  required binary level (STRING);\n}",
        );

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_file: Some(schema_path),
            schema_mode: ParquetSchemaMode::Strict,
            ..Default::default()
        })
        .expect("Failed to create strict serializer");

        let mut log = LogEvent::default();
        log.insert("name", "test");
        log.insert("level", "info");

        let mut buffer = BytesMut::new();
        assert!(
            serializer
                .encode(vec![Event::Log(log)], &mut buffer)
                .is_ok(),
            "Strict mode should pass when all fields match schema"
        );
    }

    // Relaxed mode silently omits fields that are not in the schema.
    #[test]
    fn test_parquet_relaxed_mode_drops_extra_fields() {
        let schema_path = write_temp_schema(
            "relaxed_drops",
            "message logs {\n  required binary name (STRING);\n}",
        );

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_file: Some(schema_path),
            schema_mode: ParquetSchemaMode::Relaxed,
            ..Default::default()
        })
        .expect("Failed to create relaxed serializer");

        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Relaxed mode should drop extra fields silently");

        let data = buffer.freeze();
        assert_parquet_magic(&data);
        assert_eq!(parquet_row_count(&data), 1);
        let columns = parquet_column_names(&data);
        assert_eq!(columns, vec!["name"]);
    }

    // A value that does not match its column type fails encoding, and the error
    // mentions the expected Arrow type.
    #[test]
    fn test_parquet_type_mismatch_returns_error() {
        let schema_path =
            write_temp_schema("type_mismatch", "message logs {\n  required int64 name;\n}");

        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
            schema_file: Some(schema_path),
            schema_mode: ParquetSchemaMode::Relaxed,
            ..Default::default()
        })
        .expect("Failed to create serializer");

        let events = vec![create_event(vec![("name", "not_an_integer")])];
        let mut buffer = BytesMut::new();
        let result = serializer.encode(events, &mut buffer);
        assert!(result.is_err(), "Type mismatch should return an error");
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Int64"),
            "Error should mention the expected type, got: {err}"
        );
    }

    // A `binary` column without a STRING annotation maps to Arrow Binary and is
    // rejected at construction time, with the field named in the error.
    #[test]
    fn test_parquet_schema_file_binary_without_string_annotation_rejected() {
        let schema_path = write_temp_schema(
            "binary_rejected",
            "message logs {\n  required binary name (STRING);\n  optional binary raw_data;\n}",
        );

        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
            "schema_file": schema_path.to_str().unwrap()
        }))
        .expect("Config should deserialize");

        let result = ParquetSerializer::new(config);
        assert!(
            result.is_err(),
            "Parquet binary without STRING annotation should be rejected"
        );
        assert!(
            result.unwrap_err().to_string().contains("raw_data"),
            "Error should name the offending field"
        );
    }
}