codecs/encoding/format/
protobuf.rs

1use std::path::PathBuf;
2
3use crate::encoding::BuildError;
4use bytes::BytesMut;
5use prost_reflect::{MessageDescriptor, prost::Message as _};
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9    config::DataType,
10    event::{Event, Value},
11    schema,
12};
13use vrl::protobuf::{
14    descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
15    encode::{Options, encode_message},
16};
17
// NOTE(review): the `///` text on this type and its field is presumably picked up by
// `#[configurable_component]` for the generated configuration reference, so edits to it are
// user-visible; plain `//` comments like this one are not.
/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerConfig {
    // All serializer settings live in this single nested options value; `build` reads
    // `desc_file`, `message_type` and `use_json_names` from it.
    /// Options for the Protobuf serializer.
    pub protobuf: ProtobufSerializerOptions,
}
25
26impl ProtobufSerializerConfig {
27    /// Build the `ProtobufSerializer` from this configuration.
28    pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
29        let message_descriptor =
30            get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
31        Ok(ProtobufSerializer {
32            message_descriptor,
33            options: Options {
34                use_json_names: self.protobuf.use_json_names,
35                allow_lossy_string_coercion: true,
36            },
37        })
38    }
39
40    /// The data type of events that are accepted by `ProtobufSerializer`.
41    pub fn input_type(&self) -> DataType {
42        DataType::Log | DataType::Trace
43    }
44
45    /// The schema required by the serializer.
46    pub fn schema_requirement(&self) -> schema::Requirement {
47        // While technically we support `Value` variants that can't be losslessly serialized to
48        // Protobuf, we don't want to enforce that limitation to users yet.
49        schema::Requirement::empty()
50    }
51}
52
// NOTE(review): the `///` text on this type and its fields is presumably picked up by
// `#[configurable_component]` for the generated configuration reference, so edits to it are
// user-visible; plain `//` comments like this one are not.
/// Protobuf serializer options.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerOptions {
    // Passed, together with `message_type`, to `get_message_descriptor` when the
    // serializer is built.
    /// The path to the protobuf descriptor set file.
    ///
    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`
    ///
    /// You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
    #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
    pub desc_file: PathBuf,

    // Name looked up inside the descriptor set loaded from `desc_file`; the example suggests
    // a package-qualified name (`package.Message`).
    /// The name of the message type to use for serializing.
    #[configurable(metadata(docs::examples = "package.Message"))]
    pub message_type: String,

    // Optional setting: `serde(default)` makes a missing key deserialize as `false`, and
    // `skip_serializing_if` omits the key on serialization while it still holds the default.
    /// Use JSON field names (camelCase) instead of protobuf field names (snake_case).
    ///
    /// When enabled, the serializer looks for fields using their JSON names as defined
    /// in the `.proto` file (for example `jobDescription` instead of `job_description`).
    ///
    /// This is useful when working with data that has already been converted from JSON or
    /// when interfacing with systems that use JSON naming conventions.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub use_json_names: bool,
}
79
80/// Serializer that converts an `Event` to bytes using the Protobuf format.
81#[derive(Debug, Clone)]
82pub struct ProtobufSerializer {
83    /// The protobuf message definition to use for serialization.
84    message_descriptor: MessageDescriptor,
85    options: Options,
86}
87
88impl ProtobufSerializer {
89    /// Creates a new `ProtobufSerializer`.
90    pub fn new(message_descriptor: MessageDescriptor) -> Self {
91        Self {
92            message_descriptor,
93            options: Options::default(),
94        }
95    }
96
97    /// Creates a new serializer instance using the descriptor bytes directly.
98    pub fn new_from_bytes(
99        desc_bytes: &[u8],
100        message_type: &str,
101        options: &Options,
102    ) -> vector_common::Result<Self> {
103        let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
104        Ok(Self {
105            message_descriptor,
106            options: options.clone(),
107        })
108    }
109
110    /// Get a description of the message type used in serialization.
111    pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
112        self.message_descriptor.descriptor_proto()
113    }
114}
115
116impl Encoder<Event> for ProtobufSerializer {
117    type Error = vector_common::Error;
118
119    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
120        let message = match event {
121            Event::Log(log) => {
122                encode_message(&self.message_descriptor, log.into_parts().0, &self.options)
123            }
124            Event::Metric(_) => unimplemented!(),
125            Event::Trace(trace) => encode_message(
126                &self.message_descriptor,
127                Value::Object(trace.into_parts().0),
128                &self.options,
129            ),
130        }?;
131        message.encode(buffer).map_err(Into::into)
132    }
133}