codecs/decoding/framing/
varint_length_delimited.rs1use bytes::{Buf, Bytes, BytesMut};
2use snafu::Snafu;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, FramingError, StreamDecodingError};
7
8#[derive(Debug, Snafu)]
10pub enum VarintFramingError {
11 #[snafu(display("Varint too large"))]
12 VarintOverflow,
13
14 #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
15 FrameTooLarge { length: usize, max: usize },
16
17 #[snafu(display("Trailing data at EOF"))]
18 TrailingData,
19}
20
21impl StreamDecodingError for VarintFramingError {
22 fn can_continue(&self) -> bool {
23 match self {
24 Self::VarintOverflow | Self::FrameTooLarge { .. } => false,
26 Self::TrailingData => false,
28 }
29 }
30}
31
32impl FramingError for VarintFramingError {
33 fn as_any(&self) -> &dyn std::any::Any {
34 self as &dyn std::any::Any
35 }
36}
37
38#[configurable_component]
40#[derive(Debug, Clone, Default)]
41pub struct VarintLengthDelimitedDecoderConfig {
42 #[serde(default = "default_max_frame_length")]
44 pub max_frame_length: usize,
45}
46
/// Default upper bound on a frame's length: 8 MiB.
const fn default_max_frame_length() -> usize {
    // 8 * 2^20 bytes.
    8 << 20
}
50
51impl VarintLengthDelimitedDecoderConfig {
52 pub fn build(&self) -> VarintLengthDelimitedDecoder {
54 VarintLengthDelimitedDecoder::new(self.max_frame_length)
55 }
56}
57
58#[derive(Debug, Clone)]
61pub struct VarintLengthDelimitedDecoder {
62 max_frame_length: usize,
63}
64
65impl VarintLengthDelimitedDecoder {
66 pub fn new(max_frame_length: usize) -> Self {
68 Self { max_frame_length }
69 }
70
71 fn decode_varint(&self, buf: &mut BytesMut) -> Result<Option<u64>, BoxedFramingError> {
73 if buf.is_empty() {
74 return Ok(None);
75 }
76
77 let mut value: u64 = 0;
78 let mut shift: u8 = 0;
79 let mut bytes_read = 0;
80
81 for byte in buf.iter() {
82 bytes_read += 1;
83 let byte_value = (*byte & 0x7F) as u64;
84 value |= byte_value << shift;
85
86 if *byte & 0x80 == 0 {
87 buf.advance(bytes_read);
89 return Ok(Some(value));
90 }
91
92 shift += 7;
93 if shift >= 64 {
94 return Err(VarintFramingError::VarintOverflow.into());
95 }
96 }
97
98 Ok(None)
100 }
101}
102
103impl Default for VarintLengthDelimitedDecoder {
104 fn default() -> Self {
105 Self::new(default_max_frame_length())
106 }
107}
108
109impl Decoder for VarintLengthDelimitedDecoder {
110 type Item = Bytes;
111 type Error = BoxedFramingError;
112
113 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
114 let length = match self.decode_varint(src)? {
116 Some(len) => len as usize,
117 None => return Ok(None), };
119
120 if length > self.max_frame_length {
122 return Err(VarintFramingError::FrameTooLarge {
123 length,
124 max: self.max_frame_length,
125 }
126 .into());
127 }
128
129 if src.len() < length {
131 return Ok(None); }
133
134 let frame = src.split_to(length).freeze();
136 Ok(Some(frame))
137 }
138
139 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
140 if src.is_empty() {
141 Ok(None)
142 } else {
143 match self.decode(src)? {
145 Some(frame) => Ok(Some(frame)),
146 None => {
147 if !src.is_empty() {
149 Err(VarintFramingError::TrailingData.into())
150 } else {
151 Ok(None)
152 }
153 }
154 }
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// A one-byte length prefix followed by exactly that many bytes yields one frame.
    #[test]
    fn decode_single_byte_varint() {
        let mut decoder = VarintLengthDelimitedDecoder::default();
        let mut input = BytesMut::from(&b"\x03foo"[..]);

        let frame = decoder.decode(&mut input).unwrap();
        assert_eq!(frame, Some(Bytes::from("foo")));

        let rest = decoder.decode(&mut input).unwrap();
        assert_eq!(rest, None);
    }

    /// `0xAC 0x02` is the two-byte varint encoding of 300.
    #[test]
    fn decode_multi_byte_varint() {
        let mut decoder = VarintLengthDelimitedDecoder::default();
        let mut input = BytesMut::new();
        input.extend_from_slice(&[0xAC, 0x02]);
        input.extend_from_slice(&[b'x'; 300]);

        let frame = decoder.decode(&mut input).unwrap().unwrap();
        assert_eq!(frame.len(), 300);
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }

    /// A lone continuation byte is not yet a complete length prefix.
    #[test]
    fn decode_incomplete_varint() {
        let mut decoder = VarintLengthDelimitedDecoder::default();
        let mut input = BytesMut::from(&[0x80_u8][..]);

        let outcome = decoder.decode(&mut input).unwrap();
        assert_eq!(outcome, None);
    }

    /// The prefix announces five bytes but only two are available.
    #[test]
    fn decode_incomplete_frame() {
        let mut decoder = VarintLengthDelimitedDecoder::default();
        let mut input = BytesMut::from(&b"\x05fo"[..]);

        let outcome = decoder.decode(&mut input).unwrap();
        assert_eq!(outcome, None);
    }

    /// Nine `0xFF` bytes followed by `0x01` encode `u64::MAX`, far above the limit.
    #[test]
    fn decode_frame_too_large() {
        let mut prefix = [0xFF_u8; 10];
        prefix[9] = 0x01;

        let mut decoder = VarintLengthDelimitedDecoder::new(1000);
        let mut input = BytesMut::from(&prefix[..]);

        let outcome = decoder.decode(&mut input);
        assert!(outcome.is_err());
    }

    /// Bytes left over after the last complete frame are an error at end of stream.
    #[test]
    fn decode_trailing_data_at_eof() {
        let mut decoder = VarintLengthDelimitedDecoder::default();
        let mut input = BytesMut::from(&b"\x03fooextra"[..]);

        let frame = decoder.decode(&mut input).unwrap();
        assert_eq!(frame, Some(Bytes::from("foo")));

        let at_eof = decoder.decode_eof(&mut input);
        assert!(at_eof.is_err());
    }
}