1use prost_reflect::{MessageDescriptor, prost::Message as _};
7use snafu::Snafu;
8use std::sync::Arc;
9use vector_config::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::{Options, encode_message};
12
/// Errors produced while encoding a batch of events into protobuf records.
#[derive(Debug, Snafu)]
pub enum ProtoBatchEncodingError {
    /// The batch handed to the encoder contained no events.
    #[snafu(display("Cannot encode an empty batch"))]
    NoEvents,

    /// The batch contained a non-log event (a metric or a trace).
    #[snafu(display("Unsupported event type: only Log events are supported"))]
    UnsupportedEventType,

    /// Converting a log event's value into a message of the configured type failed.
    #[snafu(display("Protobuf encoding failed: {}", source))]
    EncodingFailed {
        /// The underlying conversion error.
        source: vector_common::Error,
    },

    /// Wraps a `prost::EncodeError`.
    // NOTE(review): this variant is not constructed anywhere in the code visible here
    // (`encode_to_vec` is infallible); confirm whether other callers still need it.
    #[snafu(display("Protobuf prost encoding failed: {}", source))]
    ProstEncodingFailed {
        /// The underlying prost encode error.
        source: prost_reflect::prost::EncodeError,
    },
}
38
/// Configuration for the protobuf batch serializer.
#[configurable_component]
#[derive(Clone, Default)]
pub struct ProtoBatchSerializerConfig {
    /// The protobuf message type that each log event is encoded as.
    ///
    /// This field is skipped by serde, so it is never read from user configuration
    /// and is `None` by default. It has to be supplied in code, for example through
    /// `ProtoBatchSerializerConfig::new`.
    #[serde(skip)]
    #[configurable(derived)]
    pub descriptor: Option<MessageDescriptor>,
}
48
49impl std::fmt::Debug for ProtoBatchSerializerConfig {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("ProtoBatchSerializerConfig")
52 .field(
53 "descriptor",
54 &self.descriptor.as_ref().map(|d| d.full_name().to_string()),
55 )
56 .finish()
57 }
58}
59
60impl ProtoBatchSerializerConfig {
61 pub fn new(descriptor: MessageDescriptor) -> Self {
63 Self {
64 descriptor: Some(descriptor),
65 }
66 }
67
68 pub fn input_type(&self) -> DataType {
70 DataType::Log
71 }
72
73 pub fn schema_requirement(&self) -> schema::Requirement {
75 schema::Requirement::empty()
76 }
77}
78
/// Encodes log events into individual serialized protobuf records.
#[derive(Clone, Debug)]
pub struct ProtoBatchSerializer {
    /// Message type that every event is encoded as.
    descriptor: Arc<MessageDescriptor>,
    /// Options passed to VRL's `encode_message` for every event.
    options: Options,
}
85
86impl ProtoBatchSerializer {
87 pub fn new(config: ProtoBatchSerializerConfig) -> Result<Self, vector_common::Error> {
89 let descriptor = config.descriptor.ok_or_else(|| {
90 vector_common::Error::from("Proto batch serializer requires a message descriptor.")
91 })?;
92
93 Ok(Self {
94 descriptor: Arc::new(descriptor),
95 options: Options {
96 use_json_names: false,
97 allow_lossy_string_coercion: true,
98 },
99 })
100 }
101
102 pub fn encode_batch(&self, events: &[Event]) -> Result<Vec<Vec<u8>>, ProtoBatchEncodingError> {
104 if events.is_empty() {
105 return Err(ProtoBatchEncodingError::NoEvents);
106 }
107
108 let mut records = Vec::with_capacity(events.len());
109
110 for event in events {
111 let dynamic_message = match event {
112 Event::Log(log) => {
113 encode_message(&self.descriptor, log.value().clone(), &self.options)
114 }
115 Event::Trace(_) | Event::Metric(_) => {
116 return Err(ProtoBatchEncodingError::UnsupportedEventType);
117 }
118 }
119 .map_err(|source| ProtoBatchEncodingError::EncodingFailed {
120 source: source.into(),
121 })?;
122
123 records.push(dynamic_message.encode_to_vec());
124 }
125
126 Ok(records)
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use prost_reflect::{
        DescriptorPool, DynamicMessage, Value as ProstValue,
        prost_types::{
            DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
            field_descriptor_proto::{Label, Type},
        },
    };
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent, Value};
    use vrl::btreemap;

    /// Builds the `test.Outer` descriptor.
    ///
    /// `test.Outer` has a string `name` (1), an int64 `count` (2) and a nested
    /// `test.Inner` message `inner` (3). `test.Inner` has a single string `label` (1).
    fn build_descriptor() -> MessageDescriptor {
        let inner = DescriptorProto {
            name: Some("Inner".to_string()),
            field: vec![FieldDescriptorProto {
                name: Some("label".to_string()),
                number: Some(1),
                label: Some(Label::Optional as i32),
                r#type: Some(Type::String as i32),
                ..Default::default()
            }],
            ..Default::default()
        };

        let outer = DescriptorProto {
            name: Some("Outer".to_string()),
            field: vec![
                FieldDescriptorProto {
                    name: Some("name".to_string()),
                    number: Some(1),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::String as i32),
                    ..Default::default()
                },
                FieldDescriptorProto {
                    name: Some("count".to_string()),
                    number: Some(2),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::Int64 as i32),
                    ..Default::default()
                },
                FieldDescriptorProto {
                    name: Some("inner".to_string()),
                    number: Some(3),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::Message as i32),
                    // `Inner` is a top-level message in package `test`, not nested in `Outer`.
                    type_name: Some(".test.Inner".to_string()),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let file = FileDescriptorProto {
            name: Some("test.proto".to_string()),
            package: Some("test".to_string()),
            message_type: vec![outer, inner],
            syntax: Some("proto3".to_string()),
            ..Default::default()
        };

        let pool = DescriptorPool::from_file_descriptor_set(FileDescriptorSet { file: vec![file] })
            .expect("descriptor pool builds");
        pool.get_message_by_name("test.Outer")
            .expect("Outer message exists")
    }

    /// Creates a serializer for `test.Outer`.
    fn make_serializer() -> ProtoBatchSerializer {
        ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(build_descriptor()))
            .expect("serializer builds")
    }

    #[test]
    fn missing_descriptor_is_rejected() {
        let result = ProtoBatchSerializer::new(ProtoBatchSerializerConfig::default());
        assert!(result.is_err());
    }

    #[test]
    fn config_debug_shows_message_name_only() {
        let config = ProtoBatchSerializerConfig::new(build_descriptor());
        assert_eq!(
            format!("{config:?}"),
            r#"ProtoBatchSerializerConfig { descriptor: Some("test.Outer") }"#
        );
        assert_eq!(
            format!("{:?}", ProtoBatchSerializerConfig::default()),
            "ProtoBatchSerializerConfig { descriptor: None }"
        );
    }

    #[test]
    fn empty_batch_returns_no_events_error() {
        let serializer = make_serializer();
        let err = serializer
            .encode_batch(&[])
            .expect_err("empty batch errors");
        assert!(matches!(err, ProtoBatchEncodingError::NoEvents));
    }

    #[test]
    fn metric_event_is_rejected() {
        let serializer = make_serializer();
        let metric = Event::Metric(Metric::new(
            "test",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        ));
        let err = serializer
            .encode_batch(&[metric])
            .expect_err("metric event errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    #[test]
    fn trace_event_is_rejected() {
        let serializer = make_serializer();
        let trace = Event::Trace(TraceEvent::default());
        let err = serializer
            .encode_batch(&[trace])
            .expect_err("trace event errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    /// A single unsupported event fails the whole batch, even after a valid log.
    #[test]
    fn mixed_batch_is_rejected() {
        let serializer = make_serializer();
        let log = Event::Log(LogEvent::from(btreemap! {
            "name" => Value::from("ok"),
        }));
        let metric = Event::Metric(Metric::new(
            "test",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        ));
        let err = serializer
            .encode_batch(&[log, metric])
            .expect_err("mixed batch errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    /// A multi-event batch yields one record per event, in input order.
    #[test]
    fn multi_event_batch_preserves_order() {
        let descriptor = build_descriptor();
        let serializer =
            ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(descriptor.clone()))
                .expect("serializer builds");

        let names = ["first", "second", "third"];
        let events: Vec<Event> = names
            .iter()
            .map(|name| {
                Event::Log(LogEvent::from(btreemap! {
                    "name" => Value::from(*name),
                }))
            })
            .collect();

        let records = serializer
            .encode_batch(&events)
            .expect("encoding succeeds");
        assert_eq!(records.len(), names.len());

        for (record, expected) in records.iter().zip(names) {
            let decoded = DynamicMessage::decode(descriptor.clone(), record.as_slice())
                .expect("decode succeeds");
            let name_field = decoded
                .get_field_by_name("name")
                .expect("name field present");
            assert_eq!(name_field.as_str(), Some(expected));
        }
    }

    #[test]
    fn round_trip_decode_preserves_field_mapping() {
        let descriptor = build_descriptor();
        let serializer =
            ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(descriptor.clone()))
                .expect("serializer builds");

        let event = Event::Log(LogEvent::from(btreemap! {
            "name" => Value::from("hello"),
            "count" => Value::from(42_i64),
            "inner" => Value::from(btreemap! {
                "label" => Value::from("nested"),
            }),
        }));

        let records = serializer
            .encode_batch(&[event])
            .expect("encoding succeeds");
        assert_eq!(records.len(), 1);

        let decoded =
            DynamicMessage::decode(descriptor, records[0].as_slice()).expect("decode succeeds");

        let name_field = decoded
            .get_field_by_name("name")
            .expect("name field present");
        assert_eq!(name_field.as_str(), Some("hello"));

        let count_field = decoded
            .get_field_by_name("count")
            .expect("count field present");
        assert_eq!(count_field.as_i64(), Some(42));

        let inner_field = decoded
            .get_field_by_name("inner")
            .expect("inner field present");
        let inner_msg = match &*inner_field {
            ProstValue::Message(m) => m,
            other => panic!("expected nested message, got {:?}", other),
        };
        let label = inner_msg
            .get_field_by_name("label")
            .expect("label field present");
        assert_eq!(label.as_str(), Some("nested"));
    }
}