1use bytes::Bytes;
2use smallvec::{SmallVec, smallvec};
3use vector_config_macros::configurable_component;
4use vector_core::{
5 compile_vrl,
6 config::{DataType, LogNamespace},
7 event::{Event, TargetEvents, VrlTarget},
8 schema,
9};
10use vrl::{
11 compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
12 diagnostic::Formatter,
13 value::Kind,
14};
15
16use vector_core::event::EventMetadata;
17
18use crate::decoding::format::Deserializer;
19
20#[configurable_component]
22#[derive(Debug, Clone, Default)]
23pub struct VrlDeserializerConfig {
24 pub vrl: VrlDeserializerOptions,
26}
27
28#[configurable_component]
30#[derive(Debug, Clone, PartialEq, Eq, Default)]
31pub struct VrlDeserializerOptions {
32 pub source: String,
39
40 #[serde(default)]
48 #[configurable(metadata(docs::advanced))]
49 pub timezone: Option<TimeZone>,
50}
51
52impl VrlDeserializerConfig {
53 pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
55 let state = TypeState {
56 local: Default::default(),
57 external: ExternalEnv::default(),
58 };
59
60 match compile_vrl(
61 &self.vrl.source,
62 &vector_vrl_functions::all(),
63 &state,
64 CompileConfig::default(),
65 ) {
66 Ok(result) => Ok(VrlDeserializer {
67 program: result.program,
68 timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
69 metadata_template: None,
70 }),
71 Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
72 .to_string()
73 .into()),
74 }
75 }
76
77 pub fn output_type(&self) -> DataType {
79 DataType::Log
80 }
81
82 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
84 match log_namespace {
85 LogNamespace::Legacy => {
86 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
87 }
88 LogNamespace::Vector => {
89 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
90 }
91 }
92 }
93}
94
95#[derive(Debug, Clone)]
97pub struct VrlDeserializer {
98 program: Program,
99 timezone: TimeZone,
100 metadata_template: Option<EventMetadata>,
104}
105
106impl VrlDeserializer {
107 #[must_use]
114 pub fn with_metadata_template(mut self, metadata: EventMetadata) -> Self {
115 self.metadata_template = Some(metadata);
116 self
117 }
118}
119
120fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
121 use crate::BytesDeserializerConfig;
122 let bytes_deserializer = BytesDeserializerConfig::new().build();
123 let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
124 Event::from(log_event)
125}
126
127impl Deserializer for VrlDeserializer {
128 fn parse(
129 &self,
130 bytes: Bytes,
131 log_namespace: LogNamespace,
132 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
133 let mut event = parse_bytes(bytes, log_namespace);
134 if let Some(template) = &self.metadata_template {
135 *event.metadata_mut() = template.clone();
139 }
140 self.run_vrl(event, log_namespace)
141 }
142}
143
144impl VrlDeserializer {
145 fn run_vrl(
146 &self,
147 event: Event,
148 log_namespace: LogNamespace,
149 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
150 let mut runtime = Runtime::default();
151 let mut target = VrlTarget::new(event, self.program.info(), true);
152 match runtime.resolve(&mut target, &self.program, &self.timezone) {
153 Ok(_) => match target.into_events(log_namespace) {
154 TargetEvents::One(event) => Ok(smallvec![event]),
155 TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
156 TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
157 },
158 Err(e) => Err(e.to_string().into()),
159 }
160 }
161}
162
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Builds a decoder for `source` with no explicit time zone, panicking if the
    /// program does not compile.
    fn make_decoder(source: &str) -> VrlDeserializer {
        VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        }
        .build()
        .expect("Failed to build VrlDeserializer")
    }

    /// Decodes `input` and returns the only resulting event.
    ///
    /// The event count is asserted before the event is taken out, so an unexpected
    /// count fails with the length mismatch rather than an unwrap or index panic.
    fn decode_single(
        decoder: &VrlDeserializer,
        input: &'static str,
        log_namespace: LogNamespace,
    ) -> Event {
        let mut events = decoder
            .parse(Bytes::from(input), log_namespace)
            .expect("parse should succeed");
        assert_eq!(events.len(), 1);
        events.remove(0)
    }

    /// Returns the value stored at the root of `event`.
    fn root_value(event: &Event) -> &Value {
        event
            .as_log()
            .get(&OwnedTargetPath::event_root())
            .expect("event root should be present")
    }

    /// Creates metadata holding a single secret `key` -> `value`.
    fn metadata_with_secret(key: &str, value: &str) -> EventMetadata {
        let mut metadata = EventMetadata::default();
        metadata.secrets_mut().insert(key, value);
        metadata
    }

    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );
        let decoder = make_decoder(source);

        let event = decode_single(
            &decoder,
            r#"{ "message": "Hello VRL" }"#,
            LogNamespace::Vector,
        );

        assert_eq!(
            *root_value(&event),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    #[test]
    fn test_ignored_returned_expression() {
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        );
        let decoder = make_decoder(source);

        let event = decode_single(&decoder, "some bytes", LogNamespace::Vector);

        // Only the final contents of `.` count; the trailing expression is ignored.
        assert_eq!(*root_value(&event), btreemap! { "a" => 1 }.into());
    }

    #[test]
    fn test_multiple_events() {
        let source = indoc!(". = [0,1,2]");
        let decoder = make_decoder(source);
        let log_bytes = Bytes::from("some bytes");

        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();

        assert_eq!(result.len(), 3);
        for (i, event) in result.iter().enumerate() {
            assert_eq!(*root_value(event), i.into());
        }
    }

    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );
        let decoder = make_decoder(source);

        let syslog_event = decode_single(
            &decoder,
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
            LogNamespace::Vector,
        );
        assert_eq!(
            *root_value(&syslog_event),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        let cef_event = decode_single(
            &decoder,
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
            LogNamespace::Vector,
        );
        assert_eq!(
            *root_value(&cef_event),
            btreemap! {
                "cefVersion" => "0",
                "deviceEventClassId" => "100",
                "deviceProduct" => "Threat Manager",
                "deviceVendor" => "Security",
                "deviceVersion" => "1.0",
                "dst" => "2.1.2.2",
                "name" => "worm successfully stopped",
                "severity" => "10",
                "spt" => "1232",
                "src" => "10.0.0.1"
            }
            .into()
        );

        // Input that is neither syslog nor CEF falls through to `null`.
        let random_event = decode_single(&decoder, "a|- -| x", LogNamespace::Vector);
        assert_eq!(*root_value(&random_event), Value::Null);
    }

    #[test]
    fn test_invalid_source() {
        let error = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        }
        .build()
        .unwrap_err()
        .to_string();

        assert!(error.contains("error[E203]: syntax error"));
    }

    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);

        let error = decoder
            .parse(log_bytes, LogNamespace::Vector)
            .unwrap_err()
            .to_string();

        assert!(error.contains("aborted"));
    }

    #[test]
    fn test_with_metadata_template_vrl_can_read_secret() {
        let decoder = make_decoder(r#".secret_value = get_secret!("my_token")"#)
            .with_metadata_template(metadata_with_secret("my_token", "super-secret"));

        let event = decode_single(&decoder, "hello", LogNamespace::Legacy);

        assert_eq!(
            *event.as_log().get("secret_value").unwrap(),
            Value::from("super-secret")
        );
    }

    #[test]
    fn test_with_metadata_template_codec_wins_on_collision() {
        let decoder = make_decoder(r#"set_secret!("my_token", "codec-wins")"#)
            .with_metadata_template(metadata_with_secret("my_token", "template-loses"));

        let event = decode_single(&decoder, "hello", LogNamespace::Legacy);

        assert_eq!(
            event
                .metadata()
                .secrets()
                .get("my_token")
                .unwrap()
                .as_ref(),
            "codec-wins"
        );
    }
}