vector/sinks/util/
encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9    config::telemetry,
10    request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
15pub trait Encoder<T> {
16    /// Encodes the input into the provided writer.
17    ///
18    /// # Errors
19    ///
20    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
21    fn encode_input(
22        &self,
23        input: T,
24        writer: &mut dyn io::Write,
25    ) -> io::Result<(usize, GroupedCountByteSize)>;
26}
27
28impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
29    fn encode_input(
30        &self,
31        events: Vec<Event>,
32        writer: &mut dyn io::Write,
33    ) -> io::Result<(usize, GroupedCountByteSize)> {
34        let mut encoder = self.1.clone();
35        let mut bytes_written = 0;
36        let mut n_events_pending = events.len();
37        let is_empty = events.is_empty();
38        let batch_prefix = encoder.batch_prefix();
39        write_all(writer, n_events_pending, batch_prefix)?;
40        bytes_written += batch_prefix.len();
41
42        let mut byte_size = telemetry().create_request_count_byte_size();
43
44        for (position, mut event) in events.into_iter().with_position() {
45            self.0.transform(&mut event);
46
47            // Ensure the json size is calculated after any fields have been removed
48            // by the transformer.
49            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51            let mut bytes = BytesMut::new();
52            match (position, encoder.framer()) {
53                (
54                    Position::Last | Position::Only,
55                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56                ) => {
57                    encoder
58                        .serialize(event, &mut bytes)
59                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60                }
61                _ => {
62                    encoder
63                        .encode(event, &mut bytes)
64                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65                }
66            }
67            write_all(writer, n_events_pending, &bytes)?;
68            bytes_written += bytes.len();
69            n_events_pending -= 1;
70        }
71
72        let batch_suffix = encoder.batch_suffix(is_empty);
73        assert!(n_events_pending == 0);
74        write_all(writer, 0, batch_suffix)?;
75        bytes_written += batch_suffix.len();
76
77        Ok((bytes_written, byte_size))
78    }
79}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82    fn encode_input(
83        &self,
84        mut event: Event,
85        writer: &mut dyn io::Write,
86    ) -> io::Result<(usize, GroupedCountByteSize)> {
87        let mut encoder = self.1.clone();
88        self.0.transform(&mut event);
89
90        let mut byte_size = telemetry().create_request_count_byte_size();
91        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93        let mut bytes = BytesMut::new();
94        encoder
95            .serialize(event, &mut bytes)
96            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97        write_all(writer, 1, &bytes)?;
98        Ok((bytes.len(), byte_size))
99    }
100}
101
102impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
103    fn encode_input(
104        &self,
105        events: Vec<Event>,
106        writer: &mut dyn io::Write,
107    ) -> io::Result<(usize, GroupedCountByteSize)> {
108        use tokio_util::codec::Encoder as _;
109        use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
110
111        let mut encoder = self.1.clone();
112        let mut byte_size = telemetry().create_request_count_byte_size();
113        let n_events = events.len();
114        let mut transformed_events = Vec::with_capacity(n_events);
115
116        for mut event in events {
117            self.0.transform(&mut event);
118            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
119            transformed_events.push(event);
120        }
121
122        let mut bytes = BytesMut::new();
123        encoder
124            .encode(transformed_events, &mut bytes)
125            .map_err(|error| {
126                // Codec error paths emit their own internal event
127                // (e.g. SchemaGenerationError, EncoderNullConstraintError,
128                // EncoderRecordBatchError) which logs the error and increments
129                // component_errors_total. We only emit the drop count here to
130                // avoid double-counting.
131                // n_events is the pre-filter count; Parquet filters non-log
132                // events before encoding, but that only happens if a sink is
133                // misconfigured to send non-log events into a log-only encoder,
134                // so the overcount is not a practical concern.
135                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
136                    count: n_events,
137                    reason: "Failed to batch encode events.",
138                });
139                io::Error::new(io::ErrorKind::InvalidData, error)
140            })?;
141
142        write_all(writer, n_events, &bytes)?;
143        Ok((bytes.len(), byte_size))
144    }
145}
146
147impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
148    fn encode_input(
149        &self,
150        events: Vec<Event>,
151        writer: &mut dyn io::Write,
152    ) -> io::Result<(usize, GroupedCountByteSize)> {
153        // Delegate to the specific encoder implementation
154        match &self.1 {
155            vector_lib::codecs::EncoderKind::Framed(encoder) => {
156                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
157            }
158            vector_lib::codecs::EncoderKind::Batch(encoder) => {
159                (self.0.clone(), encoder.clone()).encode_input(events, writer)
160            }
161        }
162    }
163}
164
165/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
166/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
167///
168/// # Arguments
169///
170/// * `writer`           - The object implementing io::Write to write data to.
171/// * `n_events_pending` - The number of events that are dropped if this write fails.
172/// * `buf`              - The buffer to write.
173pub fn write_all(
174    writer: &mut dyn io::Write,
175    n_events_pending: usize,
176    buf: &[u8],
177) -> io::Result<()> {
178    writer.write_all(buf).inspect_err(|error| {
179        emit!(EncoderWriteError {
180            error,
181            count: n_events_pending,
182        });
183    })
184}
185
/// Runs `f` against a byte-counting wrapper around `inner` and returns the number of bytes
/// the wrapped writer reported as written.
///
/// # Errors
///
/// Returns the error produced by `f`, converted into an [`io::Error`]. The byte count is not
/// reported in that case.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Forwards every call to `sink` and sums the byte counts returned by `write`.
    struct CountingWriter<'sink> {
        sink: &'sink mut dyn io::Write,
        total: usize,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)] // The outcome of `write` is handed back to the caller.
            let accepted = self.sink.write(buf)?;
            self.total += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.sink.flush()
        }
    }

    let mut counter = CountingWriter {
        sink: inner,
        total: 0,
    };
    match f(&mut counter, input) {
        Ok(()) => Ok(counter.total),
        Err(error) => Err(error.into()),
    }
}
213
214#[cfg(test)]
215mod tests {
216    use std::{collections::BTreeMap, env, path::PathBuf};
217
218    use bytes::{BufMut, Bytes};
219    use cfg_if::cfg_if;
220    use vector_lib::{
221        codecs::{
222            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
223            NewlineDelimitedEncoder, TextSerializerConfig,
224            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
225        },
226        event::LogEvent,
227        internal_event::CountByteSize,
228        json_size::JsonSize,
229    };
230    use vrl::value::{KeyString, Value};
231
232    cfg_if! {
233        if #[cfg(feature = "codecs-arrow")] {
234            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
235            use vector_lib::codecs::{
236                BatchEncoder,
237                encoding::{ArrowStreamSerializer, ArrowStreamSerializerConfig, BatchSerializer},
238            };
239            use vector_lib::event_test_util::{clear_recorded_events, contains_name_once};
240        }
241    }
242
243    use super::*;
244
245    #[test]
246    fn test_encode_batch_json_empty() {
247        let encoding = (
248            Transformer::default(),
249            vector_lib::codecs::Encoder::<Framer>::new(
250                CharacterDelimitedEncoder::new(b',').into(),
251                JsonSerializerConfig::default().build().into(),
252            ),
253        );
254
255        let mut writer = Vec::new();
256        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
257        assert_eq!(written, 2);
258
259        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
260        assert_eq!(
261            CountByteSize(0, JsonSize::zero()),
262            json_size.size().unwrap()
263        );
264    }
265
266    #[test]
267    fn test_encode_batch_json_single() {
268        let encoding = (
269            Transformer::default(),
270            vector_lib::codecs::Encoder::<Framer>::new(
271                CharacterDelimitedEncoder::new(b',').into(),
272                JsonSerializerConfig::default().build().into(),
273            ),
274        );
275
276        let mut writer = Vec::new();
277        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
278            KeyString::from("key"),
279            Value::from("value"),
280        )])))];
281
282        let input_json_size = input
283            .iter()
284            .map(|event| event.estimated_json_encoded_size_of())
285            .sum::<JsonSize>();
286
287        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
288        assert_eq!(written, 17);
289
290        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
291        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
292    }
293
294    #[test]
295    fn test_encode_batch_json_multiple() {
296        let encoding = (
297            Transformer::default(),
298            vector_lib::codecs::Encoder::<Framer>::new(
299                CharacterDelimitedEncoder::new(b',').into(),
300                JsonSerializerConfig::default().build().into(),
301            ),
302        );
303
304        let input = vec![
305            Event::Log(LogEvent::from(BTreeMap::from([(
306                KeyString::from("key"),
307                Value::from("value1"),
308            )]))),
309            Event::Log(LogEvent::from(BTreeMap::from([(
310                KeyString::from("key"),
311                Value::from("value2"),
312            )]))),
313            Event::Log(LogEvent::from(BTreeMap::from([(
314                KeyString::from("key"),
315                Value::from("value3"),
316            )]))),
317        ];
318
319        let input_json_size = input
320            .iter()
321            .map(|event| event.estimated_json_encoded_size_of())
322            .sum::<JsonSize>();
323
324        let mut writer = Vec::new();
325        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
326        assert_eq!(written, 52);
327
328        assert_eq!(
329            String::from_utf8(writer).unwrap(),
330            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
331        );
332
333        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
334    }
335
336    #[test]
337    fn test_encode_batch_ndjson_empty() {
338        let encoding = (
339            Transformer::default(),
340            vector_lib::codecs::Encoder::<Framer>::new(
341                NewlineDelimitedEncoder::default().into(),
342                JsonSerializerConfig::default().build().into(),
343            ),
344        );
345
346        let mut writer = Vec::new();
347        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
348        assert_eq!(written, 0);
349
350        assert_eq!(String::from_utf8(writer).unwrap(), "");
351        assert_eq!(
352            CountByteSize(0, JsonSize::zero()),
353            json_size.size().unwrap()
354        );
355    }
356
357    #[test]
358    fn test_encode_batch_ndjson_single() {
359        let encoding = (
360            Transformer::default(),
361            vector_lib::codecs::Encoder::<Framer>::new(
362                NewlineDelimitedEncoder::default().into(),
363                JsonSerializerConfig::default().build().into(),
364            ),
365        );
366
367        let mut writer = Vec::new();
368        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
369            KeyString::from("key"),
370            Value::from("value"),
371        )])))];
372        let input_json_size = input
373            .iter()
374            .map(|event| event.estimated_json_encoded_size_of())
375            .sum::<JsonSize>();
376
377        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
378        assert_eq!(written, 16);
379
380        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
381        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
382    }
383
384    #[test]
385    fn test_encode_batch_ndjson_multiple() {
386        let encoding = (
387            Transformer::default(),
388            vector_lib::codecs::Encoder::<Framer>::new(
389                NewlineDelimitedEncoder::default().into(),
390                JsonSerializerConfig::default().build().into(),
391            ),
392        );
393
394        let mut writer = Vec::new();
395        let input = vec![
396            Event::Log(LogEvent::from(BTreeMap::from([(
397                KeyString::from("key"),
398                Value::from("value1"),
399            )]))),
400            Event::Log(LogEvent::from(BTreeMap::from([(
401                KeyString::from("key"),
402                Value::from("value2"),
403            )]))),
404            Event::Log(LogEvent::from(BTreeMap::from([(
405                KeyString::from("key"),
406                Value::from("value3"),
407            )]))),
408        ];
409        let input_json_size = input
410            .iter()
411            .map(|event| event.estimated_json_encoded_size_of())
412            .sum::<JsonSize>();
413
414        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
415        assert_eq!(written, 51);
416
417        assert_eq!(
418            String::from_utf8(writer).unwrap(),
419            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
420        );
421        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
422    }
423
424    #[test]
425    fn test_encode_event_json() {
426        let encoding = (
427            Transformer::default(),
428            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
429        );
430
431        let mut writer = Vec::new();
432        let input = Event::Log(LogEvent::from(BTreeMap::from([(
433            KeyString::from("key"),
434            Value::from("value"),
435        )])));
436        let input_json_size = input.estimated_json_encoded_size_of();
437
438        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
439        assert_eq!(written, 15);
440
441        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
442        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
443    }
444
445    #[test]
446    fn test_encode_event_text() {
447        let encoding = (
448            Transformer::default(),
449            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
450        );
451
452        let mut writer = Vec::new();
453        let input = Event::Log(LogEvent::from(BTreeMap::from([(
454            KeyString::from("message"),
455            Value::from("value"),
456        )])));
457        let input_json_size = input.estimated_json_encoded_size_of();
458
459        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
460        assert_eq!(written, 5);
461
462        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
463        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
464    }
465
466    fn test_data_dir() -> PathBuf {
467        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
468    }
469
470    #[test]
471    fn test_encode_batch_protobuf_single() {
472        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
473        let input_proto_size = message_raw.len();
474
475        // default LengthDelimitedCoderOptions.length_field_length is 4
476        let mut buf = BytesMut::with_capacity(64);
477        buf.reserve(4 + input_proto_size);
478        buf.put_uint(input_proto_size as u64, 4);
479        buf.extend_from_slice(&message_raw[..]);
480        let expected_bytes = buf.freeze();
481
482        let config = ProtobufSerializerConfig {
483            protobuf: ProtobufSerializerOptions {
484                desc_file: test_data_dir().join("test_proto.desc"),
485                message_type: "test_proto.User".to_string(),
486                use_json_names: false,
487            },
488        };
489
490        let encoding = (
491            Transformer::default(),
492            vector_lib::codecs::Encoder::<Framer>::new(
493                LengthDelimitedEncoder::default().into(),
494                config.build().unwrap().into(),
495            ),
496        );
497
498        let mut writer = Vec::new();
499        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
500            (KeyString::from("id"), Value::from("123")),
501            (KeyString::from("name"), Value::from("Alice")),
502            (KeyString::from("age"), Value::from(30)),
503            (
504                KeyString::from("emails"),
505                Value::from(vec!["alice@example.com", "alice@work.com"]),
506            ),
507        ])))];
508
509        let input_json_size = input
510            .iter()
511            .map(|event| event.estimated_json_encoded_size_of())
512            .sum::<JsonSize>();
513
514        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();
515
516        assert_eq!(input_proto_size, 49);
517        assert_eq!(written, input_proto_size + 4);
518        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
519        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
520    }
521
522    #[test]
523    fn test_encode_batch_protobuf_multiple() {
524        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
525        let messages = vec![message_raw.clone(), message_raw.clone()];
526        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();
527
528        let mut buf = BytesMut::with_capacity(128);
529        for message in messages {
530            // default LengthDelimitedCoderOptions.length_field_length is 4
531            buf.reserve(4 + message.len());
532            buf.put_uint(message.len() as u64, 4);
533            buf.extend_from_slice(&message[..]);
534        }
535        let expected_bytes = buf.freeze();
536
537        let config = ProtobufSerializerConfig {
538            protobuf: ProtobufSerializerOptions {
539                desc_file: test_data_dir().join("test_proto.desc"),
540                message_type: "test_proto.User".to_string(),
541                use_json_names: false,
542            },
543        };
544
545        let encoding = (
546            Transformer::default(),
547            vector_lib::codecs::Encoder::<Framer>::new(
548                LengthDelimitedEncoder::default().into(),
549                config.build().unwrap().into(),
550            ),
551        );
552
553        let mut writer = Vec::new();
554        let input = vec![
555            Event::Log(LogEvent::from(BTreeMap::from([
556                (KeyString::from("id"), Value::from("123")),
557                (KeyString::from("name"), Value::from("Alice")),
558                (KeyString::from("age"), Value::from(30)),
559                (
560                    KeyString::from("emails"),
561                    Value::from(vec!["alice@example.com", "alice@work.com"]),
562                ),
563            ]))),
564            Event::Log(LogEvent::from(BTreeMap::from([
565                (KeyString::from("id"), Value::from("123")),
566                (KeyString::from("name"), Value::from("Alice")),
567                (KeyString::from("age"), Value::from(30)),
568                (
569                    KeyString::from("emails"),
570                    Value::from(vec!["alice@example.com", "alice@work.com"]),
571                ),
572            ]))),
573        ];
574
575        let input_json_size: JsonSize = input
576            .iter()
577            .map(|event| event.estimated_json_encoded_size_of())
578            .sum();
579
580        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();
581
582        assert_eq!(total_input_proto_size, 49 * 2);
583        assert_eq!(written, total_input_proto_size + 8);
584        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
585        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
586    }
587
588    #[cfg(feature = "codecs-arrow")]
589    #[test]
590    fn test_encode_batch_arrow_emits_record_batch_error_on_type_mismatch() {
591        clear_recorded_events();
592
593        // Schema declares `message` as Int64, but the event below carries a string,
594        // so `build_record_batch` returns `ArrowEncodingError::ArrowJsonDecode`.
595        let schema = ArrowSchema::new(vec![Field::new("message", DataType::Int64, false)]);
596        let serializer = ArrowStreamSerializer::new(ArrowStreamSerializerConfig::new(schema))
597            .expect("failed to build ArrowStreamSerializer");
598        let encoder = BatchEncoder::new(BatchSerializer::Arrow(serializer));
599        let encoding = (Transformer::default(), encoder);
600
601        let event = Event::Log(LogEvent::from(BTreeMap::from([(
602            KeyString::from("message"),
603            Value::from("not_an_integer"),
604        )])));
605
606        let mut writer = Vec::new();
607        let result = encoding.encode_input(vec![event], &mut writer);
608        assert!(
609            result.is_err(),
610            "type mismatch should fail batch encoding, got {result:?}"
611        );
612
613        contains_name_once("EncoderRecordBatchError")
614            .expect("EncoderRecordBatchError should be emitted on ArrowJsonDecode failure");
615        contains_name_once("ComponentEventsDropped")
616            .expect("ComponentEventsDropped should be emitted by the wrapper");
617    }
618}