1use arrow::{
8 datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9 error::ArrowError,
10 ipc::writer::StreamWriter,
11 json::reader::ReaderBuilder,
12 record_batch::RecordBatch,
13};
14use async_trait::async_trait;
15use bytes::{BufMut, Bytes, BytesMut};
16use snafu::{ResultExt, Snafu, ensure};
17use vector_config::configurable_component;
18use vector_core::event::Event;
19
/// Asynchronous source of an Arrow [`Schema`].
///
/// Implementors must be `Send + Sync + Debug`.
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Obtains the schema.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowEncodingError`] if the schema cannot be obtained.
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
31
/// Configuration for the Arrow IPC stream serializer.
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema that events are encoded against.
    ///
    /// This is never read from or written to serialized configuration, so it must be set
    /// programmatically (for example through `ArrowStreamSerializerConfig::new`).
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// When `true`, every schema field, including nested list items, struct children and map
    /// values, is relaxed to nullable before encoding. Missing or null values are then encoded
    /// as nulls instead of failing the batch.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
53
54impl std::fmt::Debug for ArrowStreamSerializerConfig {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("ArrowStreamSerializerConfig")
57 .field(
58 "schema",
59 &self
60 .schema
61 .as_ref()
62 .map(|s| format!("{} fields", s.fields().len())),
63 )
64 .field("allow_nullable_fields", &self.allow_nullable_fields)
65 .finish()
66 }
67}
68
69impl ArrowStreamSerializerConfig {
70 pub fn new(schema: arrow::datatypes::Schema) -> Self {
72 Self {
73 schema: Some(schema),
74 allow_nullable_fields: false,
75 }
76 }
77
78 pub fn input_type(&self) -> vector_core::config::DataType {
80 vector_core::config::DataType::Log
81 }
82
83 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
85 vector_core::schema::Requirement::empty()
86 }
87}
88
/// Encodes batches of log events as Arrow IPC streams against a fixed schema.
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    /// The schema every batch is encoded against. It has already been relaxed to nullable if
    /// the config's `allow_nullable_fields` was set.
    schema: SchemaRef,
}
94
95impl ArrowStreamSerializer {
96 pub fn encode_to_record_batch(
98 &self,
99 events: &[Event],
100 ) -> Result<RecordBatch, ArrowEncodingError> {
101 let values = vector_log_events_to_json_values(events).map_err(|e| {
102 ArrowEncodingError::RecordBatchCreation {
103 source: arrow::error::ArrowError::JsonError(e.to_string()),
104 }
105 })?;
106 build_record_batch(self.schema.clone(), &values)
107 }
108
109 pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
111 let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
112
113 let schema = if config.allow_nullable_fields {
116 let nullable_fields: Fields = schema
117 .fields()
118 .iter()
119 .map(|f| make_field_nullable(f))
120 .collect::<Result<Vec<_>, _>>()?
121 .into();
122 Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
123 } else {
124 schema
125 };
126
127 Ok(Self {
128 schema: SchemaRef::new(schema),
129 })
130 }
131}
132
133impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
134 type Error = ArrowEncodingError;
135
136 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
137 if events.is_empty() {
138 return Err(ArrowEncodingError::NoEvents);
139 }
140
141 let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
142
143 buffer.extend_from_slice(&bytes);
144 Ok(())
145 }
146}
147
/// Errors produced while building or encoding Arrow data.
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// The record batch could not be created: either the Arrow JSON decoder could not be built
    /// for the schema, or an event could not be serialized to JSON.
    #[snafu(display("Failed to create Arrow record batch: {source}"))]
    RecordBatchCreation {
        /// The underlying Arrow error.
        source: arrow::error::ArrowError,
    },

    /// Writing the record batch to an Arrow IPC stream failed.
    #[snafu(display("Failed to write Arrow IPC data: {source}"))]
    IpcWrite {
        /// The underlying Arrow error.
        source: arrow::error::ArrowError,
    },

    /// There was nothing to encode: no events, no log events, or the decoder produced no batch.
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// A schema could not be fetched from a provider.
    #[snafu(display("Failed to fetch schema from provider: {message}"))]
    SchemaFetchError {
        /// Human-readable description of the failure.
        message: String,
    },

    /// One or more non-nullable fields were missing or null in at least one event.
    #[snafu(display("Null value for non-nullable field '{field_name}'"))]
    NullConstraint {
        /// The offending field name or names, comma-separated.
        field_name: String,
    },

    /// The serializer was constructed from a config without a schema.
    #[snafu(display("Arrow serializer requires a schema"))]
    MissingSchema,

    /// An I/O error. `context(false)` lets `std::io::Error` convert directly via `?`.
    #[snafu(display("IO error: {source}"), context(false))]
    Io {
        /// The underlying I/O error.
        source: std::io::Error,
    },

    /// The Arrow JSON decoder rejected the serialized rows.
    #[snafu(display("Arrow JSON decoding error: {source}"))]
    ArrowJsonDecode {
        /// The underlying Arrow error.
        source: arrow::error::ArrowError,
    },

    /// A `Map` field's entries type did not have the expected `Struct<key, value>` shape.
    #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
    InvalidMapSchema {
        /// Name of the offending `Map` field.
        field_name: String,

        /// What was wrong with the entries type.
        reason: String,
    },
}
210
211pub fn encode_events_to_arrow_ipc_stream(
213 events: &[Event],
214 schema: SchemaRef,
215) -> Result<Bytes, ArrowEncodingError> {
216 if events.is_empty() {
217 return Err(ArrowEncodingError::NoEvents);
218 }
219
220 let json_values = vector_log_events_to_json_values(events).map_err(|e| {
221 ArrowEncodingError::RecordBatchCreation {
222 source: ArrowError::JsonError(e.to_string()),
223 }
224 })?;
225
226 let record_batch = build_record_batch(schema, &json_values)?;
227
228 let mut buffer = BytesMut::new().writer();
229 let mut writer =
230 StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
231 writer.write(&record_batch).context(IpcWriteSnafu)?;
232 writer.finish().context(IpcWriteSnafu)?;
233
234 Ok(buffer.into_inner().freeze())
235}
236
237fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
239 let new_data_type = match field.data_type() {
240 DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
241 DataType::Struct(fields) => DataType::Struct(
242 fields
243 .iter()
244 .map(|f| make_field_nullable(f))
245 .collect::<Result<Vec<_>, _>>()?
246 .into(),
247 ),
248 DataType::Map(inner, sorted) => {
249 let DataType::Struct(fields) = inner.data_type() else {
251 return InvalidMapSchemaSnafu {
252 field_name: field.name(),
253 reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
254 }
255 .fail();
256 };
257
258 ensure!(
259 fields.len() == 2,
260 InvalidMapSchemaSnafu {
261 field_name: field.name(),
262 reason: format!("expected 2 fields (key, value), found {}", fields.len()),
263 },
264 );
265 let key_field = &fields[0];
266 let value_field = &fields[1];
267
268 let new_struct_fields: Fields =
269 [key_field.clone(), make_field_nullable(value_field)?.into()].into();
270
271 let new_inner_field = inner
274 .as_ref()
275 .clone()
276 .with_data_type(DataType::Struct(new_struct_fields))
277 .with_nullable(false);
278
279 DataType::Map(new_inner_field.into(), *sorted)
280 }
281 other => other.clone(),
282 };
283
284 Ok(field
285 .clone()
286 .with_data_type(new_data_type)
287 .with_nullable(true))
288}
289
290pub fn find_null_non_nullable_fields<'a>(
293 schema: &'a Schema,
294 values: &[serde_json::Value],
295) -> Vec<&'a str> {
296 schema
297 .fields()
298 .iter()
299 .filter(|field| {
300 !field.is_nullable()
301 && values.iter().any(|value| {
302 value
303 .as_object()
304 .and_then(|map| map.get(field.name().as_str()))
305 .is_none_or(serde_json::Value::is_null)
306 })
307 })
308 .map(|field| field.name().as_str())
309 .collect()
310}
311
312pub(crate) fn vector_log_events_to_json_values(
313 events: &[Event],
314) -> Result<Vec<serde_json::Value>, serde_json::Error> {
315 events
316 .iter()
317 .filter_map(Event::maybe_as_log)
318 .map(serde_json::to_value)
319 .collect()
320}
321
322pub(crate) fn build_record_batch(
324 schema: SchemaRef,
325 values: &[serde_json::Value],
326) -> Result<RecordBatch, ArrowEncodingError> {
327 if values.is_empty() {
328 return Err(ArrowEncodingError::NoEvents);
329 }
330
331 let missing = find_null_non_nullable_fields(&schema, values);
332 if !missing.is_empty() {
333 let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
334 field_name: missing.join(", "),
335 });
336 vector_common::internal_event::emit(crate::internal_events::EncoderNullConstraintError {
337 error: &error,
338 });
339 return Err(ArrowEncodingError::NullConstraint {
340 field_name: missing.join(", "),
341 });
342 }
343
344 let mut decoder = ReaderBuilder::new(schema)
345 .build_decoder()
346 .inspect_err(|e| {
347 vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
348 error: e,
349 error_code: "arrow_record_batch_creation",
350 });
351 })
352 .context(RecordBatchCreationSnafu)?;
353
354 decoder
355 .serialize(values)
356 .inspect_err(|e| {
357 vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
358 error: e,
359 error_code: "arrow_json_decode",
360 });
361 })
362 .context(ArrowJsonDecodeSnafu)?;
363
364 decoder
365 .flush()
366 .inspect_err(|e| {
367 vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
368 error: e,
369 error_code: "arrow_json_decode",
370 });
371 })
372 .context(ArrowJsonDecodeSnafu)?
373 .ok_or(ArrowEncodingError::NoEvents)
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379 use arrow::{
380 array::{Array, AsArray},
381 datatypes::TimeUnit,
382 ipc::reader::StreamReader,
383 };
384 use chrono::Utc;
385 use std::io::Cursor;
386 use vector_core::event::{LogEvent, Value};
387
388 fn encode_and_decode(
390 events: Vec<Event>,
391 schema: SchemaRef,
392 ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
393 let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
394 let cursor = Cursor::new(bytes);
395 let mut reader = StreamReader::try_new(cursor, None)?;
396 Ok(reader.next().unwrap()?)
397 }
398
399 fn create_event<V>(fields: Vec<(&str, V)>) -> Event
401 where
402 V: Into<Value>,
403 {
404 let mut log = LogEvent::default();
405 for (key, value) in fields {
406 log.insert(key, value.into());
407 }
408 Event::Log(log)
409 }
410
mod comprehensive {
    use super::*;

    /// Round-trips one log event through every Arrow type listed in the schema below and
    /// checks each decoded column value.
    #[test]
    fn test_encode_all_types() {
        use arrow::datatypes::{
            Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
            Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
        };
        use vrl::value::ObjectMap;

        let now = Utc::now();

        // Value for the `struct_field` column (children f0: Utf8, f1: Int64).
        let mut tuple_value = ObjectMap::new();
        tuple_value.insert("f0".into(), Value::Bytes("nested_str".into()));
        tuple_value.insert("f1".into(), Value::Integer(999));

        // Value for the `named_struct_field` column.
        let mut named_tuple_value = ObjectMap::new();
        named_tuple_value.insert("category".into(), Value::Bytes("test_category".into()));
        named_tuple_value.insert("tag".into(), Value::Bytes("test_tag".into()));

        // Value for the `list_field` column (List<Int64>).
        let list_value = Value::Array(vec![
            Value::Integer(1),
            Value::Integer(2),
            Value::Integer(3),
        ]);

        // Value for the `map_field` column (Utf8 keys -> Int64 values).
        let mut map_value = ObjectMap::new();
        map_value.insert("key1".into(), Value::Integer(100));
        map_value.insert("key2".into(), Value::Integer(200));

        let mut log = LogEvent::default();
        log.insert("string_field", "test");
        log.insert("int8_field", 127);
        log.insert("int16_field", 32000);
        log.insert("int32_field", 1000000);
        log.insert("int64_field", 42);
        log.insert("uint8_field", 255);
        log.insert("uint16_field", 65535);
        log.insert("uint32_field", 4000000);
        log.insert("uint64_field", 9000000000_i64);
        log.insert("float32_field", 3.15);
        log.insert("float64_field", 3.15);
        log.insert("bool_field", true);
        log.insert("timestamp_field", now);
        log.insert("decimal_field", 99.99);
        log.insert("list_field", list_value);
        log.insert("struct_field", Value::Object(tuple_value));
        log.insert("named_struct_field", Value::Object(named_tuple_value));
        log.insert("map_field", Value::Object(map_value));

        let events = vec![Event::Log(log)];

        let struct_fields = arrow::datatypes::Fields::from(vec![
            Field::new("f0", DataType::Utf8, true),
            Field::new("f1", DataType::Int64, true),
        ]);

        let named_struct_fields = arrow::datatypes::Fields::from(vec![
            Field::new("category", DataType::Utf8, true),
            Field::new("tag", DataType::Utf8, true),
        ]);

        // Map entries: non-nullable keys, nullable values.
        let map_entries = Field::new(
            "entries",
            DataType::Struct(arrow::datatypes::Fields::from(vec![
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int64, true),
            ])),
            false,
        );

        // The field order here fixes the column indices asserted below (0..=17).
        let schema = Schema::new(vec![
            Field::new("string_field", DataType::Utf8, true),
            Field::new("int8_field", DataType::Int8, true),
            Field::new("int16_field", DataType::Int16, true),
            Field::new("int32_field", DataType::Int32, true),
            Field::new("int64_field", DataType::Int64, true),
            Field::new("uint8_field", DataType::UInt8, true),
            Field::new("uint16_field", DataType::UInt16, true),
            Field::new("uint32_field", DataType::UInt32, true),
            Field::new("uint64_field", DataType::UInt64, true),
            Field::new("float32_field", DataType::Float32, true),
            Field::new("float64_field", DataType::Float64, true),
            Field::new("bool_field", DataType::Boolean, true),
            Field::new(
                "timestamp_field",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("decimal_field", DataType::Decimal128(10, 2), true),
            Field::new(
                "list_field",
                DataType::List(Field::new("item", DataType::Int64, true).into()),
                true,
            ),
            Field::new("struct_field", DataType::Struct(struct_fields), true),
            Field::new(
                "named_struct_field",
                DataType::Struct(named_struct_fields),
                true,
            ),
            Field::new("map_field", DataType::Map(map_entries.into(), false), true),
        ])
        .into();

        let batch = encode_and_decode(events, schema).expect("Failed to encode");

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 18);

        // Scalar columns.
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
        assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
        assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
        assert_eq!(
            batch.column(3).as_primitive::<Int32Type>().value(0),
            1000000
        );
        assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
        assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
        assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
        assert_eq!(
            batch.column(7).as_primitive::<UInt32Type>().value(0),
            4000000
        );
        assert_eq!(
            batch.column(8).as_primitive::<UInt64Type>().value(0),
            9000000000
        );
        assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
        assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
        assert!(batch.column(11).as_boolean().value(0));
        assert_eq!(
            batch
                .column(12)
                .as_primitive::<TimestampMillisecondType>()
                .value(0),
            now.timestamp_millis()
        );
        // Decimal128(10, 2) stores 99.99 as the unscaled integer 9999.
        assert_eq!(
            batch.column(13).as_primitive::<Decimal128Type>().value(0),
            9999
        );

        // List column: one row holding [1, 2, 3].
        let list_array = batch.column(14).as_list::<i32>();
        assert!(!list_array.is_null(0));
        let list_values = list_array.value(0);
        assert_eq!(list_values.len(), 3);
        let int_array = list_values.as_primitive::<Int64Type>();
        assert_eq!(int_array.value(0), 1);
        assert_eq!(int_array.value(1), 2);
        assert_eq!(int_array.value(2), 3);

        // Struct column with children f0 and f1.
        let struct_array = batch.column(15).as_struct();
        assert!(!struct_array.is_null(0));
        assert_eq!(
            struct_array.column(0).as_string::<i32>().value(0),
            "nested_str"
        );
        assert_eq!(
            struct_array.column(1).as_primitive::<Int64Type>().value(0),
            999
        );

        // Struct column with children category and tag.
        let named_struct_array = batch.column(16).as_struct();
        assert!(!named_struct_array.is_null(0));
        assert_eq!(
            named_struct_array.column(0).as_string::<i32>().value(0),
            "test_category"
        );
        assert_eq!(
            named_struct_array.column(1).as_string::<i32>().value(0),
            "test_tag"
        );

        // Map column: one row with two entries.
        let map_array = batch.column(17).as_map();
        assert!(!map_array.is_null(0));
        let map_value = map_array.value(0);
        assert_eq!(map_value.len(), 2);
    }
}
603
mod error_handling {
    use super::*;

    /// An empty event slice is rejected before any encoding work is done.
    #[test]
    fn test_encode_empty_events() {
        let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
        let events: Vec<Event> = vec![];
        let result = encode_events_to_arrow_ipc_stream(&events, schema);
        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
    }

    /// An event lacking a non-nullable field fails with `NullConstraint`, and the error names
    /// that field. Any other error would mean the pre-decode nullability check was bypassed.
    #[test]
    fn test_missing_non_nullable_field_errors() {
        let events = vec![create_event(vec![("other_field", "value")])];

        let schema = Schema::new(vec![Field::new(
            "required_field",
            DataType::Utf8,
            false,
        )])
        .into();

        let result = encode_events_to_arrow_ipc_stream(&events, schema);
        match result {
            Err(ArrowEncodingError::NullConstraint { field_name }) => {
                assert_eq!(field_name, "required_field");
            }
            other => panic!("expected NullConstraint error, got: {other:?}"),
        }
    }
}
630
mod temporal_types {
    use super::*;
    use arrow::datatypes::{
        TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
        TimestampSecondType,
    };

    /// The same native timestamp is encoded into each Arrow timestamp unit and read back at
    /// the matching precision.
    #[test]
    fn test_encode_timestamp_precisions() {
        let now = Utc::now();
        let mut log = LogEvent::default();
        log.insert("ts_second", now);
        log.insert("ts_milli", now);
        log.insert("ts_micro", now);
        log.insert("ts_nano", now);

        let events = vec![Event::Log(log)];

        let schema = Schema::new(vec![
            Field::new(
                "ts_second",
                DataType::Timestamp(TimeUnit::Second, None),
                true,
            ),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
        ])
        .into();

        let batch = encode_and_decode(events, schema).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 4);

        let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
        assert!(!ts_second.is_null(0));
        assert_eq!(ts_second.value(0), now.timestamp());

        let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
        assert!(!ts_milli.is_null(0));
        assert_eq!(ts_milli.value(0), now.timestamp_millis());

        let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
        assert!(!ts_micro.is_null(0));
        assert_eq!(ts_micro.value(0), now.timestamp_micros());

        let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
        assert!(!ts_nano.is_null(0));
        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
    }

    /// One nanosecond timestamp column accepts three input forms: an RFC 3339 string, a native
    /// timestamp, and a raw integer (asserted below to be taken as nanoseconds).
    #[test]
    fn test_encode_mixed_timestamp_string_native_and_integer() {
        let now = Utc::now();

        // RFC 3339 string.
        let mut log1 = LogEvent::default();
        log1.insert("ts", "2025-10-22T10:18:44.256Z");
        // Native timestamp value.
        let mut log2 = LogEvent::default();
        log2.insert("ts", now);
        // Raw integer in the column's unit.
        let mut log3 = LogEvent::default();
        log3.insert("ts", 1729594724256000000_i64);
        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

        let schema = Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            true,
        )])
        .into();

        let batch = encode_and_decode(events, schema).unwrap();

        assert_eq!(batch.num_rows(), 3);

        let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();

        // None of the three input forms may decode to null.
        assert!(!ts_array.is_null(0));
        assert!(!ts_array.is_null(1));
        assert!(!ts_array.is_null(2));

        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();
        assert_eq!(ts_array.value(0), expected);

        assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());

        // The integer passes through unchanged.
        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
    }
}
742
mod config_tests {
    use super::*;
    use tokio_util::codec::Encoder;

    /// With `allow_nullable_fields`, a non-nullable schema field is relaxed. An event missing
    /// that field then encodes as null instead of failing.
    #[test]
    fn test_config_allow_nullable_fields_overrides_schema() {
        let mut log1 = LogEvent::default();
        log1.insert("strict_field", 42);
        // The second event deliberately lacks `strict_field`.
        let log2 = LogEvent::default();
        let events = vec![Event::Log(log1), Event::Log(log2)];

        let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);

        let mut config = ArrowStreamSerializerConfig::new(schema);
        config.allow_nullable_fields = true;

        let mut serializer =
            ArrowStreamSerializer::new(config).expect("Failed to create serializer");

        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Encoding should succeed when allow_nullable_fields is true");

        let cursor = Cursor::new(buffer);
        let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
        let batch = reader.next().unwrap().expect("Failed to read batch");

        assert_eq!(batch.num_rows(), 2);

        // The relaxed nullability is visible in the schema written to the stream.
        let binding = batch.schema();
        let output_field = binding.field(0);
        assert!(
            output_field.is_nullable(),
            "The output schema field should have been transformed to nullable=true"
        );

        let array = batch
            .column(0)
            .as_primitive::<arrow::datatypes::Int64Type>();

        assert_eq!(array.value(0), 42);
        assert!(!array.is_null(0));
        assert!(
            array.is_null(1),
            "The missing value should be encoded as null"
        );
    }

    /// Nullability is relaxed through Struct -> List -> Struct nesting, down to the leaf.
    #[test]
    fn test_make_field_nullable_with_nested_types() {
        let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
        let inner_struct =
            DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
        let list_field = Field::new("item", inner_struct, false);
        let list_type = DataType::List(list_field.into());
        let outer_field = Field::new("inner_list", list_type, false);
        let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));

        let original_field = Field::new("root", outer_struct, false);
        let nullable_field = make_field_nullable(&original_field).unwrap();

        assert!(
            nullable_field.is_nullable(),
            "Root field should be nullable"
        );

        if let DataType::Struct(root_fields) = nullable_field.data_type() {
            let inner_list_field = &root_fields[0];
            assert!(inner_list_field.is_nullable());

            if let DataType::List(list_item_field) = inner_list_field.data_type() {
                assert!(list_item_field.is_nullable());

                if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
                    let nested_field = &inner_struct_fields[0];
                    assert!(nested_field.is_nullable());
                } else {
                    panic!("Expected Struct type for list items");
                }
            } else {
                panic!("Expected List type for inner_list");
            }
        } else {
            panic!("Expected Struct type for root field");
        }
    }

    /// For a Map, only the value field is relaxed. The entries struct and the key field stay
    /// non-nullable.
    #[test]
    fn test_make_field_nullable_with_map_type() {
        let key_field = Field::new("key", DataType::Utf8, false);
        let value_field = Field::new("value", DataType::Int64, false);
        let entries_struct =
            DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
        let entries_field = Field::new("entries", entries_struct, false);
        let map_type = DataType::Map(entries_field.into(), false);

        let original_field = Field::new("my_map", map_type, false);
        let nullable_field = make_field_nullable(&original_field).unwrap();

        assert!(
            nullable_field.is_nullable(),
            "Root map field should be nullable"
        );

        if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
            assert!(
                !entries_field.is_nullable(),
                "Map entries field should be non-nullable"
            );

            if let DataType::Struct(struct_fields) = entries_field.data_type() {
                let key_field = &struct_fields[0];
                let value_field = &struct_fields[1];
                assert!(
                    !key_field.is_nullable(),
                    "Map key field should be non-nullable"
                );
                assert!(
                    value_field.is_nullable(),
                    "Map value field should be nullable"
                );
            } else {
                panic!("Expected Struct type for map entries");
            }
        } else {
            panic!("Expected Map type for my_map field");
        }
    }
}
873
mod null_non_nullable {
    use super::*;

    /// The error names the missing non-nullable field and does not mention nullable ones.
    #[test]
    fn test_missing_non_nullable_field_error_names_fields() {
        let schema: SchemaRef = Schema::new(vec![
            Field::new("required_field", DataType::Utf8, false),
            Field::new("optional_field", DataType::Utf8, true),
        ])
        .into();

        // Only the nullable field is present.
        let event = create_event(vec![("optional_field", "hello")]);

        let result = encode_events_to_arrow_ipc_stream(&[event], schema);
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("required_field"),
            "Error should name the missing field, got: {err}"
        );
        assert!(
            !err.contains("optional_field"),
            "Error should not name nullable fields, got: {err}"
        );
    }

    /// With two non-nullable fields and only `id` supplied, the error names `name`.
    #[test]
    fn test_null_value_in_non_nullable_field_error_names_fields() {
        let schema: SchemaRef = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ])
        .into();

        // `name` is absent, which the encoder treats the same as null.
        let event = create_event(vec![("id", Value::Integer(1))]);

        let result = encode_events_to_arrow_ipc_stream(&[event], schema);
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("name"),
            "Error should name the null field, got: {err}"
        );
    }

    /// No fields are reported when every non-nullable field has a non-null value.
    #[test]
    fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int64, false),
        ]);

        let event = create_event(vec![
            ("a", Value::Bytes("val".into())),
            ("b", Value::Integer(42)),
        ]);
        let missing = find_null_non_nullable_fields(
            &schema,
            &vector_log_events_to_json_values(&[event]).unwrap(),
        );
        assert!(
            missing.is_empty(),
            "Expected no missing fields, got: {missing:?}"
        );
    }

    /// An explicit `null` value counts as missing.
    #[test]
    fn test_find_null_non_nullable_fields_detects_explicit_null() {
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);

        let event = create_event(vec![("a", Value::Null)]);
        let missing = find_null_non_nullable_fields(
            &schema,
            &vector_log_events_to_json_values(&[event]).unwrap(),
        );
        assert_eq!(missing, vec!["a"]);
    }
}
952}