codecs/encoding/framing/varint_length_delimited.rs

1use bytes::{BufMut, BytesMut};
2use snafu::Snafu;
3use tokio_util::codec::Encoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, FramingError};
7
8/// Errors that can occur during varint length delimited framing.
9#[derive(Debug, Snafu)]
10pub enum VarintFramingError {
11    #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
12    FrameTooLarge { length: usize, max: usize },
13}
14
15impl FramingError for VarintFramingError {}
16
17/// Config used to build a `VarintLengthDelimitedEncoder`.
18#[configurable_component]
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
20pub struct VarintLengthDelimitedEncoderConfig {
21    /// Maximum frame length
22    #[serde(default = "default_max_frame_length")]
23    pub max_frame_length: usize,
24}
25
/// Default upper bound on a frame's payload size: 8 MiB.
const fn default_max_frame_length() -> usize {
    const MIB: usize = 1 << 20;
    8 * MIB
}
29
30impl VarintLengthDelimitedEncoderConfig {
31    /// Build the `VarintLengthDelimitedEncoder` from this configuration.
32    pub fn build(&self) -> VarintLengthDelimitedEncoder {
33        VarintLengthDelimitedEncoder::new(self.max_frame_length)
34    }
35}
36
/// Frames byte payloads by prefixing each one with its length, encoded as a
/// base-128 varint (seven bits per byte, low bits first). This matches the
/// layout of protobuf's length-delimited encoding.
///
/// Payloads longer than `max_frame_length` bytes are rejected.
#[derive(Clone, Debug)]
pub struct VarintLengthDelimitedEncoder {
    /// Largest payload size, in bytes, that will be framed.
    max_frame_length: usize,
}
43
44impl VarintLengthDelimitedEncoder {
45    /// Creates a new `VarintLengthDelimitedEncoder`.
46    pub fn new(max_frame_length: usize) -> Self {
47        Self { max_frame_length }
48    }
49
50    /// Encode a varint into the buffer
51    fn encode_varint(&self, value: usize, buf: &mut BytesMut) -> Result<(), BoxedFramingError> {
52        if value > self.max_frame_length {
53            return Err(VarintFramingError::FrameTooLarge {
54                length: value,
55                max: self.max_frame_length,
56            }
57            .into());
58        }
59
60        let mut val = value;
61        while val >= 0x80 {
62            buf.put_u8((val as u8) | 0x80);
63            val >>= 7;
64        }
65        buf.put_u8(val as u8);
66        Ok(())
67    }
68}
69
70impl Default for VarintLengthDelimitedEncoder {
71    fn default() -> Self {
72        Self::new(default_max_frame_length())
73    }
74}
75
76impl Encoder<()> for VarintLengthDelimitedEncoder {
77    type Error = BoxedFramingError;
78
79    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
80        // This encoder expects the data to already be in the buffer
81        // We just need to prepend the varint length
82        let data_length = buffer.len();
83        if data_length == 0 {
84            return Ok(());
85        }
86
87        // Create a temporary buffer to hold the varint
88        let mut varint_buffer = BytesMut::new();
89        self.encode_varint(data_length, &mut varint_buffer)?;
90
91        // Prepend the varint to the buffer
92        let varint_bytes = varint_buffer.freeze();
93        let data_bytes = buffer.split_to(buffer.len());
94        buffer.extend_from_slice(&varint_bytes);
95        buffer.extend_from_slice(&data_bytes);
96        Ok(())
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn encode_single_byte_varint() {
        let mut buffer = BytesMut::from(&b"foo"[..]);
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer, &[0x03, b'f', b'o', b'o'][..]);
    }

    #[test]
    fn encode_multi_byte_varint() {
        // A 300-byte payload needs a two-byte varint prefix.
        let mut buffer = BytesMut::from(&[b'x'; 300][..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        encoder.encode((), &mut buffer).unwrap();

        // 300 in varint encoding: 0xAC (0x2C | continuation bit), 0x02
        assert_eq!(buffer[0..2], [0xAC, 0x02]);
        assert_eq!(buffer.len(), 302); // 2 bytes varint + 300 bytes data
        assert!(buffer[2..].iter().all(|&b| b == b'x'));
    }

    #[test]
    fn encode_varint_width_boundary() {
        let mut encoder = VarintLengthDelimitedEncoder::default();

        // 127 is the largest length that fits in a single varint byte...
        let mut buffer = BytesMut::from(&[b'a'; 127][..]);
        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer[0], 0x7F);
        assert_eq!(buffer.len(), 128);

        // ...and 128 is the smallest that needs two.
        let mut buffer = BytesMut::from(&[b'b'; 128][..]);
        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer[0..2], [0x80, 0x01]);
        assert_eq!(buffer.len(), 130);
    }

    #[test]
    fn encode_frame_at_max_length() {
        let mut buffer = BytesMut::from(&[b'x'; 1000][..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        // The limit is inclusive: exactly `max_frame_length` bytes is accepted.
        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer[0..2], [0xE8, 0x07]);
        assert_eq!(buffer.len(), 1002);
    }

    #[test]
    fn encode_frame_too_large() {
        let large_data = vec![b'x'; 1001];
        let mut buffer = BytesMut::from(&large_data[..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        assert!(encoder.encode((), &mut buffer).is_err());
        // A rejected payload is left untouched.
        assert_eq!(buffer, &large_data[..]);
    }

    #[test]
    fn encode_empty_buffer() {
        let mut buffer = BytesMut::new();
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer.len(), 0);
    }
}