codecs/decoding/
decoder.rs1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_common::internal_event::emit;
4use vector_core::{
5 config::LogNamespace,
6 event::{Event, EventMetadata},
7};
8
9use crate::{
10 decoding::format::Deserializer as _,
11 decoding::{
12 BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
13 },
14 internal_events::{DecoderDeserializeError, DecoderFramingError},
15};
16
17type DecodedFrame = (SmallVec<[Event; 1]>, usize);
18
19#[derive(Clone)]
22pub struct Decoder {
23 pub framer: Framer,
25 pub deserializer: Deserializer,
27 pub log_namespace: LogNamespace,
29}
30
31impl Default for Decoder {
32 fn default() -> Self {
33 Self {
34 framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
35 deserializer: Deserializer::Bytes(BytesDeserializer),
36 log_namespace: LogNamespace::Legacy,
37 }
38 }
39}
40
41impl Decoder {
42 pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
46 Self {
47 framer,
48 deserializer,
49 log_namespace: LogNamespace::Legacy,
50 }
51 }
52
53 pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
55 self.log_namespace = log_namespace;
56 self
57 }
58
59 pub fn with_metadata_template(mut self, metadata: EventMetadata) -> Self {
62 self.deserializer = self.deserializer.with_metadata_template(metadata);
63 self
64 }
65
66 fn handle_framing_result(
71 &mut self,
72 frame: Result<Option<Bytes>, BoxedFramingError>,
73 ) -> Result<Option<DecodedFrame>, Error> {
74 let frame = frame.map_err(|error| {
75 emit(DecoderFramingError { error: &error });
76 Error::FramingError(error)
77 })?;
78
79 frame
80 .map(|frame| self.deserializer_parse(frame))
81 .transpose()
82 }
83
84 pub fn deserializer_parse(&self, frame: Bytes) -> Result<DecodedFrame, Error> {
86 let byte_size = frame.len();
87
88 self.deserializer
90 .parse(frame, self.log_namespace)
91 .map(|events| (events, byte_size))
92 .map_err(|error| {
93 emit(DecoderDeserializeError { error: &error });
94 Error::ParsingError(error)
95 })
96 }
97}
98
99impl tokio_util::codec::Decoder for Decoder {
100 type Item = DecodedFrame;
101 type Error = Error;
102
103 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
104 let frame = self.framer.decode(buf);
105 self.handle_framing_result(frame)
106 }
107
108 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
109 let frame = self.framer.decode_eof(buf);
110 self.handle_framing_result(frame)
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use bytes::Bytes;
117 use futures::{StreamExt, stream};
118 use tokio_util::io::StreamReader;
119 use vrl::value::Value;
120
121 use super::Decoder;
122 use crate::{
123 DecoderFramedRead, JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
124 decoding::{Deserializer, Framer},
125 };
126
127 #[tokio::test]
128 async fn framed_read_recover_from_error() {
129 let iter = stream::iter(
130 ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"]
131 .into_iter()
132 .map(Bytes::from),
133 );
134 let stream = iter.map(Ok::<_, std::io::Error>);
135 let reader = StreamReader::new(stream);
136 let decoder = Decoder::new(
137 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
138 Deserializer::Json(JsonDeserializer::default()),
139 );
140 let mut stream = DecoderFramedRead::new(reader, decoder);
141
142 let next = stream.next().await.unwrap();
143 let event = next.unwrap().0.pop().unwrap().into_log();
144 assert_eq!(event.get("foo").unwrap(), &Value::from(1));
145
146 let next = stream.next().await.unwrap();
147 let error = next.unwrap_err();
148 assert!(error.can_continue());
149
150 let next = stream.next().await.unwrap();
151 let event = next.unwrap().0.pop().unwrap().into_log();
152 assert_eq!(event.get("bar").unwrap(), &Value::from(2));
153 }
154}