codecs/encoding/format/
arrow.rs

1//! Arrow IPC streaming format codec for batched event encoding
2//!
3//! Provides Apache Arrow IPC stream format encoding with static schema support.
4//! This implements the streaming variant of the Arrow IPC protocol, which writes
5//! a continuous stream of record batches without a file footer.
6
7use arrow::{
8    datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9    error::ArrowError,
10    ipc::writer::StreamWriter,
11    json::reader::ReaderBuilder,
12    record_batch::RecordBatch,
13};
14use async_trait::async_trait;
15use bytes::{BufMut, Bytes, BytesMut};
16use snafu::{ResultExt, Snafu, ensure};
17use vector_config::configurable_component;
18use vector_core::event::Event;
19
/// Provides Arrow schema for encoding.
///
/// Sinks can implement this trait to provide custom schema fetching logic.
/// Implementors must be `Send + Sync` and implement `Debug`, as required by the
/// trait's supertrait bounds.
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Fetch the Arrow schema from the data store.
    ///
    /// This is called during sink configuration build phase to fetch
    /// the schema once at startup, rather than at runtime.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowEncodingError`] when the schema cannot be obtained.
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
31
/// Configuration for Arrow IPC stream serialization
#[configurable_component]
// `Debug` is implemented by hand for this type so that only a short schema summary is printed.
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema to use for encoding
    // `#[serde(skip)]` keeps this field out of (de)serialization, so a deserialized config
    // always starts with `None` here and the schema has to be supplied in code
    // (for example through `ArrowStreamSerializerConfig::new`).
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// Allow null values for non-nullable fields in the schema.
    ///
    /// When enabled, missing or incompatible values are encoded as null, even for fields
    /// marked as non-nullable in the Arrow schema. This is useful when working with downstream
    /// systems that can handle null values through defaults, computed columns, or other mechanisms.
    ///
    /// When disabled (default), missing values for non-nullable fields results in encoding errors. This is to
    /// help ensure all required data is present before sending it to the sink.
    // `#[serde(default)]` makes the option optional in user configuration; it is `false` when omitted.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
53
54impl std::fmt::Debug for ArrowStreamSerializerConfig {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("ArrowStreamSerializerConfig")
57            .field(
58                "schema",
59                &self
60                    .schema
61                    .as_ref()
62                    .map(|s| format!("{} fields", s.fields().len())),
63            )
64            .field("allow_nullable_fields", &self.allow_nullable_fields)
65            .finish()
66    }
67}
68
69impl ArrowStreamSerializerConfig {
70    /// Create a new ArrowStreamSerializerConfig with a schema
71    pub fn new(schema: arrow::datatypes::Schema) -> Self {
72        Self {
73            schema: Some(schema),
74            allow_nullable_fields: false,
75        }
76    }
77
78    /// The data type of events that are accepted by `ArrowStreamEncoder`.
79    pub fn input_type(&self) -> vector_core::config::DataType {
80        vector_core::config::DataType::Log
81    }
82
83    /// The schema required by the serializer.
84    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
85        vector_core::schema::Requirement::empty()
86    }
87}
88
/// Arrow IPC stream batch serializer that holds the schema
///
/// Created through [`ArrowStreamSerializer::new`]. Batches of events are encoded through its
/// `tokio_util::codec::Encoder<Vec<Event>>` implementation, or turned into a plain
/// `RecordBatch` with [`ArrowStreamSerializer::encode_to_record_batch`].
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    // Shared schema applied to every batch. When `allow_nullable_fields` was set in the
    // configuration, `new` has already rewritten it so that all fields are nullable.
    schema: SchemaRef,
}
94
95impl ArrowStreamSerializer {
96    /// Encode events into a `RecordBatch` without writing to IPC stream format.
97    pub fn encode_to_record_batch(
98        &self,
99        events: &[Event],
100    ) -> Result<RecordBatch, ArrowEncodingError> {
101        let values = vector_log_events_to_json_values(events).map_err(|e| {
102            ArrowEncodingError::RecordBatchCreation {
103                source: arrow::error::ArrowError::JsonError(e.to_string()),
104            }
105        })?;
106        build_record_batch(self.schema.clone(), &values)
107    }
108
109    /// Create a new ArrowStreamSerializer with the given configuration
110    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
111        let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
112
113        // If allow_nullable_fields is enabled, transform the schema once here
114        // instead of on every batch encoding
115        let schema = if config.allow_nullable_fields {
116            let nullable_fields: Fields = schema
117                .fields()
118                .iter()
119                .map(|f| make_field_nullable(f))
120                .collect::<Result<Vec<_>, _>>()?
121                .into();
122            Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
123        } else {
124            schema
125        };
126
127        Ok(Self {
128            schema: SchemaRef::new(schema),
129        })
130    }
131}
132
133impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
134    type Error = ArrowEncodingError;
135
136    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
137        if events.is_empty() {
138            return Err(ArrowEncodingError::NoEvents);
139        }
140
141        let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
142
143        buffer.extend_from_slice(&bytes);
144        Ok(())
145    }
146}
147
/// Errors that can occur during Arrow encoding
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// Failed to create Arrow record batch
    ///
    /// Produced when the events cannot be converted to JSON values or when the
    /// Arrow JSON decoder cannot be built for the schema.
    #[snafu(display("Failed to create Arrow record batch: {source}"))]
    RecordBatchCreation {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// Failed to write Arrow IPC data
    ///
    /// Produced when creating the IPC stream writer, writing the record batch,
    /// or finishing the stream fails.
    #[snafu(display("Failed to write Arrow IPC data: {source}"))]
    IpcWrite {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// No events provided for encoding
    ///
    /// Also returned when no JSON values remain to encode or the decoder
    /// yields no record batch.
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// Failed to fetch schema from provider
    #[snafu(display("Failed to fetch schema from provider: {message}"))]
    SchemaFetchError {
        /// Error message from the provider
        message: String,
    },

    /// Null value encountered for non-nullable field
    #[snafu(display("Null value for non-nullable field '{field_name}'"))]
    NullConstraint {
        /// The field name
        ///
        /// When several fields violate the constraint, their names are joined with `", "`.
        field_name: String,
    },

    /// Arrow serializer requires a schema
    ///
    /// Returned when a serializer is created from a configuration without a schema.
    #[snafu(display("Arrow serializer requires a schema"))]
    MissingSchema,

    /// IO error during encoding
    // `context(false)` lets `?` convert a `std::io::Error` into this variant directly.
    #[snafu(display("IO error: {source}"), context(false))]
    Io {
        /// The underlying IO error
        source: std::io::Error,
    },

    /// Arrow JSON decoding error
    ///
    /// Produced when feeding the JSON values to the Arrow decoder, or flushing it, fails.
    #[snafu(display("Arrow JSON decoding error: {source}"))]
    ArrowJsonDecode {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// Invalid Map schema structure
    ///
    /// Produced when a `Map` field's inner type is not a `Struct`, or that struct
    /// does not have exactly two fields (key and value).
    #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
    InvalidMapSchema {
        /// The field name
        field_name: String,
        /// Description of the schema violation
        reason: String,
    },
}
210
211/// Encodes a batch of events into Arrow IPC streaming format
212pub fn encode_events_to_arrow_ipc_stream(
213    events: &[Event],
214    schema: SchemaRef,
215) -> Result<Bytes, ArrowEncodingError> {
216    if events.is_empty() {
217        return Err(ArrowEncodingError::NoEvents);
218    }
219
220    let json_values = vector_log_events_to_json_values(events).map_err(|e| {
221        ArrowEncodingError::RecordBatchCreation {
222            source: ArrowError::JsonError(e.to_string()),
223        }
224    })?;
225
226    let record_batch = build_record_batch(schema, &json_values)?;
227
228    let mut buffer = BytesMut::new().writer();
229    let mut writer =
230        StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
231    writer.write(&record_batch).context(IpcWriteSnafu)?;
232    writer.finish().context(IpcWriteSnafu)?;
233
234    Ok(buffer.into_inner().freeze())
235}
236
237/// Recursively makes a Field and all its nested fields nullable
238fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
239    let new_data_type = match field.data_type() {
240        DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
241        DataType::Struct(fields) => DataType::Struct(
242            fields
243                .iter()
244                .map(|f| make_field_nullable(f))
245                .collect::<Result<Vec<_>, _>>()?
246                .into(),
247        ),
248        DataType::Map(inner, sorted) => {
249            // A Map's inner field is a "entries" Struct<Key, Value>
250            let DataType::Struct(fields) = inner.data_type() else {
251                return InvalidMapSchemaSnafu {
252                    field_name: field.name(),
253                    reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
254                }
255                .fail();
256            };
257
258            ensure!(
259                fields.len() == 2,
260                InvalidMapSchemaSnafu {
261                    field_name: field.name(),
262                    reason: format!("expected 2 fields (key, value), found {}", fields.len()),
263                },
264            );
265            let key_field = &fields[0];
266            let value_field = &fields[1];
267
268            let new_struct_fields: Fields =
269                [key_field.clone(), make_field_nullable(value_field)?.into()].into();
270
271            // Reconstruct the inner "entries" field
272            // The inner field itself must be non-nullable (only the Map wrapper is nullable)
273            let new_inner_field = inner
274                .as_ref()
275                .clone()
276                .with_data_type(DataType::Struct(new_struct_fields))
277                .with_nullable(false);
278
279            DataType::Map(new_inner_field.into(), *sorted)
280        }
281        other => other.clone(),
282    };
283
284    Ok(field
285        .clone()
286        .with_data_type(new_data_type)
287        .with_nullable(true))
288}
289
290/// Returns true if the field is absent from the value's object map, or explicitly null.
291/// Find non-nullable schema fields that are missing or null in any of the given events.
292pub fn find_null_non_nullable_fields<'a>(
293    schema: &'a Schema,
294    values: &[serde_json::Value],
295) -> Vec<&'a str> {
296    schema
297        .fields()
298        .iter()
299        .filter(|field| {
300            !field.is_nullable()
301                && values.iter().any(|value| {
302                    value
303                        .as_object()
304                        .and_then(|map| map.get(field.name().as_str()))
305                        .is_none_or(serde_json::Value::is_null)
306                })
307        })
308        .map(|field| field.name().as_str())
309        .collect()
310}
311
312pub(crate) fn vector_log_events_to_json_values(
313    events: &[Event],
314) -> Result<Vec<serde_json::Value>, serde_json::Error> {
315    events
316        .iter()
317        .filter_map(Event::maybe_as_log)
318        .map(serde_json::to_value)
319        .collect()
320}
321
322/// Build an Arrow RecordBatch from a slice of events using the provided schema.
323pub(crate) fn build_record_batch(
324    schema: SchemaRef,
325    values: &[serde_json::Value],
326) -> Result<RecordBatch, ArrowEncodingError> {
327    if values.is_empty() {
328        return Err(ArrowEncodingError::NoEvents);
329    }
330
331    let missing = find_null_non_nullable_fields(&schema, values);
332    if !missing.is_empty() {
333        let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
334            field_name: missing.join(", "),
335        });
336        vector_common::internal_event::emit(crate::internal_events::EncoderNullConstraintError {
337            error: &error,
338        });
339        return Err(ArrowEncodingError::NullConstraint {
340            field_name: missing.join(", "),
341        });
342    }
343
344    let mut decoder = ReaderBuilder::new(schema)
345        .build_decoder()
346        .inspect_err(|e| {
347            vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
348                error: e,
349                error_code: "arrow_record_batch_creation",
350            });
351        })
352        .context(RecordBatchCreationSnafu)?;
353
354    decoder
355        .serialize(values)
356        .inspect_err(|e| {
357            vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
358                error: e,
359                error_code: "arrow_json_decode",
360            });
361        })
362        .context(ArrowJsonDecodeSnafu)?;
363
364    decoder
365        .flush()
366        .inspect_err(|e| {
367            vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
368                error: e,
369                error_code: "arrow_json_decode",
370            });
371        })
372        .context(ArrowJsonDecodeSnafu)?
373        .ok_or(ArrowEncodingError::NoEvents)
374}
375
376#[cfg(test)]
377mod tests {
378    use super::*;
379    use arrow::{
380        array::{Array, AsArray},
381        datatypes::TimeUnit,
382        ipc::reader::StreamReader,
383    };
384    use chrono::Utc;
385    use std::io::Cursor;
386    use vector_core::event::{LogEvent, Value};
387
388    /// Helper to encode events and return the decoded RecordBatch
389    fn encode_and_decode(
390        events: Vec<Event>,
391        schema: SchemaRef,
392    ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
393        let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
394        let cursor = Cursor::new(bytes);
395        let mut reader = StreamReader::try_new(cursor, None)?;
396        Ok(reader.next().unwrap()?)
397    }
398
399    /// Create a simple event from key-value pairs
400    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
401    where
402        V: Into<Value>,
403    {
404        let mut log = LogEvent::default();
405        for (key, value) in fields {
406            log.insert(key, value.into());
407        }
408        Event::Log(log)
409    }
410
    mod comprehensive {
        use super::*;

        /// Encodes a single log event carrying one field per supported Arrow type and
        /// checks every decoded column. Column indices in the assertions follow the
        /// order of the fields in the schema built below.
        #[test]
        fn test_encode_all_types() {
            use arrow::datatypes::{
                Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
                Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
            };
            use vrl::value::ObjectMap;

            let now = Utc::now();

            // Create a struct (tuple-like) value with positional field names `f0` and `f1`
            let mut tuple_value = ObjectMap::new();
            tuple_value.insert("f0".into(), Value::Bytes("nested_str".into()));
            tuple_value.insert("f1".into(), Value::Integer(999));

            // Create a named struct (named tuple) value
            let mut named_tuple_value = ObjectMap::new();
            named_tuple_value.insert("category".into(), Value::Bytes("test_category".into()));
            named_tuple_value.insert("tag".into(), Value::Bytes("test_tag".into()));

            // Create a list value
            let list_value = Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]);

            // Create a map value
            let mut map_value = ObjectMap::new();
            map_value.insert("key1".into(), Value::Integer(100));
            map_value.insert("key2".into(), Value::Integer(200));

            let mut log = LogEvent::default();
            // Primitive types
            log.insert("string_field", "test");
            log.insert("int8_field", 127);
            log.insert("int16_field", 32000);
            log.insert("int32_field", 1000000);
            log.insert("int64_field", 42);
            log.insert("uint8_field", 255);
            log.insert("uint16_field", 65535);
            log.insert("uint32_field", 4000000);
            // Explicit `i64` suffix: the literal does not fit in an `i32`.
            log.insert("uint64_field", 9000000000_i64);
            log.insert("float32_field", 3.15);
            log.insert("float64_field", 3.15);
            log.insert("bool_field", true);
            log.insert("timestamp_field", now);
            log.insert("decimal_field", 99.99);
            // Complex types
            log.insert("list_field", list_value);
            log.insert("struct_field", Value::Object(tuple_value));
            log.insert("named_struct_field", Value::Object(named_tuple_value));
            log.insert("map_field", Value::Object(map_value));

            let events = vec![Event::Log(log)];

            // Build schema with all supported types
            let struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("f0", DataType::Utf8, true),
                Field::new("f1", DataType::Int64, true),
            ]);

            let named_struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("category", DataType::Utf8, true),
                Field::new("tag", DataType::Utf8, true),
            ]);

            // Map entries: a non-nullable struct of (non-nullable key, nullable value).
            let map_entries = Field::new(
                "entries",
                DataType::Struct(arrow::datatypes::Fields::from(vec![
                    Field::new("keys", DataType::Utf8, false),
                    Field::new("values", DataType::Int64, true),
                ])),
                false,
            );

            let schema = Schema::new(vec![
                Field::new("string_field", DataType::Utf8, true),
                Field::new("int8_field", DataType::Int8, true),
                Field::new("int16_field", DataType::Int16, true),
                Field::new("int32_field", DataType::Int32, true),
                Field::new("int64_field", DataType::Int64, true),
                Field::new("uint8_field", DataType::UInt8, true),
                Field::new("uint16_field", DataType::UInt16, true),
                Field::new("uint32_field", DataType::UInt32, true),
                Field::new("uint64_field", DataType::UInt64, true),
                Field::new("float32_field", DataType::Float32, true),
                Field::new("float64_field", DataType::Float64, true),
                Field::new("bool_field", DataType::Boolean, true),
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new("decimal_field", DataType::Decimal128(10, 2), true),
                Field::new(
                    "list_field",
                    DataType::List(Field::new("item", DataType::Int64, true).into()),
                    true,
                ),
                Field::new("struct_field", DataType::Struct(struct_fields), true),
                Field::new(
                    "named_struct_field",
                    DataType::Struct(named_struct_fields),
                    true,
                ),
                Field::new("map_field", DataType::Map(map_entries.into(), false), true),
            ])
            .into();

            let batch = encode_and_decode(events, schema).expect("Failed to encode");

            assert_eq!(batch.num_rows(), 1);
            assert_eq!(batch.num_columns(), 18);

            // Verify all primitive types
            assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
            assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
            assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
            assert_eq!(
                batch.column(3).as_primitive::<Int32Type>().value(0),
                1000000
            );
            assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
            assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
            assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
            assert_eq!(
                batch.column(7).as_primitive::<UInt32Type>().value(0),
                4000000
            );
            assert_eq!(
                batch.column(8).as_primitive::<UInt64Type>().value(0),
                9000000000
            );
            // Floats are compared with a tolerance rather than for exact equality.
            assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
            assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
            assert!(batch.column(11).as_boolean().value(0));
            assert_eq!(
                batch
                    .column(12)
                    .as_primitive::<TimestampMillisecondType>()
                    .value(0),
                now.timestamp_millis()
            );
            // With scale 2, the decimal 99.99 is stored as the raw integer 9999.
            assert_eq!(
                batch.column(13).as_primitive::<Decimal128Type>().value(0),
                9999
            );

            // Verify list field
            let list_array = batch.column(14).as_list::<i32>();
            assert!(!list_array.is_null(0));
            let list_values = list_array.value(0);
            assert_eq!(list_values.len(), 3);
            let int_array = list_values.as_primitive::<Int64Type>();
            assert_eq!(int_array.value(0), 1);
            assert_eq!(int_array.value(1), 2);
            assert_eq!(int_array.value(2), 3);

            // Verify struct field (positional names)
            let struct_array = batch.column(15).as_struct();
            assert!(!struct_array.is_null(0));
            assert_eq!(
                struct_array.column(0).as_string::<i32>().value(0),
                "nested_str"
            );
            assert_eq!(
                struct_array.column(1).as_primitive::<Int64Type>().value(0),
                999
            );

            // Verify named struct field (named tuple)
            let named_struct_array = batch.column(16).as_struct();
            assert!(!named_struct_array.is_null(0));
            assert_eq!(
                named_struct_array.column(0).as_string::<i32>().value(0),
                "test_category"
            );
            assert_eq!(
                named_struct_array.column(1).as_string::<i32>().value(0),
                "test_tag"
            );

            // Verify map field (only the number of entries is checked)
            let map_array = batch.column(17).as_map();
            assert!(!map_array.is_null(0));
            let map_value = map_array.value(0);
            assert_eq!(map_value.len(), 2);
        }
    }
603
604    mod error_handling {
605        use super::*;
606
607        #[test]
608        fn test_encode_empty_events() {
609            let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
610            let events: Vec<Event> = vec![];
611            let result = encode_events_to_arrow_ipc_stream(&events, schema);
612            assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
613        }
614
615        #[test]
616        fn test_missing_non_nullable_field_errors() {
617            let events = vec![create_event(vec![("other_field", "value")])];
618
619            let schema = Schema::new(vec![Field::new(
620                "required_field",
621                DataType::Utf8,
622                false, // non-nullable
623            )])
624            .into();
625
626            let result = encode_events_to_arrow_ipc_stream(&events, schema);
627            assert!(result.is_err());
628        }
629    }
630
631    mod temporal_types {
632        use super::*;
633        use arrow::datatypes::{
634            TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
635            TimestampSecondType,
636        };
637
638        #[test]
639        fn test_encode_timestamp_precisions() {
640            let now = Utc::now();
641            let mut log = LogEvent::default();
642            log.insert("ts_second", now);
643            log.insert("ts_milli", now);
644            log.insert("ts_micro", now);
645            log.insert("ts_nano", now);
646
647            let events = vec![Event::Log(log)];
648
649            let schema = Schema::new(vec![
650                Field::new(
651                    "ts_second",
652                    DataType::Timestamp(TimeUnit::Second, None),
653                    true,
654                ),
655                Field::new(
656                    "ts_milli",
657                    DataType::Timestamp(TimeUnit::Millisecond, None),
658                    true,
659                ),
660                Field::new(
661                    "ts_micro",
662                    DataType::Timestamp(TimeUnit::Microsecond, None),
663                    true,
664                ),
665                Field::new(
666                    "ts_nano",
667                    DataType::Timestamp(TimeUnit::Nanosecond, None),
668                    true,
669                ),
670            ])
671            .into();
672
673            let batch = encode_and_decode(events, schema).unwrap();
674
675            assert_eq!(batch.num_rows(), 1);
676            assert_eq!(batch.num_columns(), 4);
677
678            let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
679            assert!(!ts_second.is_null(0));
680            assert_eq!(ts_second.value(0), now.timestamp());
681
682            let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
683            assert!(!ts_milli.is_null(0));
684            assert_eq!(ts_milli.value(0), now.timestamp_millis());
685
686            let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
687            assert!(!ts_micro.is_null(0));
688            assert_eq!(ts_micro.value(0), now.timestamp_micros());
689
690            let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
691            assert!(!ts_nano.is_null(0));
692            assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
693        }
694
695        #[test]
696        fn test_encode_mixed_timestamp_string_native_and_integer() {
697            let now = Utc::now();
698
699            let mut log1 = LogEvent::default();
700            log1.insert("ts", "2025-10-22T10:18:44.256Z"); // RFC3339 String
701
702            let mut log2 = LogEvent::default();
703            log2.insert("ts", now); // Native Timestamp
704
705            let mut log3 = LogEvent::default();
706            log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
707
708            let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
709
710            let schema = Schema::new(vec![Field::new(
711                "ts",
712                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
713                true,
714            )])
715            .into();
716
717            let batch = encode_and_decode(events, schema).unwrap();
718
719            assert_eq!(batch.num_rows(), 3);
720
721            let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();
722
723            // All three should be non-null
724            assert!(!ts_array.is_null(0));
725            assert!(!ts_array.is_null(1));
726            assert!(!ts_array.is_null(2));
727
728            // First one should match the parsed RFC3339 string
729            let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
730                .unwrap()
731                .timestamp_nanos_opt()
732                .unwrap();
733            assert_eq!(ts_array.value(0), expected);
734
735            // Second one should match the native timestamp
736            assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());
737
738            // Third one should match the integer
739            assert_eq!(ts_array.value(2), 1729594724256000000_i64);
740        }
741    }
742
743    mod config_tests {
744        use super::*;
745        use tokio_util::codec::Encoder;
746
747        #[test]
748        fn test_config_allow_nullable_fields_overrides_schema() {
749            let mut log1 = LogEvent::default();
750            log1.insert("strict_field", 42);
751            let log2 = LogEvent::default();
752            let events = vec![Event::Log(log1), Event::Log(log2)];
753
754            let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
755
756            let mut config = ArrowStreamSerializerConfig::new(schema);
757            config.allow_nullable_fields = true;
758
759            let mut serializer =
760                ArrowStreamSerializer::new(config).expect("Failed to create serializer");
761
762            let mut buffer = BytesMut::new();
763            serializer
764                .encode(events, &mut buffer)
765                .expect("Encoding should succeed when allow_nullable_fields is true");
766
767            let cursor = Cursor::new(buffer);
768            let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
769            let batch = reader.next().unwrap().expect("Failed to read batch");
770
771            assert_eq!(batch.num_rows(), 2);
772
773            let binding = batch.schema();
774            let output_field = binding.field(0);
775            assert!(
776                output_field.is_nullable(),
777                "The output schema field should have been transformed to nullable=true"
778            );
779
780            let array = batch
781                .column(0)
782                .as_primitive::<arrow::datatypes::Int64Type>();
783
784            assert_eq!(array.value(0), 42);
785            assert!(!array.is_null(0));
786            assert!(
787                array.is_null(1),
788                "The missing value should be encoded as null"
789            );
790        }
791
792        #[test]
793        fn test_make_field_nullable_with_nested_types() {
794            let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
795            let inner_struct =
796                DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
797            let list_field = Field::new("item", inner_struct, false);
798            let list_type = DataType::List(list_field.into());
799            let outer_field = Field::new("inner_list", list_type, false);
800            let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));
801
802            let original_field = Field::new("root", outer_struct, false);
803            let nullable_field = make_field_nullable(&original_field).unwrap();
804
805            assert!(
806                nullable_field.is_nullable(),
807                "Root field should be nullable"
808            );
809
810            if let DataType::Struct(root_fields) = nullable_field.data_type() {
811                let inner_list_field = &root_fields[0];
812                assert!(inner_list_field.is_nullable());
813
814                if let DataType::List(list_item_field) = inner_list_field.data_type() {
815                    assert!(list_item_field.is_nullable());
816
817                    if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
818                        let nested_field = &inner_struct_fields[0];
819                        assert!(nested_field.is_nullable());
820                    } else {
821                        panic!("Expected Struct type for list items");
822                    }
823                } else {
824                    panic!("Expected List type for inner_list");
825                }
826            } else {
827                panic!("Expected Struct type for root field");
828            }
829        }
830
831        #[test]
832        fn test_make_field_nullable_with_map_type() {
833            let key_field = Field::new("key", DataType::Utf8, false);
834            let value_field = Field::new("value", DataType::Int64, false);
835            let entries_struct =
836                DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
837            let entries_field = Field::new("entries", entries_struct, false);
838            let map_type = DataType::Map(entries_field.into(), false);
839
840            let original_field = Field::new("my_map", map_type, false);
841            let nullable_field = make_field_nullable(&original_field).unwrap();
842
843            assert!(
844                nullable_field.is_nullable(),
845                "Root map field should be nullable"
846            );
847
848            if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
849                assert!(
850                    !entries_field.is_nullable(),
851                    "Map entries field should be non-nullable"
852                );
853
854                if let DataType::Struct(struct_fields) = entries_field.data_type() {
855                    let key_field = &struct_fields[0];
856                    let value_field = &struct_fields[1];
857                    assert!(
858                        !key_field.is_nullable(),
859                        "Map key field should be non-nullable"
860                    );
861                    assert!(
862                        value_field.is_nullable(),
863                        "Map value field should be nullable"
864                    );
865                } else {
866                    panic!("Expected Struct type for map entries");
867                }
868            } else {
869                panic!("Expected Map type for my_map field");
870            }
871        }
872    }
873
874    mod null_non_nullable {
875        use super::*;
876
877        #[test]
878        fn test_missing_non_nullable_field_error_names_fields() {
879            let schema: SchemaRef = Schema::new(vec![
880                Field::new("required_field", DataType::Utf8, false),
881                Field::new("optional_field", DataType::Utf8, true),
882            ])
883            .into();
884
885            // Event is missing "required_field" entirely
886            let event = create_event(vec![("optional_field", "hello")]);
887
888            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
889            let err = result.unwrap_err().to_string();
890            assert!(
891                err.contains("required_field"),
892                "Error should name the missing field, got: {err}"
893            );
894            assert!(
895                !err.contains("optional_field"),
896                "Error should not name nullable fields, got: {err}"
897            );
898        }
899
900        #[test]
901        fn test_null_value_in_non_nullable_field_error_names_fields() {
902            let schema: SchemaRef = Schema::new(vec![
903                Field::new("id", DataType::Int64, false),
904                Field::new("name", DataType::Utf8, false),
905            ])
906            .into();
907
908            // Event has "id" but "name" is null
909            let event = create_event(vec![("id", Value::Integer(1))]);
910
911            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
912            let err = result.unwrap_err().to_string();
913            assert!(
914                err.contains("name"),
915                "Error should name the null field, got: {err}"
916            );
917        }
918
919        #[test]
920        fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
921            let schema = Schema::new(vec![
922                Field::new("a", DataType::Utf8, false),
923                Field::new("b", DataType::Int64, false),
924            ]);
925
926            let event = create_event(vec![
927                ("a", Value::Bytes("val".into())),
928                ("b", Value::Integer(42)),
929            ]);
930            let missing = find_null_non_nullable_fields(
931                &schema,
932                &vector_log_events_to_json_values(&[event]).unwrap(),
933            );
934            assert!(
935                missing.is_empty(),
936                "Expected no missing fields, got: {missing:?}"
937            );
938        }
939
940        #[test]
941        fn test_find_null_non_nullable_fields_detects_explicit_null() {
942            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
943
944            let event = create_event(vec![("a", Value::Null)]);
945            let missing = find_null_non_nullable_fields(
946                &schema,
947                &vector_log_events_to_json_values(&[event]).unwrap(),
948            );
949            assert_eq!(missing, vec!["a"]);
950        }
951    }
952}