codecs/encoding/format/
parquet.rs

1//! Parquet batch format codec for batched event encoding
2//!
3//! Provides Apache Parquet format encoding with schema file support and auto-inference.
4//! Reuses the Arrow record batch building logic from the Arrow IPC codec,
5//! then writes the batch as a complete Parquet file using `ArrowWriter`.
6
7use std::collections::HashSet;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
12use arrow::error::ArrowError;
13use arrow::json::reader::infer_json_schema_from_iterator;
14use arrow::record_batch::RecordBatch;
15use bytes::{BufMut, BytesMut};
16use derivative::Derivative;
17use parquet::arrow::ArrowWriter;
18use parquet::basic::ZstdLevel;
19use parquet::basic::{Compression as ParquetCodecCompression, GzipLevel};
20use parquet::file::properties::WriterProperties;
21use std::io::{Error, ErrorKind};
22use tracing::warn;
23use vector_common::internal_event::{
24    ComponentEventsDropped, Count, InternalEventHandle, Registered, UNINTENTIONAL, emit, register,
25};
26use vector_config::configurable_component;
27use vector_core::event::Event;
28
29use super::arrow::{ArrowEncodingError, build_record_batch};
30use crate::encoding::format::arrow::vector_log_events_to_json_values;
31use crate::internal_events::{ArrowWriterError, JsonSerializationError, SchemaGenerationError};
32
/// Internal event used to report events this codec drops unintentionally
/// (registered once per serializer in `ParquetSerializer::new`).
type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>;
34
/// Compression codec, and optional compression level, applied to the column
/// pages of the Parquet file.
#[configurable_component]
#[derive(Default, Copy, Clone, Debug, PartialEq)]
#[configurable(metadata(
    docs::enum_tag_description = "Compression codec applied per column page inside the Parquet file."
))]
#[serde(tag = "algorithm", rename_all = "snake_case")]
pub enum ParquetCompression {
    /// Zstd compression. Level must be between 1 and 21.
    Zstd {
        /// Compression level (1–21). This is the range Vector supports; higher values compress more but are slower.
        #[configurable(validation(range(min = 1, max = 21)))]
        level: u8,
    },
    /// Gzip compression. Level must be between 1 and 9.
    Gzip {
        /// Compression level (1–9). This is the range Vector supports; higher values compress more but are slower.
        #[configurable(validation(range(min = 1, max = 9)))]
        level: u8,
    },

    /// Snappy compression (no level). This is the default.
    #[default]
    Snappy,

    /// LZ4 raw compression (the Parquet `LZ4_RAW` codec; no level).
    Lz4,

    /// No compression.
    None,
}
66
67impl TryFrom<ParquetCompression> for ParquetCodecCompression {
68    type Error = parquet::errors::ParquetError;
69    fn try_from(
70        value: ParquetCompression,
71    ) -> Result<ParquetCodecCompression, parquet::errors::ParquetError> {
72        match value {
73            ParquetCompression::None => Ok(ParquetCodecCompression::UNCOMPRESSED),
74            ParquetCompression::Snappy => Ok(ParquetCodecCompression::SNAPPY),
75            ParquetCompression::Zstd { level } => Ok(ParquetCodecCompression::ZSTD(
76                ZstdLevel::try_new(level.into())?,
77            )),
78            ParquetCompression::Gzip { level } => Ok(ParquetCodecCompression::GZIP(
79                GzipLevel::try_new(level.into())?,
80            )),
81            ParquetCompression::Lz4 => Ok(ParquetCodecCompression::LZ4_RAW),
82        }
83    }
84}
85
/// Schema handling mode.
///
/// `relaxed` and `strict` encode against the schema loaded from `schema_file`;
/// `auto_infer` derives the schema from each batch instead.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParquetSchemaMode {
    /// Missing fields become null. Extra fields are silently dropped.
    ///
    /// This is the default mode.
    #[default]
    Relaxed,
    /// Missing fields become null. Extra top-level fields cause the whole batch
    /// to fail to encode.
    Strict,
    /// Auto infer schema based on the batch. No schema file needed; `schema_file`
    /// is ignored if set. Top-level fields whose non-null values are all
    /// timestamps are written as UTC microsecond timestamps.
    AutoInfer,
}
99
/// Configuration for the Parquet serializer.
///
/// Encodes log events as Apache Parquet columnar files, optimized for analytical queries
/// via Athena, Trino, Spark, and other columnar query engines.
///
/// Either `schema_file` must be provided, or `schema_mode` must be set to `auto_infer`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct ParquetSerializerConfig {
    /// Path to a native Parquet schema file (`.schema`).
    ///
    /// Required unless `schema_mode` is `auto_infer`, in which case it is ignored. The
    /// file must contain a valid Parquet message type definition and be at most 10 MB.
    #[serde(default)]
    pub schema_file: Option<PathBuf>,

    /// Compression codec applied per column page inside the Parquet file.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: ParquetCompression,

    /// Controls where the schema comes from (schema file or per-batch inference) and
    /// how events with fields not present in the schema are handled.
    #[serde(default)]
    #[configurable(derived)]
    pub schema_mode: ParquetSchemaMode,
}
126
127impl ParquetSerializerConfig {
128    /// Resolve the Arrow schema from the configured schema source.
129    fn resolve_schema(&self) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
130        if self.schema_mode == ParquetSchemaMode::AutoInfer {
131            return Ok(Schema::empty());
132        }
133
134        let path = self
135            .schema_file
136            .as_ref()
137            .ok_or("schema_file is required unless schema_mode is auto_infer")?;
138
139        let content = read_schema_file(path, "schema_file")?;
140        let parquet_type = parquet::schema::parser::parse_message_type(&content)
141            .map_err(|e| format!("Failed to parse Parquet schema: {e}"))?;
142        let schema_desc = parquet::schema::types::SchemaDescriptor::new(Arc::new(parquet_type));
143        let arrow_schema = parquet::arrow::parquet_to_arrow_schema(&schema_desc, None)
144            .map_err(|e| format!("Failed to convert Parquet schema to Arrow: {e}"))?;
145        Ok(arrow_schema)
146    }
147
148    /// The data type of events that are accepted by `ParquetSerializer`.
149    pub fn input_type(&self) -> vector_core::config::DataType {
150        vector_core::config::DataType::Log
151    }
152
153    /// The schema required by the serializer.
154    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
155        vector_core::schema::Requirement::empty()
156    }
157}
158
/// Reads a schema file into a `String`, enforcing a 10 MB size limit.
///
/// `field_name` is the config option the path came from; it is used only to make
/// error messages actionable.
///
/// The size limit is enforced twice. The file's metadata is checked first so an
/// obviously oversized regular file fails without being read. The read itself is
/// also capped, because `metadata().len()` is typically 0 for FIFOs and character
/// devices and can change between the check and the read.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read (including non-UTF-8
/// content), or if it holds more than 10 MB.
fn read_schema_file(
    path: &std::path::Path,
    field_name: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;

    const MAX_SCHEMA_FILE_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
    let display = path.display();
    // Shared formatter for every I/O failure; captures only references, so it is `Copy`.
    let read_error =
        |e: std::io::Error| format!("Failed to read {field_name} '{display}': {e}");

    // Open once and reuse the handle, so the size check and the read refer to the same file.
    let file = std::fs::File::open(path).map_err(read_error)?;
    let metadata = file.metadata().map_err(read_error)?;
    if metadata.len() > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large ({} bytes, max {MAX_SCHEMA_FILE_SIZE})",
            metadata.len()
        )
        .into());
    }

    // Read at most one byte past the limit: enough to detect an oversized or
    // unbounded source without buffering all of it.
    let mut content = String::new();
    file.take(MAX_SCHEMA_FILE_SIZE + 1)
        .read_to_string(&mut content)
        .map_err(read_error)?;
    if content.len() as u64 > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large (more than {MAX_SCHEMA_FILE_SIZE} bytes)"
        )
        .into());
    }

    Ok(content)
}
177
178/// Check the resolved Arrow schema for data types unsupported by the JSON-based
179/// encode path (`arrow::json::reader::ReaderBuilder`). Binary variants are
180/// accepted by Parquet/Arrow at the schema level but the JSON decoder rejects
181/// them at runtime, so we fail fast here at config time.
182fn reject_unsupported_arrow_types(
183    schema: &Schema,
184) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
185    fn check_field(field: &Field, path: &str, bad: &mut Vec<String>) {
186        let name = if path.is_empty() {
187            field.name().to_string()
188        } else {
189            format!("{path}.{}", field.name())
190        };
191        match field.data_type() {
192            DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
193                bad.push(format!("'{name}' ({:?})", field.data_type()));
194            }
195            DataType::Struct(fields) => {
196                for f in fields {
197                    check_field(f, &name, bad);
198                }
199            }
200            DataType::List(inner) | DataType::LargeList(inner) => {
201                check_field(inner, &name, bad);
202            }
203            DataType::Map(entries_field, _) => {
204                if let DataType::Struct(kv) = entries_field.data_type() {
205                    for f in kv {
206                        check_field(f, &name, bad);
207                    }
208                }
209            }
210            _ => {}
211        }
212    }
213
214    let mut bad = Vec::new();
215    for field in schema.fields() {
216        check_field(field, "", &mut bad);
217    }
218    if !bad.is_empty() {
219        return Err(format!(
220            "Schema contains binary field(s) unsupported by the JSON-based Arrow encoder: {}. \
221             Use Utf8 for base64/hex-encoded data instead.",
222            bad.join(", ")
223        )
224        .into());
225    }
226    Ok(())
227}
228
/// Parquet batch serializer.
///
/// Each non-empty batch passed to `encode` is written into the output buffer as
/// one complete, self-contained Parquet file.
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct ParquetSerializer {
    /// Arrow schema used to build record batches. It is fixed in `relaxed`/`strict`
    /// mode and replaced with a freshly inferred schema on every batch in `auto_infer` mode.
    schema: SchemaRef,
    /// Writer settings (compression codec), shared between clones of this serializer.
    writer_props: Arc<WriterProperties>,
    /// How the schema is sourced and how extra fields are treated.
    schema_mode: ParquetSchemaMode,
    /// Pre-built set of schema field names for O(1) strict-mode lookups.
    schema_field_names: HashSet<String>,

    /// Handle for reporting events dropped by this serializer.
    #[derivative(Debug = "ignore")]
    events_dropped_handle: Registered<EventsDroppedError>,
}
242
243impl ParquetSerializer {
244    /// Create a new `ParquetSerializer` from the given configuration.
245    pub fn new(
246        config: ParquetSerializerConfig,
247    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
248        let schema = config.resolve_schema()?;
249        reject_unsupported_arrow_types(&schema)?;
250        let schema_ref = SchemaRef::new(schema);
251
252        let schema_field_names = schema_ref
253            .fields()
254            .iter()
255            .map(|f| f.name().clone())
256            .collect::<HashSet<_>>();
257
258        let writer_props = Arc::new(
259            WriterProperties::builder()
260                .set_compression(config.compression.try_into()?)
261                .build(),
262        );
263
264        Ok(Self {
265            schema: schema_ref,
266            writer_props,
267            schema_mode: config.schema_mode,
268            schema_field_names,
269            events_dropped_handle: register(EventsDroppedError::from(
270                "Events could not be serialized to parquet",
271            )),
272        })
273    }
274
275    /// Returns the MIME content type for Parquet data.
276    pub const fn content_type(&self) -> &'static str {
277        "application/vnd.apache.parquet"
278    }
279
280    /// Writes `record_batch` into `buffer` as a complete Parquet file.
281    ///
282    /// On failure, emits an [`ArrowWriterError`] internal event (which
283    /// increments `component_errors_total`) before returning the error.
284    /// The caller is responsible for emitting `events_dropped`.
285    fn write_record_batch(
286        record_batch: &RecordBatch,
287        buffer: &mut BytesMut,
288        writer_props: &WriterProperties,
289    ) -> Result<(), parquet::errors::ParquetError> {
290        let mut writer = ArrowWriter::try_new(
291            buffer.writer(),
292            Arc::clone(record_batch.schema_ref()),
293            Some(writer_props.clone()),
294        )
295        .inspect_err(|e| {
296            emit(ArrowWriterError { error: e });
297        })?;
298
299        writer.write(record_batch).inspect_err(|e| {
300            emit(ArrowWriterError { error: e });
301        })?;
302
303        writer.close().inspect_err(|e| {
304            emit(ArrowWriterError { error: e });
305        })?;
306
307        Ok(())
308    }
309}
310
311impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
312    type Error = vector_common::Error;
313
314    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
315        if events.is_empty() {
316            return Ok(());
317        }
318
319        let json_values = match vector_log_events_to_json_values(&events) {
320            Ok(values) => values,
321            Err(e) => {
322                emit(JsonSerializationError { error: &e });
323                return Err(Box::new(e));
324            }
325        };
326
327        let non_log_count = events.len() - json_values.len();
328
329        if non_log_count > 0 {
330            warn!(
331                message = "Non-log events dropped by Parquet encoder ",
332                %non_log_count,
333                internal_log_rate_secs = 10,
334            );
335            self.events_dropped_handle.emit(Count(non_log_count))
336        }
337
338        if json_values.is_empty() {
339            return Ok(());
340        }
341
342        match self.schema_mode {
343            // In strict mode, check for extra top-level fields not in the schema.
344            ParquetSchemaMode::Strict => {
345                for event in &events {
346                    if let Some(log) = event.maybe_as_log()
347                        && let Some(object_map) = log.as_map()
348                    {
349                        for top_level in object_map.keys() {
350                            if !self.schema_field_names.contains(top_level.as_str()) {
351                                return Err(Box::new(ArrowEncodingError::SchemaFetchError {
352                                    message: format!(
353                                        "Strict schema mode: event contains field '{top_level}' not in schema",
354                                    ),
355                                }));
356                            }
357                        }
358                    }
359                }
360            }
361            ParquetSchemaMode::AutoInfer => {
362                let schema = ParquetSchemaGenerator::infer_schema(&json_values)?;
363                self.schema = Arc::new(ParquetSchemaGenerator::try_normalize_schema(
364                    &events, schema,
365                ));
366            }
367            ParquetSchemaMode::Relaxed => {}
368        }
369
370        let record_batch =
371            build_record_batch(Arc::clone(&self.schema), &json_values).map_err(Box::new)?;
372
373        Self::write_record_batch(&record_batch, buffer, &self.writer_props).map_err(Box::new)?;
374
375        Ok(())
376    }
377}
378
379pub struct ParquetSchemaGenerator {}
380
381impl ParquetSchemaGenerator {
382    pub fn infer_schema(events: &[serde_json::Value]) -> Result<Schema, Error> {
383        let schema = infer_json_schema_from_iterator(events.iter().map(Ok::<_, ArrowError>))
384            .map_err(|e| {
385                emit(SchemaGenerationError { error: &e });
386                Error::new(ErrorKind::InvalidData, e.to_string())
387            })?;
388
389        Ok(schema)
390    }
391
392    /// Attempt to modify schema to set timestamp fields as Timestamp instead of Utf8.
393    /// Only works for top-level fields.
394    fn try_normalize_schema(events: &[Event], schema: Schema) -> Schema {
395        let mut ts_seen: HashSet<String> = HashSet::new();
396        let mut non_ts_seen: HashSet<String> = HashSet::new();
397
398        for event in events.iter().filter_map(Event::maybe_as_log) {
399            if let Some(object_map) = event.as_map() {
400                for (path, value) in object_map {
401                    if value.is_timestamp() {
402                        ts_seen.insert(path.to_string());
403                    } else if !value.is_null() {
404                        non_ts_seen.insert(path.to_string());
405                    }
406                }
407            }
408        }
409
410        let new_fields: Vec<Field> = schema
411            .fields()
412            .iter()
413            .map(|f| {
414                if ts_seen.contains(f.name()) && !non_ts_seen.contains(f.name()) {
415                    Field::new(
416                        f.name(),
417                        DataType::Timestamp(
418                            arrow::datatypes::TimeUnit::Microsecond,
419                            Some("UTC".into()),
420                        ),
421                        f.is_nullable(),
422                    )
423                } else {
424                    f.as_ref().clone()
425                }
426            })
427            .collect();
428
429        Schema::new_with_metadata(new_fields, schema.metadata().clone())
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use bytes::Bytes;
437    use parquet::file::reader::{FileReader, SerializedFileReader};
438    use parquet::record::reader::RowIter;
439    use tokio_util::codec::Encoder;
440    use vector_core::event::LogEvent;
441
442    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
443    where
444        V: Into<vector_core::event::Value>,
445    {
446        let mut log = LogEvent::default();
447        for (key, value) in fields {
448            log.insert(key, value.into());
449        }
450        Event::Log(log)
451    }
452
453    fn assert_parquet_magic(data: &[u8]) {
454        assert!(data.len() >= 4, "Output too short to be valid Parquet");
455        assert_eq!(&data[..4], b"PAR1", "Missing Parquet magic bytes");
456    }
457
458    fn parquet_row_count(data: &[u8]) -> usize {
459        let reader =
460            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
461        let iter = RowIter::from_file_into(Box::new(reader));
462        iter.count()
463    }
464
465    fn parquet_column_names(data: &[u8]) -> Vec<String> {
466        let reader =
467            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
468        let schema = reader.metadata().file_metadata().schema_descr();
469        schema
470            .columns()
471            .iter()
472            .map(|c| c.name().to_string())
473            .collect()
474    }
475
476    fn parse_timestamp(s: &str) -> chrono::DateTime<chrono::Utc> {
477        chrono::DateTime::parse_from_rfc3339(s)
478            .expect("invalid test timestamp")
479            .with_timezone(&chrono::Utc)
480    }
481
482    fn demo_log_event(
483        message: &str,
484        timestamp: chrono::DateTime<chrono::Utc>,
485        status_code: i64,
486        response_time_secs: f64,
487    ) -> Event {
488        use vector_core::event::Value;
489        let mut log = LogEvent::default();
490        log.insert("host", "localhost");
491        log.insert("message", message);
492        log.insert("service", "vector");
493        log.insert("source_type", "demo_logs");
494        log.insert("timestamp", Value::Timestamp(timestamp));
495        log.insert("random_time", Value::Timestamp(timestamp));
496        log.insert("status_code", Value::Integer(status_code));
497        log.insert("response_time_secs", response_time_secs);
498        Event::Log(log)
499    }
500
501    fn sample_events() -> Vec<Event> {
502        const EVENTS: [(&str, &str, i64, f64); 5] = [
503            (
504                "GET /api/v1/health HTTP/1.1",
505                "2026-03-05T20:49:08.037194Z",
506                200,
507                0.037,
508            ),
509            (
510                "POST /api/v1/ingest HTTP/1.1",
511                "2026-03-05T20:49:09.038051Z",
512                201,
513                0.013,
514            ),
515            (
516                "GET /metrics HTTP/1.1",
517                "2026-03-05T20:49:10.036612Z",
518                200,
519                0.022,
520            ),
521            (
522                "DELETE /api/v1/resource HTTP/1.1",
523                "2026-03-05T20:49:11.537131Z",
524                404,
525                0.005,
526            ),
527            (
528                "PATCH /api/v1/config HTTP/1.1",
529                "2026-03-05T20:49:12.037491Z",
530                500,
531                0.091,
532            ),
533        ];
534        EVENTS
535            .iter()
536            .map(|(msg, ts, status, rt)| demo_log_event(msg, parse_timestamp(ts), *status, *rt))
537            .collect()
538    }
539
540    fn encode_autoinfer_and_read_schema(
541        events: Vec<Event>,
542    ) -> (arrow::datatypes::SchemaRef, usize) {
543        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
544
545        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
546            schema_mode: ParquetSchemaMode::AutoInfer,
547            ..Default::default()
548        })
549        .expect("AutoInfer serializer should be created without a static schema");
550
551        let mut buffer = BytesMut::new();
552        serializer
553            .encode(events, &mut buffer)
554            .expect("encoding should succeed");
555
556        let data = buffer.freeze();
557        assert_parquet_magic(&data);
558
559        let builder = ParquetRecordBatchReaderBuilder::try_new(data)
560            .expect("should build ParquetRecordBatchReaderBuilder");
561        let schema = builder.schema().clone();
562        let num_rows: usize = builder
563            .build()
564            .expect("should build reader")
565            .map(|b| b.expect("batch read error").num_rows())
566            .sum();
567        (schema, num_rows)
568    }
569
570    /// Write a temporary Parquet schema file and return its path.
571    ///
572    /// `name` must be unique per test to avoid parallel-test races on the same file.
573    fn write_temp_schema(name: &str, content: &str) -> std::path::PathBuf {
574        use std::io::Write;
575        let path = std::env::temp_dir().join(format!(
576            "vector_parquet_test_{}_{}.schema",
577            std::process::id(),
578            name,
579        ));
580        let mut f = std::fs::File::create(&path).expect("Failed to create schema file");
581        write!(f, "{content}").expect("Failed to write schema");
582        path
583    }
584
585    // ── AutoInfer mode ───────────────────────────────────────────────────────
586
587    #[test]
588    fn encode_input_produces_parquet_output() {
589        let events = sample_events();
590        let n_events = events.len();
591        let (schema, num_rows) = encode_autoinfer_and_read_schema(events);
592
593        assert_eq!(num_rows, n_events, "row count should match event count");
594
595        for field_name in &["timestamp", "random_time"] {
596            let field = schema
597                .field_with_name(field_name)
598                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
599            assert!(
600                matches!(
601                    field.data_type(),
602                    DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
603                ),
604                "'{field_name}' should be Timestamp(Microsecond, UTC), got {:?}",
605                field.data_type()
606            );
607        }
608
609        let status_field = schema
610            .field_with_name("status_code")
611            .expect("status_code field should exist");
612        assert_eq!(status_field.data_type(), &DataType::Int64);
613
614        let rt_field = schema
615            .field_with_name("response_time_secs")
616            .expect("response_time_secs field should exist");
617        assert_eq!(rt_field.data_type(), &DataType::Float64);
618
619        for field_name in &["host", "message", "service", "source_type"] {
620            let field = schema
621                .field_with_name(field_name)
622                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
623            assert_eq!(field.data_type(), &DataType::Utf8);
624        }
625    }
626
627    #[test]
628    fn test_parquet_empty_events() {
629        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
630            schema_mode: ParquetSchemaMode::AutoInfer,
631            ..Default::default()
632        })
633        .expect("AutoInfer serializer should succeed");
634
635        let events: Vec<Event> = vec![];
636        let mut buffer = BytesMut::new();
637        serializer
638            .encode(events, &mut buffer)
639            .expect("Empty events should succeed");
640
641        assert!(buffer.is_empty(), "Buffer should be empty for empty events");
642    }
643
644    #[test]
645    fn test_parquet_compression_variants() {
646        let events = vec![create_event(vec![("msg", "hello world")])];
647
648        let compressions = vec![
649            ParquetCompression::None,
650            ParquetCompression::Snappy,
651            ParquetCompression::Zstd { level: 1 },
652            ParquetCompression::Gzip { level: 1 },
653            ParquetCompression::Lz4,
654        ];
655
656        for compression in compressions {
657            let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
658                schema_mode: ParquetSchemaMode::AutoInfer,
659                compression,
660                ..Default::default()
661            })
662            .expect("Failed to create serializer");
663
664            let mut buffer = BytesMut::new();
665            serializer
666                .encode(events.clone(), &mut buffer)
667                .unwrap_or_else(|e| panic!("Encoding with {:?} failed: {}", compression, e));
668
669            let data = buffer.freeze();
670            assert_parquet_magic(&data);
671            assert_eq!(
672                parquet_row_count(&data),
673                1,
674                "Wrong row count for {:?}",
675                compression
676            );
677        }
678    }
679
680    #[test]
681    fn test_parquet_output_has_footer() {
682        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
683            schema_mode: ParquetSchemaMode::AutoInfer,
684            ..Default::default()
685        })
686        .expect("AutoInfer serializer should succeed");
687
688        let events = vec![create_event(vec![("msg", "test")])];
689        let mut buffer = BytesMut::new();
690        serializer.encode(events, &mut buffer).unwrap();
691
692        let data = buffer.freeze();
693        let len = data.len();
694        assert!(len >= 8, "Parquet output too short");
695        assert_eq!(
696            &data[len - 4..],
697            b"PAR1",
698            "Parquet footer magic bytes missing"
699        );
700    }
701
702    #[test]
703    fn test_writer_props_arc_shared() {
704        let serializer = ParquetSerializer::new(ParquetSerializerConfig {
705            schema_mode: ParquetSchemaMode::AutoInfer,
706            ..Default::default()
707        })
708        .expect("AutoInfer serializer should succeed");
709        let cloned = serializer.clone();
710
711        assert_eq!(Arc::strong_count(&serializer.writer_props), 2);
712        drop(cloned);
713        assert_eq!(Arc::strong_count(&serializer.writer_props), 1);
714    }
715
716    #[test]
717    fn test_mixed_log_and_non_log_events() {
718        use vector_core::event::{Metric, MetricKind, MetricValue};
719
720        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
721            schema_mode: ParquetSchemaMode::AutoInfer,
722            ..Default::default()
723        })
724        .expect("AutoInfer serializer should succeed");
725
726        let metric = Metric::new(
727            "cpu.usage",
728            MetricKind::Absolute,
729            MetricValue::Gauge { value: 42.0 },
730        );
731        let events = vec![
732            create_event(vec![("msg", "hello")]),
733            Event::Metric(metric),
734            create_event(vec![("msg", "world")]),
735        ];
736
737        let mut buffer = BytesMut::new();
738        serializer
739            .encode(events, &mut buffer)
740            .expect("Mixed batch should succeed (non-log events dropped)");
741
742        assert_parquet_magic(&buffer);
743        assert_eq!(parquet_row_count(&buffer), 2);
744    }
745
746    // ── Schema file mode ─────────────────────────────────────────────────────
747
748    #[test]
749    fn test_parquet_schema_file() {
750        let schema_path = write_temp_schema(
751            "schema_file",
752            "message logs {\n  required binary name (STRING);\n  optional int64 age;\n}",
753        );
754
755        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
756            "schema_file": schema_path.to_str().unwrap()
757        }))
758        .expect("Config should deserialize");
759
760        let mut serializer =
761            ParquetSerializer::new(config).expect("Should create serializer from schema file");
762
763        let mut log = LogEvent::default();
764        log.insert("name", "alice");
765
766        let mut buffer = BytesMut::new();
767        serializer
768            .encode(vec![Event::Log(log)], &mut buffer)
769            .expect("Encoding with schema file should succeed");
770
771        let data = buffer.freeze();
772        assert_parquet_magic(&data);
773        assert_eq!(parquet_row_count(&data), 1);
774
775        let columns = parquet_column_names(&data);
776        assert_eq!(columns, vec!["name", "age"]);
777    }
778
779    #[test]
780    fn test_parquet_schema_file_not_found_error() {
781        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
782            "schema_file": "/nonexistent/path/schema.parquet"
783        }))
784        .expect("Config should deserialize");
785
786        let result = ParquetSerializer::new(config);
787        assert!(result.is_err(), "Missing schema file should error");
788        assert!(
789            result.unwrap_err().to_string().contains("Failed to read"),
790            "Error should mention file read failure"
791        );
792    }
793
794    #[test]
795    fn test_parquet_schema_file_invalid_syntax_error() {
796        let schema_path = write_temp_schema(
797            "invalid_syntax",
798            "this is not valid parquet schema syntax !!!",
799        );
800
801        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
802            "schema_file": schema_path.to_str().unwrap()
803        }))
804        .expect("Config should deserialize");
805
806        let result = ParquetSerializer::new(config);
807        assert!(result.is_err(), "Invalid Parquet schema should error");
808        assert!(
809            result
810                .unwrap_err()
811                .to_string()
812                .contains("Failed to parse Parquet schema"),
813            "Error should mention parsing failure"
814        );
815    }
816
817    #[test]
818    fn test_parquet_no_schema_error() {
819        let config = ParquetSerializerConfig::default();
820        let result = ParquetSerializer::new(config);
821        assert!(
822            result.is_err(),
823            "Should fail without schema_file or auto_infer"
824        );
825    }
826
827    // ── Schema mode: strict / relaxed ────────────────────────────────────────
828
829    #[test]
830    fn test_parquet_strict_mode_rejects_extra_fields() {
831        let schema_path = write_temp_schema(
832            "strict_rejects",
833            "message logs {\n  required binary name (STRING);\n}",
834        );
835
836        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
837            schema_file: Some(schema_path),
838            schema_mode: ParquetSchemaMode::Strict,
839            ..Default::default()
840        })
841        .expect("Failed to create strict serializer");
842
843        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
844        let mut buffer = BytesMut::new();
845        let result = serializer.encode(events, &mut buffer);
846        assert!(result.is_err(), "Strict mode should reject extra fields");
847        assert!(result.unwrap_err().to_string().contains("city"));
848    }
849
850    #[test]
851    fn test_parquet_strict_mode_allows_schema_fields() {
852        let schema_path = write_temp_schema(
853            "strict_allows",
854            "message logs {\n  required binary name (STRING);\n  required binary level (STRING);\n}",
855        );
856
857        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
858            schema_file: Some(schema_path),
859            schema_mode: ParquetSchemaMode::Strict,
860            ..Default::default()
861        })
862        .expect("Failed to create strict serializer");
863
864        let mut log = LogEvent::default();
865        log.insert("name", "test");
866        log.insert("level", "info");
867
868        let mut buffer = BytesMut::new();
869        assert!(
870            serializer
871                .encode(vec![Event::Log(log)], &mut buffer)
872                .is_ok(),
873            "Strict mode should pass when all fields match schema"
874        );
875    }
876
877    #[test]
878    fn test_parquet_relaxed_mode_drops_extra_fields() {
879        let schema_path = write_temp_schema(
880            "relaxed_drops",
881            "message logs {\n  required binary name (STRING);\n}",
882        );
883
884        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
885            schema_file: Some(schema_path),
886            schema_mode: ParquetSchemaMode::Relaxed,
887            ..Default::default()
888        })
889        .expect("Failed to create relaxed serializer");
890
891        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
892        let mut buffer = BytesMut::new();
893        serializer
894            .encode(events, &mut buffer)
895            .expect("Relaxed mode should drop extra fields silently");
896
897        let data = buffer.freeze();
898        assert_parquet_magic(&data);
899        assert_eq!(parquet_row_count(&data), 1);
900        let columns = parquet_column_names(&data);
901        assert_eq!(columns, vec!["name"]);
902    }
903
904    #[test]
905    fn test_parquet_type_mismatch_returns_error() {
906        let schema_path =
907            write_temp_schema("type_mismatch", "message logs {\n  required int64 name;\n}");
908
909        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
910            schema_file: Some(schema_path),
911            schema_mode: ParquetSchemaMode::Relaxed,
912            ..Default::default()
913        })
914        .expect("Failed to create serializer");
915
916        let events = vec![create_event(vec![("name", "not_an_integer")])];
917        let mut buffer = BytesMut::new();
918        let result = serializer.encode(events, &mut buffer);
919        assert!(result.is_err(), "Type mismatch should return an error");
920        let err = result.unwrap_err().to_string();
921        assert!(
922            err.contains("Int64"),
923            "Error should mention the expected type, got: {err}"
924        );
925    }
926
927    #[test]
928    fn test_parquet_schema_file_binary_without_string_annotation_rejected() {
929        // Native Parquet "binary" without (STRING) annotation resolves to Arrow Binary,
930        // which is rejected at config time.
931        let schema_path = write_temp_schema(
932            "binary_rejected",
933            "message logs {\n  required binary name (STRING);\n  optional binary raw_data;\n}",
934        );
935
936        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
937            "schema_file": schema_path.to_str().unwrap()
938        }))
939        .expect("Config should deserialize");
940
941        let result = ParquetSerializer::new(config);
942        assert!(
943            result.is_err(),
944            "Parquet binary without STRING annotation should be rejected"
945        );
946        assert!(
947            result.unwrap_err().to_string().contains("raw_data"),
948            "Error should name the offending field"
949        );
950    }
951}