codecs/encoding/framing/
length_delimited.rs

1use bytes::BytesMut;
2use tokio_util::codec::{Encoder, LengthDelimitedCodec};
3use vector_config::configurable_component;
4
5use super::BoxedFramingError;
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8/// Config used to build a `LengthDelimitedEncoder`.
9#[configurable_component]
10#[derive(Debug, Clone, Default, Eq, PartialEq)]
11pub struct LengthDelimitedEncoderConfig {
12    /// Options for the length delimited decoder.
13    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
14    pub length_delimited: LengthDelimitedCoderOptions,
15}
16
17impl LengthDelimitedEncoderConfig {
18    /// Build the `LengthDelimitedEncoder` from this configuration.
19    pub fn build(&self) -> LengthDelimitedEncoder {
20        LengthDelimitedEncoder::new(&self.length_delimited)
21    }
22}
23
24/// An encoder for handling bytes that are delimited by a length header.
25#[derive(Debug, Clone)]
26pub struct LengthDelimitedEncoder {
27    codec: LengthDelimitedCodec,
28    inner_buffer: BytesMut,
29}
30
31impl LengthDelimitedEncoder {
32    /// Creates a new `LengthDelimitedEncoder`.
33    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
34        Self {
35            codec: config.build_codec(),
36            inner_buffer: BytesMut::new(),
37        }
38    }
39}
40
41impl Default for LengthDelimitedEncoder {
42    fn default() -> Self {
43        Self {
44            codec: LengthDelimitedCodec::new(),
45            inner_buffer: BytesMut::new(),
46        }
47    }
48}
49
50impl Encoder<()> for LengthDelimitedEncoder {
51    type Error = BoxedFramingError;
52
53    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
54        self.inner_buffer.clear();
55        self.inner_buffer.extend_from_slice(buffer);
56        buffer.clear();
57        let bytes = self.inner_buffer.split().freeze();
58        self.codec.encode(bytes, buffer)?;
59        Ok(())
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    use super::*;
66
67    #[test]
68    fn encode() {
69        let mut codec = LengthDelimitedEncoder::default();
70
71        let mut buffer = BytesMut::from("abc");
72        codec.encode((), &mut buffer).unwrap();
73
74        assert_eq!(&buffer[..], b"\0\0\0\x03abc");
75    }
76
77    #[test]
78    fn encode_2byte_length() {
79        let mut codec = LengthDelimitedEncoder::new(&LengthDelimitedCoderOptions {
80            length_field_length: 2,
81            ..Default::default()
82        });
83
84        let mut buffer = BytesMut::from("abc");
85        codec.encode((), &mut buffer).unwrap();
86
87        assert_eq!(&buffer[..], b"\0\x03abc");
88    }
89}