1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8#[cfg(feature = "parquet")]
9use crate::encoding::ParquetSerializer;
10use crate::{
11 encoding::{Error, Framer, ProtoBatchSerializer, Serializer},
12 internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// The result of encoding a batch of events with `BatchEncoder::encode_batch`.
#[derive(Debug)]
pub enum BatchOutput {
    /// An Arrow record batch (only available with the `arrow` feature).
    #[cfg(feature = "arrow")]
    Arrow(arrow::record_batch::RecordBatch),
    /// A list of serialized records, each one an independent byte buffer.
    Records(Vec<Vec<u8>>),
}
28
/// The serializers that operate on a whole batch of events at once.
#[derive(Debug, Clone)]
pub enum BatchSerializer {
    /// Arrow stream serializer (only available with the `arrow` feature).
    #[cfg(feature = "arrow")]
    Arrow(ArrowStreamSerializer),
    /// Parquet serializer (only available with the `parquet` feature).
    // NOTE(review): boxed presumably to keep this enum small — confirm.
    #[cfg(feature = "parquet")]
    Parquet(Box<ParquetSerializer>),
    /// ProtoBatch serializer.
    ProtoBatch(ProtoBatchSerializer),
}
41
/// An encoder that turns a batch of events into a single output using a
/// [`BatchSerializer`].
#[derive(Debug, Clone)]
pub struct BatchEncoder {
    /// The batch serializer every encode call is dispatched to.
    serializer: BatchSerializer,
}
47
48impl BatchEncoder {
49 pub const fn new(serializer: BatchSerializer) -> Self {
51 Self { serializer }
52 }
53
54 pub const fn serializer(&self) -> &BatchSerializer {
56 &self.serializer
57 }
58
59 #[cfg(any(feature = "arrow", feature = "parquet"))]
65 pub const fn content_type(&self) -> Option<&'static str> {
66 match &self.serializer {
67 #[cfg(feature = "arrow")]
68 BatchSerializer::Arrow(_) => Some("application/vnd.apache.arrow.stream"),
69 #[cfg(feature = "parquet")]
70 BatchSerializer::Parquet(_) => Some("application/vnd.apache.parquet"),
71 BatchSerializer::ProtoBatch(_) => None,
72 }
73 }
74
75 pub fn encode_batch(&self, events: &[Event]) -> Result<BatchOutput, Error> {
77 match &self.serializer {
78 #[cfg(feature = "arrow")]
79 BatchSerializer::Arrow(serializer) => {
80 let record_batch = serializer.encode_to_record_batch(events).map_err(|err| {
81 use crate::encoding::ArrowEncodingError;
82 match err {
83 ArrowEncodingError::NullConstraint { .. } => {
84 Error::SchemaConstraintViolation(Box::new(err))
85 }
86 _ => Error::SerializingError(Box::new(err)),
87 }
88 })?;
89 Ok(BatchOutput::Arrow(record_batch))
90 }
91 BatchSerializer::ProtoBatch(serializer) => {
92 let records = serializer
93 .encode_batch(events)
94 .map_err(|err| Error::SerializingError(Box::new(err)))?;
95 Ok(BatchOutput::Records(records))
96 }
97 #[cfg(feature = "parquet")]
98 BatchSerializer::Parquet(_) => Err(Error::SerializingError(Box::from(
99 "Parquet serializer does not support encode_batch; use the tokio Encoder interface instead",
100 ))),
101 }
102 }
103}
104
105impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
106 type Error = Error;
107
108 #[allow(unused_variables)]
109 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
110 match &mut self.serializer {
111 #[cfg(feature = "arrow")]
112 BatchSerializer::Arrow(serializer) => {
113 serializer.encode(events, buffer).map_err(|err| {
114 use crate::encoding::ArrowEncodingError;
115 match err {
116 ArrowEncodingError::NullConstraint { .. } => {
117 Error::SchemaConstraintViolation(Box::new(err))
118 }
119 _ => Error::SerializingError(Box::new(err)),
120 }
121 })
122 }
123 #[cfg(feature = "parquet")]
124 BatchSerializer::Parquet(serializer) => serializer
125 .encode(events, buffer)
126 .map_err(Error::SerializingError),
127 BatchSerializer::ProtoBatch(_) => Err(Error::SerializingError(Box::from(
128 "ProtoBatch serializer does not support the tokio Encoder interface; use BatchEncoder::encode_batch() instead",
129 ))),
130 }
131 }
132}
133
/// Wraps the two encoder flavours defined in this module: a framed,
/// per-event encoder or a batch encoder.
#[derive(Debug, Clone)]
pub enum EncoderKind {
    /// A per-event encoder that serializes and frames each event.
    Framed(Box<Encoder<Framer>>),
    /// An encoder that works on whole batches of events.
    Batch(BatchEncoder),
}
142
/// An encoder that serializes events with a [`Serializer`] and, depending on
/// the `Framer` type parameter, wraps the serialized bytes in a frame.
///
/// This module instantiates it with the `Framer` enum (serialize + frame) and
/// with `()` (serialize only).
#[derive(Debug, Clone)]
pub struct Encoder<Framer>
where
    Framer: Clone,
{
    /// The framing strategy applied after serialization (`()` for none).
    framer: Framer,
    /// The serializer that turns an event into bytes.
    serializer: Serializer,
}
152
153impl Default for Encoder<Framer> {
154 fn default() -> Self {
155 use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
156
157 Self {
158 framer: NewlineDelimitedEncoder::default().into(),
159 serializer: TextSerializerConfig::default().build().into(),
160 }
161 }
162}
163
164impl Default for Encoder<()> {
165 fn default() -> Self {
166 use crate::encoding::TextSerializerConfig;
167
168 Self {
169 framer: (),
170 serializer: TextSerializerConfig::default().build().into(),
171 }
172 }
173}
174
175impl<Framer> Encoder<Framer>
176where
177 Framer: Clone,
178{
179 pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
181 let len = buffer.len();
182 let mut payload = buffer.split_off(len);
183
184 self.serialize_at_start(event, &mut payload)?;
185
186 buffer.unsplit(payload);
187
188 Ok(())
189 }
190
191 fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
193 self.serializer.encode(event, buffer).map_err(|error| {
194 emit(EncoderSerializeError { error: &error });
195 Error::SerializingError(error)
196 })
197 }
198}
199
200impl Encoder<Framer> {
201 pub const fn new(framer: Framer, serializer: Serializer) -> Self {
205 Self { framer, serializer }
206 }
207
208 pub const fn framer(&self) -> &Framer {
210 &self.framer
211 }
212
213 pub const fn serializer(&self) -> &Serializer {
215 &self.serializer
216 }
217
218 pub const fn batch_prefix(&self) -> &[u8] {
220 match (&self.framer, &self.serializer) {
221 (
222 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
223 delimiter: b',',
224 }),
225 Serializer::Json(_) | Serializer::NativeJson(_),
226 ) => b"[",
227 _ => &[],
228 }
229 }
230
231 pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
233 match (&self.framer, &self.serializer, empty) {
234 (
235 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
236 delimiter: b',',
237 }),
238 Serializer::Json(_) | Serializer::NativeJson(_),
239 _,
240 ) => b"]",
241 (Framer::NewlineDelimited(_), _, false) => b"\n",
242 _ => &[],
243 }
244 }
245
246 pub const fn content_type(&self) -> &'static str {
248 match (&self.serializer, &self.framer) {
249 (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
250 "application/x-ndjson"
251 }
252 (
253 Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
254 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
255 delimiter: b',',
256 }),
257 ) => "application/json",
258 (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
259 (
260 Serializer::Avro(_)
261 | Serializer::Cef(_)
262 | Serializer::Csv(_)
263 | Serializer::Gelf(_)
264 | Serializer::Json(_)
265 | Serializer::Logfmt(_)
266 | Serializer::NativeJson(_)
267 | Serializer::RawMessage(_)
268 | Serializer::Text(_),
269 _,
270 ) => "text/plain",
271 #[cfg(feature = "syslog")]
272 (Serializer::Syslog(_), _) => "text/plain",
273 #[cfg(feature = "opentelemetry")]
274 (Serializer::Otlp(_), _) => "application/x-protobuf",
275 }
276 }
277}
278
impl Encoder<()> {
    /// Creates a new `Encoder` that serializes events without framing them.
    pub const fn new(serializer: Serializer) -> Self {
        Self {
            framer: (),
            serializer,
        }
    }

    /// Returns a reference to the serializer.
    pub const fn serializer(&self) -> &Serializer {
        &self.serializer
    }
}
294
295impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
296 type Error = Error;
297
298 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
299 let len = buffer.len();
300 let mut payload = buffer.split_off(len);
301
302 self.serialize_at_start(event, &mut payload)?;
303
304 self.framer.encode((), &mut payload).map_err(|error| {
306 emit(EncoderFramingError { error: &error });
307 Error::FramingError(error)
308 })?;
309
310 buffer.unsplit(payload);
311
312 Ok(())
313 }
314}
315
316impl tokio_util::codec::Encoder<Event> for Encoder<()> {
317 type Error = Error;
318
319 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
320 let len = buffer.len();
321 let mut payload = buffer.split_off(len);
322
323 self.serialize_at_start(event, &mut payload)?;
324
325 buffer.unsplit(payload);
326
327 Ok(())
328 }
329}
330
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::BoxedFramingError;

    /// Test framer that wraps whatever is already in the buffer in
    /// parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the current contents out, write the opening paren, put the
            // contents back behind it, then close.
            let inner = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(inner);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to an inner framer and then fails on one
    /// specific call.
    ///
    /// Fields: the inner framer, the number of calls seen so far, and the
    /// zero-based index of the call that must fail.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>(T, usize, usize)
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>;

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self(encoder, 0, n)
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            // The inner framer always runs first, so `dst` is already framed
            // when the injected error is returned.
            self.0.encode((), dst)?;
            let result = if self.1 == self.2 {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            };
            self.1 += 1;
            result
        }
    }

    /// Every event is framed and written to an initially empty sink.
    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ParenEncoder::new())),
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("foo")),
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
        ])
        .map(Ok);
        let sink = Vec::new();
        let mut framed = FramedWrite::new(sink, encoder);
        source.forward(&mut framed).await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)(baz)");
    }

    /// Framed events are appended after data that is already in the sink.
    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ParenEncoder::new())),
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
            Event::Log(LogEvent::from("bat")),
        ])
        .map(Ok);
        let sink = Vec::from("(foo)");
        let mut framed = FramedWrite::new(sink, encoder);
        source.forward(&mut framed).await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)(baz)(bat)");
    }

    /// A framing error on the second event (index 1) stops the stream; only
    /// the first event reaches the initially empty sink.
    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("foo")),
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
        ])
        .map(Ok);
        let sink = Vec::new();
        let mut framed = FramedWrite::new(sink, encoder);
        assert!(source.forward(&mut framed).await.is_err());
        // Flush whatever was successfully encoded before the error.
        framed.flush().await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)");
    }

    /// Same as above, but the sink already holds data: the pre-existing bytes
    /// and the first event survive, the failed event leaves no trace.
    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let encoder = Encoder::<Framer>::new(
            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
            Event::Log(LogEvent::from("bat")),
        ])
        .map(Ok);
        let sink = Vec::from("(foo)");
        let mut framed = FramedWrite::new(sink, encoder);
        assert!(source.forward(&mut framed).await.is_err());
        // Flush whatever was successfully encoded before the error.
        framed.flush().await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"(foo)(bar)");
    }

    /// With the newline-delimited framer every event, including the last one,
    /// is followed by `\n`.
    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = Encoder::<Framer>::new(
            Framer::NewlineDelimited(crate::encoding::NewlineDelimitedEncoder::default()),
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        );
        let source = futures::stream::iter(vec![
            Event::Log(LogEvent::from("bar")),
            Event::Log(LogEvent::from("baz")),
            Event::Log(LogEvent::from("bat")),
        ])
        .map(Ok);
        let sink: Vec<u8> = Vec::new();
        let mut framed = FramedWrite::new(sink, encoder);
        source.forward(&mut framed).await.unwrap();
        let sink = framed.into_inner();
        assert_eq!(sink, b"bar\nbaz\nbat\n");
    }
}