1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9 config::telemetry,
10 request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
/// Encodes an input of type `T` and writes the encoded bytes to an
/// [`io::Write`] sink.
pub trait Encoder<T> {
    /// Encodes `input` and writes the result to `writer`.
    ///
    /// On success, returns the number of bytes written together with the
    /// [`GroupedCountByteSize`] accumulated for the encoded event(s).
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if encoding the input or writing to `writer`
    /// fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
28impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
29 fn encode_input(
30 &self,
31 events: Vec<Event>,
32 writer: &mut dyn io::Write,
33 ) -> io::Result<(usize, GroupedCountByteSize)> {
34 let mut encoder = self.1.clone();
35 let mut bytes_written = 0;
36 let mut n_events_pending = events.len();
37 let is_empty = events.is_empty();
38 let batch_prefix = encoder.batch_prefix();
39 write_all(writer, n_events_pending, batch_prefix)?;
40 bytes_written += batch_prefix.len();
41
42 let mut byte_size = telemetry().create_request_count_byte_size();
43
44 for (position, mut event) in events.into_iter().with_position() {
45 self.0.transform(&mut event);
46
47 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51 let mut bytes = BytesMut::new();
52 match (position, encoder.framer()) {
53 (
54 Position::Last | Position::Only,
55 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56 ) => {
57 encoder
58 .serialize(event, &mut bytes)
59 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60 }
61 _ => {
62 encoder
63 .encode(event, &mut bytes)
64 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65 }
66 }
67 write_all(writer, n_events_pending, &bytes)?;
68 bytes_written += bytes.len();
69 n_events_pending -= 1;
70 }
71
72 let batch_suffix = encoder.batch_suffix(is_empty);
73 assert!(n_events_pending == 0);
74 write_all(writer, 0, batch_suffix)?;
75 bytes_written += batch_suffix.len();
76
77 Ok((bytes_written, byte_size))
78 }
79}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82 fn encode_input(
83 &self,
84 mut event: Event,
85 writer: &mut dyn io::Write,
86 ) -> io::Result<(usize, GroupedCountByteSize)> {
87 let mut encoder = self.1.clone();
88 self.0.transform(&mut event);
89
90 let mut byte_size = telemetry().create_request_count_byte_size();
91 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93 let mut bytes = BytesMut::new();
94 encoder
95 .serialize(event, &mut bytes)
96 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97 write_all(writer, 1, &bytes)?;
98 Ok((bytes.len(), byte_size))
99 }
100}
101
102impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
103 fn encode_input(
104 &self,
105 events: Vec<Event>,
106 writer: &mut dyn io::Write,
107 ) -> io::Result<(usize, GroupedCountByteSize)> {
108 use tokio_util::codec::Encoder as _;
109 use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
110
111 let mut encoder = self.1.clone();
112 let mut byte_size = telemetry().create_request_count_byte_size();
113 let n_events = events.len();
114 let mut transformed_events = Vec::with_capacity(n_events);
115
116 for mut event in events {
117 self.0.transform(&mut event);
118 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
119 transformed_events.push(event);
120 }
121
122 let mut bytes = BytesMut::new();
123 encoder
124 .encode(transformed_events, &mut bytes)
125 .map_err(|error| {
126 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
136 count: n_events,
137 reason: "Failed to batch encode events.",
138 });
139 io::Error::new(io::ErrorKind::InvalidData, error)
140 })?;
141
142 write_all(writer, n_events, &bytes)?;
143 Ok((bytes.len(), byte_size))
144 }
145}
146
147impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
148 fn encode_input(
149 &self,
150 events: Vec<Event>,
151 writer: &mut dyn io::Write,
152 ) -> io::Result<(usize, GroupedCountByteSize)> {
153 match &self.1 {
155 vector_lib::codecs::EncoderKind::Framed(encoder) => {
156 (self.0.clone(), *encoder.clone()).encode_input(events, writer)
157 }
158 vector_lib::codecs::EncoderKind::Batch(encoder) => {
159 (self.0.clone(), encoder.clone()).encode_input(events, writer)
160 }
161 }
162 }
163}
164
165pub fn write_all(
174 writer: &mut dyn io::Write,
175 n_events_pending: usize,
176 buf: &[u8],
177) -> io::Result<()> {
178 writer.write_all(buf).inspect_err(|error| {
179 emit!(EncoderWriteError {
180 error,
181 count: n_events_pending,
182 });
183 })
184}
185
/// Runs `f` against a wrapper around `inner` that counts the bytes written
/// through it.
///
/// `f` receives the counting writer and `input`. If it succeeds, the total
/// number of bytes accepted by `inner` is returned.
///
/// # Errors
///
/// If `f` fails, its error is converted into an [`io::Error`] and returned;
/// the byte count is discarded in that case.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Forwards writes to `sink` while summing the bytes it accepted.
    struct CountingWriter<'sink> {
        sink: &'sink mut dyn io::Write,
        written: usize,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // A single `write` call is forwarded as-is so the count reflects
            // exactly what the sink reported; the lint is silenced on purpose.
            #[allow(clippy::disallowed_methods)]
            let accepted = self.sink.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.sink.flush()
        }
    }

    let mut counter = CountingWriter {
        sink: inner,
        written: 0,
    };

    match f(&mut counter, input) {
        Ok(()) => Ok(counter.written),
        Err(error) => Err(error.into()),
    }
}
213
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use cfg_if::cfg_if;
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    cfg_if! {
        if #[cfg(feature = "codecs-arrow")] {
            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
            use vector_lib::codecs::{
                BatchEncoder,
                encoding::{ArrowStreamSerializer, ArrowStreamSerializerConfig, BatchSerializer},
            };
            use vector_lib::event_test_util::{clear_recorded_events, contains_name_once};
        }
    }

    use super::*;

    /// Transformer paired with a framed encoder, as used by the batch tests.
    type FramedEncoding = (Transformer, vector_lib::codecs::Encoder<Framer>);

    /// Builds a log event from the given `(key, value)` pairs.
    fn log_event<const N: usize>(fields: [(&str, Value); N]) -> Event {
        Event::Log(LogEvent::from(BTreeMap::from(
            fields.map(|(key, value)| (KeyString::from(key), value)),
        )))
    }

    /// Builds a default transformer paired with a JSON serializer that uses
    /// the given framer.
    fn json_encoding(framer: Framer) -> FramedEncoding {
        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                framer,
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Sums the estimated JSON size of all `events`.
    fn estimated_json_size(events: &[Event]) -> JsonSize {
        events
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>()
    }

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Builds a default transformer paired with a length-delimited protobuf
    /// encoder for the `test_proto.User` message.
    fn user_protobuf_encoding() -> FramedEncoding {
        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        )
    }

    /// The log event whose protobuf encoding the tests compare against
    /// `test_proto.pb`.
    fn user_event() -> Event {
        log_event([
            ("id", Value::from("123")),
            ("name", Value::from("Alice")),
            ("age", Value::from(30)),
            (
                "emails",
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])
    }

    /// Concatenates `messages`, each preceded by its length as a 4-byte
    /// unsigned integer.
    fn length_prefixed(messages: &[&[u8]]) -> Bytes {
        let mut buf = BytesMut::new();
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(message);
        }
        buf.freeze()
    }

    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_json_single() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![log_event([("key", Value::from("value"))])];
        let input_json_size = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![
            log_event([("key", Value::from("value1"))]),
            log_event([("key", Value::from("value2"))]),
            log_event([("key", Value::from("value3"))]),
        ];
        let input_json_size = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let input = vec![log_event([("key", Value::from("value"))])];
        let input_json_size = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let input = vec![
            log_event([("key", Value::from("value1"))]),
            log_event([("key", Value::from("value2"))]),
            log_event([("key", Value::from("value3"))]),
        ];
        let input_json_size = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let input = log_event([("key", Value::from("value"))]);
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let input = log_event([("message", Value::from("value"))]);
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();
        let expected_bytes = length_prefixed(&[message_raw.as_slice()]);

        let encoding = user_protobuf_encoding();

        let input = vec![user_event()];
        let input_json_size = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = [message_raw.as_slice(), message_raw.as_slice()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();
        let expected_bytes = length_prefixed(&messages);

        let encoding = user_protobuf_encoding();

        let input = vec![user_event(), user_event()];
        let input_json_size: JsonSize = estimated_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[cfg(feature = "codecs-arrow")]
    #[test]
    fn test_encode_batch_arrow_emits_record_batch_error_on_type_mismatch() {
        clear_recorded_events();

        // The schema declares `message` as a non-nullable Int64, while the
        // event below carries a string in that field.
        let schema = ArrowSchema::new(vec![Field::new("message", DataType::Int64, false)]);
        let serializer = ArrowStreamSerializer::new(ArrowStreamSerializerConfig::new(schema))
            .expect("failed to build ArrowStreamSerializer");
        let encoding = (
            Transformer::default(),
            BatchEncoder::new(BatchSerializer::Arrow(serializer)),
        );

        let event = log_event([("message", Value::from("not_an_integer"))]);

        let mut writer = Vec::new();
        let result = encoding.encode_input(vec![event], &mut writer);
        assert!(
            result.is_err(),
            "type mismatch should fail batch encoding, got {result:?}"
        );

        contains_name_once("EncoderRecordBatchError")
            .expect("EncoderRecordBatchError should be emitted on ArrowJsonDecode failure");
        contains_name_once("ComponentEventsDropped")
            .expect("ComponentEventsDropped should be emitted by the wrapper");
    }
}