codecs/decoding/format/
vrl.rs

1use bytes::Bytes;
2use smallvec::{SmallVec, smallvec};
3use vector_config_macros::configurable_component;
4use vector_core::{
5    compile_vrl,
6    config::{DataType, LogNamespace},
7    event::{Event, TargetEvents, VrlTarget},
8    schema,
9};
10use vrl::{
11    compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
12    diagnostic::Formatter,
13    value::Kind,
14};
15
16use vector_core::event::EventMetadata;
17
18use crate::decoding::format::Deserializer;
19
20/// Config used to build a `VrlDeserializer`.
21#[configurable_component]
22#[derive(Debug, Clone, Default)]
23pub struct VrlDeserializerConfig {
24    /// VRL-specific decoding options.
25    pub vrl: VrlDeserializerOptions,
26}
27
28/// VRL-specific decoding options.
29#[configurable_component]
30#[derive(Debug, Clone, PartialEq, Eq, Default)]
31pub struct VrlDeserializerOptions {
32    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
33    /// The final contents of the `.` target are used as the decoding result.
34    /// Compilation errors or use of `abort` in the program result in a decoding error.
35    ///
36    ///
37    /// [vrl]: https://vector.dev/docs/reference/vrl
38    pub source: String,
39
40    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
41    /// time zone. The time zone name may be any name in the [TZ database][tz_database], or `local`
42    /// to indicate system local time.
43    ///
44    /// If not set, `local` is used.
45    ///
46    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
47    #[serde(default)]
48    #[configurable(metadata(docs::advanced))]
49    pub timezone: Option<TimeZone>,
50}
51
52impl VrlDeserializerConfig {
53    /// Build the `VrlDeserializer` from this configuration.
54    pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
55        let state = TypeState {
56            local: Default::default(),
57            external: ExternalEnv::default(),
58        };
59
60        match compile_vrl(
61            &self.vrl.source,
62            &vector_vrl_functions::all(),
63            &state,
64            CompileConfig::default(),
65        ) {
66            Ok(result) => Ok(VrlDeserializer {
67                program: result.program,
68                timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
69                metadata_template: None,
70            }),
71            Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
72                .to_string()
73                .into()),
74        }
75    }
76
77    /// Return the type of event build by this deserializer.
78    pub fn output_type(&self) -> DataType {
79        DataType::Log
80    }
81
82    /// The schema produced by the deserializer.
83    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
84        match log_namespace {
85            LogNamespace::Legacy => {
86                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
87            }
88            LogNamespace::Vector => {
89                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
90            }
91        }
92    }
93}
94
95/// Deserializer that builds `Event`s from a byte frame containing logs compatible with VRL.
96#[derive(Debug, Clone)]
97pub struct VrlDeserializer {
98    program: Program,
99    timezone: TimeZone,
100    /// Per-call metadata template set by the source before decoding. When
101    /// present, every `%`-prefixed path in the template is accessible from
102    /// within the VRL program (e.g. `%splunk_hec.host`, `%vector.secrets.*`).
103    metadata_template: Option<EventMetadata>,
104}
105
106impl VrlDeserializer {
107    /// Attach a metadata template that will be pre-populated on each synthetic
108    /// event before the VRL program runs.
109    ///
110    /// Sources call this once per decode call with the per-request context they
111    /// have assembled (envelope fields, auth tokens, etc.). VRL can then read
112    /// those values via `%`-prefixed paths.
113    #[must_use]
114    pub fn with_metadata_template(mut self, metadata: EventMetadata) -> Self {
115        self.metadata_template = Some(metadata);
116        self
117    }
118}
119
120fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
121    use crate::BytesDeserializerConfig;
122    let bytes_deserializer = BytesDeserializerConfig::new().build();
123    let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
124    Event::from(log_event)
125}
126
127impl Deserializer for VrlDeserializer {
128    fn parse(
129        &self,
130        bytes: Bytes,
131        log_namespace: LogNamespace,
132    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
133        let mut event = parse_bytes(bytes, log_namespace);
134        if let Some(template) = &self.metadata_template {
135            // Pre-populate the synthetic event with the source-assembled metadata so
136            // every `%`-prefixed path is in scope when VRL executes. This lets
137            // user programs read `%splunk_hec.host`, `%vector.secrets.*`, etc.
138            *event.metadata_mut() = template.clone();
139        }
140        self.run_vrl(event, log_namespace)
141    }
142}
143
144impl VrlDeserializer {
145    fn run_vrl(
146        &self,
147        event: Event,
148        log_namespace: LogNamespace,
149    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
150        let mut runtime = Runtime::default();
151        let mut target = VrlTarget::new(event, self.program.info(), true);
152        match runtime.resolve(&mut target, &self.program, &self.timezone) {
153            Ok(_) => match target.into_events(log_namespace) {
154                TargetEvents::One(event) => Ok(smallvec![event]),
155                TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
156                TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
157            },
158            Err(e) => Err(e.to_string().into()),
159        }
160    }
161}
162
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Builds a decoder from `source` with no explicit time zone, panicking if the
    /// program does not compile.
    fn make_decoder(source: &str) -> VrlDeserializer {
        VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        }
        .build()
        .expect("Failed to build VrlDeserializer")
    }

    /// The program can replace `.` with parsed JSON and write metadata of its own.
    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    /// Only the final contents of `.` matter; the program's trailing expression value
    /// is ignored.
    #[test]
    fn test_ignored_returned_expression() {
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
        "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "a" => 1 }.into()
        );
    }

    /// Assigning an array to `.` yields one event per element.
    #[test]
    fn test_multiple_events() {
        let source = indoc!(". = [0,1,2]");
        let decoder = make_decoder(source);
        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 3);
        for (i, event) in result.iter().enumerate() {
            assert_eq!(
                *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
                i.into()
            );
        }
    }

    /// One program handles syslog, CEF, and unparseable input via error coalescing.
    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );

        let decoder = make_decoder(source);

        // Syslog input
        let syslog_bytes = Bytes::from(
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        let result = decoder.parse(syslog_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let syslog_event = result.first().unwrap();
        assert_eq!(
            *syslog_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        // CEF input
        let cef_bytes = Bytes::from(
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
        );
        let result = decoder.parse(cef_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let cef_event = result.first().unwrap();
        assert_eq!(
            *cef_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "cefVersion" =>"0",
                "deviceEventClassId" =>"100",
                "deviceProduct" =>"Threat Manager",
                "deviceVendor" =>"Security",
                "deviceVersion" =>"1.0",
                "dst" =>"2.1.2.2",
                "name" =>"worm successfully stopped",
                "severity" =>"10",
                "spt" =>"1232",
                "src" =>"10.0.0.1"
            }
            .into()
        );

        // Input that is neither syslog nor CEF falls through to `null`.
        let random_bytes = Bytes::from("a|- -| x");
        let result = decoder.parse(random_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let random_event = result.first().unwrap();
        assert_eq!(
            *random_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            Value::Null
        );
    }

    /// A program with a syntax error is rejected when the deserializer is built.
    #[test]
    fn test_invalid_source() {
        let error = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        }
        .build()
        .unwrap_err()
        .to_string();
        assert!(error.contains("error[E203]: syntax error"));
    }

    /// `abort` in the program surfaces as a decoding error.
    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let error = decoder
            .parse(log_bytes, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(error.contains("aborted"));
    }

    /// Builds metadata holding a single secret, for use as a metadata template.
    fn metadata_with_secret(key: &str, value: &str) -> EventMetadata {
        let mut metadata = EventMetadata::default();
        metadata.secrets_mut().insert(key, value);
        metadata
    }

    /// A VRL program that uses `get_secret()` can read a secret injected via
    /// `with_metadata_template`.
    #[test]
    fn test_with_metadata_template_vrl_can_read_secret() {
        // VRL program copies the injected secret into an event field so we can
        // assert on its value. The input bytes become `.message` (Legacy namespace)
        // and we add `.secret_value` alongside it.
        //
        // `get_secret` is called without `!`: it is infallible, and the VRL compiler
        // rejects the abort marker on a call that cannot fail.
        let decoder = make_decoder(r#".secret_value = get_secret("my_token")"#)
            .with_metadata_template(metadata_with_secret("my_token", "super-secret"));

        let bytes = Bytes::from(r#"hello"#);
        let events = decoder
            .parse(bytes, LogNamespace::Legacy)
            .expect("parse should succeed");

        assert_eq!(events.len(), 1);
        assert_eq!(
            *events[0].as_log().get("secret_value").unwrap(),
            Value::from("super-secret")
        );
    }

    /// Secrets explicitly set by the VRL program win over the template because
    /// `set_secret` runs after the template is pre-populated.
    #[test]
    fn test_with_metadata_template_codec_wins_on_collision() {
        // `set_secret` is infallible as well, so it is called without `!`.
        let decoder = make_decoder(r#"set_secret("my_token", "codec-wins")"#)
            .with_metadata_template(metadata_with_secret("my_token", "template-loses"));

        let bytes = Bytes::from(r#"hello"#);
        let events = decoder
            .parse(bytes, LogNamespace::Legacy)
            .expect("parse should succeed");

        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0]
                .metadata()
                .secrets()
                .get("my_token")
                .unwrap()
                .as_ref(),
            "codec-wins"
        );
    }
}
399}