codecs/encoding/framing/
varint_length_delimited.rs1use bytes::{BufMut, BytesMut};
2use snafu::Snafu;
3use tokio_util::codec::Encoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, FramingError};
7
8#[derive(Debug, Snafu)]
10pub enum VarintFramingError {
11 #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
12 FrameTooLarge { length: usize, max: usize },
13}
14
15impl FramingError for VarintFramingError {}
16
17#[configurable_component]
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
20pub struct VarintLengthDelimitedEncoderConfig {
21 #[serde(default = "default_max_frame_length")]
23 pub max_frame_length: usize,
24}
25
26const fn default_max_frame_length() -> usize {
27 8 * 1_024 * 1_024
28}
29
30impl VarintLengthDelimitedEncoderConfig {
31 pub fn build(&self) -> VarintLengthDelimitedEncoder {
33 VarintLengthDelimitedEncoder::new(self.max_frame_length)
34 }
35}
36
37#[derive(Debug, Clone)]
40pub struct VarintLengthDelimitedEncoder {
41 max_frame_length: usize,
42}
43
44impl VarintLengthDelimitedEncoder {
45 pub fn new(max_frame_length: usize) -> Self {
47 Self { max_frame_length }
48 }
49
50 fn encode_varint(&self, value: usize, buf: &mut BytesMut) -> Result<(), BoxedFramingError> {
52 if value > self.max_frame_length {
53 return Err(VarintFramingError::FrameTooLarge {
54 length: value,
55 max: self.max_frame_length,
56 }
57 .into());
58 }
59
60 let mut val = value;
61 while val >= 0x80 {
62 buf.put_u8((val as u8) | 0x80);
63 val >>= 7;
64 }
65 buf.put_u8(val as u8);
66 Ok(())
67 }
68}
69
70impl Default for VarintLengthDelimitedEncoder {
71 fn default() -> Self {
72 Self::new(default_max_frame_length())
73 }
74}
75
76impl Encoder<()> for VarintLengthDelimitedEncoder {
77 type Error = BoxedFramingError;
78
79 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
80 let data_length = buffer.len();
83 if data_length == 0 {
84 return Ok(());
85 }
86
87 let mut varint_buffer = BytesMut::new();
89 self.encode_varint(data_length, &mut varint_buffer)?;
90
91 let varint_bytes = varint_buffer.freeze();
93 let data_bytes = buffer.split_to(buffer.len());
94 buffer.extend_from_slice(&varint_bytes);
95 buffer.extend_from_slice(&data_bytes);
96 Ok(())
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 #[test]
105 fn encode_single_byte_varint() {
106 let mut buffer = BytesMut::from(&b"foo"[..]);
107 let mut encoder = VarintLengthDelimitedEncoder::default();
108
109 encoder.encode((), &mut buffer).unwrap();
110 assert_eq!(buffer, &[0x03, b'f', b'o', b'o'][..]);
111 }
112
113 #[test]
114 fn encode_multi_byte_varint() {
115 let mut buffer = BytesMut::from(&b"foo"[..]);
116 let mut encoder = VarintLengthDelimitedEncoder::new(1000);
117
118 buffer.clear();
120 buffer.extend_from_slice(&vec![b'x'; 300]);
121 encoder.encode((), &mut buffer).unwrap();
122
123 assert_eq!(buffer[0..2], [0xAC, 0x02]);
125 assert_eq!(buffer.len(), 302); }
127
128 #[test]
129 fn encode_frame_too_large() {
130 let large_data = vec![b'x'; 1001];
131 let mut buffer = BytesMut::from(&large_data[..]);
132 let mut encoder = VarintLengthDelimitedEncoder::new(1000);
133
134 assert!(encoder.encode((), &mut buffer).is_err());
135 }
136
137 #[test]
138 fn encode_empty_buffer() {
139 let mut buffer = BytesMut::new();
140 let mut encoder = VarintLengthDelimitedEncoder::default();
141
142 encoder.encode((), &mut buffer).unwrap();
143 assert_eq!(buffer.len(), 0);
144 }
145}