codecs/decoding/format/
avro.rs

1use bytes::{Buf, Bytes};
2use chrono::Utc;
3use lookup::event_path;
4use serde::{Deserialize, Serialize};
5use smallvec::{SmallVec, smallvec};
6use vector_config::configurable_component;
7use vector_core::{
8    config::{DataType, LogNamespace, log_schema},
9    event::{Event, LogEvent},
10    schema,
11};
12use vrl::value::KeyString;
13
14use super::Deserializer;
15use crate::encoding::AvroSerializerOptions;
16
/// Value type stored in the log events produced by this deserializer.
type VrlValue = vrl::value::Value;
/// Value type yielded by the `apache_avro` datum decoder.
type AvroValue = apache_avro::types::Value;

/// First byte a datum must carry when `strip_schema_id_prefix` is enabled.
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Number of leading bytes dropped when `strip_schema_id_prefix` is enabled
/// (the magic byte followed by a 4-byte schema id).
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;
22
/// Config used to build an `AvroDeserializer`.
///
/// Holds the user-facing options; the schema text inside them is only parsed when
/// `build` is called.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroDeserializerConfig {
    /// Options for the Avro deserializer (schema text and schema-id prefix handling).
    pub avro_options: AvroDeserializerOptions,
}
29
30impl AvroDeserializerConfig {
31    /// Creates a new `AvroDeserializerConfig`.
32    pub const fn new(schema: String, strip_schema_id_prefix: bool) -> Self {
33        Self {
34            avro_options: AvroDeserializerOptions {
35                schema,
36                strip_schema_id_prefix,
37            },
38        }
39    }
40
41    /// Build the `AvroDeserializer` from this configuration.
42    pub fn build(&self) -> vector_common::Result<AvroDeserializer> {
43        let schema = apache_avro::Schema::parse_str(&self.avro_options.schema)
44            .map_err(|error| format!("Failed building Avro serializer: {error}"))?;
45
46        Ok(AvroDeserializer {
47            schema,
48            strip_schema_id_prefix: self.avro_options.strip_schema_id_prefix,
49        })
50    }
51
52    /// The data type of events that are accepted by `AvroDeserializer`.
53    pub fn output_type(&self) -> DataType {
54        DataType::Log
55    }
56
57    /// The schema required by the serializer.
58    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59        match log_namespace {
60            LogNamespace::Legacy => {
61                let mut definition = schema::Definition::empty_legacy_namespace()
62                    .unknown_fields(vrl::value::Kind::any());
63
64                if let Some(timestamp_key) = log_schema().timestamp_key() {
65                    definition = definition.try_with_field(
66                        timestamp_key,
67                        vrl::value::Kind::any().or_timestamp(),
68                        Some("timestamp"),
69                    );
70                }
71                definition
72            }
73            LogNamespace::Vector => schema::Definition::new_with_default_metadata(
74                vrl::value::Kind::any(),
75                [log_namespace],
76            ),
77        }
78    }
79}
80
81impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
82    fn from(value: &AvroDeserializerOptions) -> Self {
83        Self {
84            schema: value.schema.clone(),
85        }
86    }
87}
// NOTE(review): the `///` text and `docs::` metadata below are left untouched because
// `configurable_component` presumably feeds them into generated configuration docs —
// confirm before rewording. Discrepancies with `try_from` in this file worth fixing there:
//   * the summary says "serializer" although these options configure the deserializer;
//   * `BigDecimal`, `TimestampNanos` and `LocalTimestampNanos` are also rejected by
//     `try_from` but are not listed as unsupported;
//   * `docs::additional_props_description` lists the local-timestamp types as unsupported,
//     while `try_from` returns `Ok` for `LocalTimestampMillis` and `LocalTimestampMicros`.
/// Apache Avro serializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroDeserializerOptions {
    /// The Avro schema definition.
    /// **Note**: The following [`apache_avro::types::Value`] variants are *not* supported:
    /// * `Date`
    /// * `Decimal`
    /// * `Duration`
    /// * `Fixed`
    /// * `TimeMillis`
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#,
        docs::additional_props_description = r#"Supports most avro data types, unsupported data types includes
        ["decimal", "duration", "local-timestamp-millis", "local-timestamp-micros"]"#,
    ))]
    pub schema: String,

    /// For Avro datum encoded in Kafka messages, the bytes are prefixed with the schema ID.  Set this to `true` to strip the schema ID prefix, as described in [Confluent Kafka's documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
    pub strip_schema_id_prefix: bool,
}
109
/// Deserializer that converts bytes to an `Event` using the Apache Avro format.
#[derive(Debug, Clone)]
pub struct AvroDeserializer {
    /// Parsed Avro schema every datum is decoded against.
    schema: apache_avro::Schema,
    /// When `true`, each datum must start with the schema-id prefix, which is removed
    /// before decoding.
    strip_schema_id_prefix: bool,
}
116
117impl AvroDeserializer {
118    /// Creates a new `AvroDeserializer`.
119    pub const fn new(schema: apache_avro::Schema, strip_schema_id_prefix: bool) -> Self {
120        Self {
121            schema,
122            strip_schema_id_prefix,
123        }
124    }
125}
126
127impl Deserializer for AvroDeserializer {
128    fn parse(
129        &self,
130        bytes: Bytes,
131        log_namespace: LogNamespace,
132    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
133        // Avro has a `null` type which indicates no value.
134        if bytes.is_empty() {
135            return Ok(smallvec![]);
136        }
137
138        let bytes = if self.strip_schema_id_prefix {
139            if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
140                bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
141            } else {
142                return Err(vector_common::Error::from(
143                    "Expected avro datum to be prefixed with schema id",
144                ));
145            }
146        } else {
147            bytes
148        };
149
150        let value = apache_avro::from_avro_datum(&self.schema, &mut bytes.reader(), None)?;
151
152        let apache_avro::types::Value::Record(fields) = value else {
153            return Err(vector_common::Error::from("Expected an avro Record"));
154        };
155
156        let mut log = LogEvent::default();
157        for (k, v) in fields {
158            log.insert(event_path!(k.as_str()), try_from(v)?);
159        }
160
161        let mut event = Event::Log(log);
162        let event = match log_namespace {
163            LogNamespace::Vector => event,
164            LogNamespace::Legacy => {
165                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
166                    let log = event.as_mut_log();
167                    if !log.contains(timestamp_key) {
168                        let timestamp = Utc::now();
169                        log.insert(timestamp_key, timestamp);
170                    }
171                }
172                event
173            }
174        };
175        Ok(smallvec![event])
176    }
177}
178
179// Can't use std::convert::TryFrom because of orphan rules
180pub fn try_from(value: AvroValue) -> vector_common::Result<VrlValue> {
181    // Very similar to avro to json see `impl std::convert::TryFrom<AvroValue> for serde_json::Value`
182    // LogEvent has native support for bytes, so it is used for Bytes and Fixed
183    match value {
184        AvroValue::Array(array) => {
185            let mut vector = Vec::new();
186            for item in array {
187                vector.push(try_from(item)?);
188            }
189            Ok(VrlValue::Array(vector))
190        }
191        AvroValue::Boolean(boolean) => Ok(VrlValue::from(boolean)),
192        AvroValue::Bytes(bytes) => Ok(VrlValue::from(bytes)),
193        AvroValue::Date(_) => Err(vector_common::Error::from(
194            "AvroValue::Date is not supported",
195        )),
196        AvroValue::Decimal(_) => Err(vector_common::Error::from(
197            "AvroValue::Decimal is not supported",
198        )),
199        AvroValue::Double(double) => Ok(VrlValue::from_f64_or_zero(double)),
200        AvroValue::Duration(_) => Err(vector_common::Error::from(
201            "AvroValue::Duration is not supported",
202        )),
203        AvroValue::Enum(_, string) => Ok(VrlValue::from(string)),
204        AvroValue::Fixed(_, _) => Err(vector_common::Error::from(
205            "AvroValue::Fixed is not supported",
206        )),
207        AvroValue::Float(float) => Ok(VrlValue::from_f64_or_zero(float as f64)),
208        AvroValue::Int(int) => Ok(VrlValue::from(int)),
209        AvroValue::Long(long) => Ok(VrlValue::from(long)),
210        AvroValue::Map(items) => items
211            .into_iter()
212            .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
213            .collect::<Result<Vec<_>, _>>()
214            .map(|v| VrlValue::Object(v.into_iter().collect())),
215        AvroValue::Null => Ok(VrlValue::Null),
216        AvroValue::Record(items) => items
217            .into_iter()
218            .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
219            .collect::<Result<Vec<_>, _>>()
220            .map(|v| VrlValue::Object(v.into_iter().collect())),
221        AvroValue::String(string) => Ok(VrlValue::from(string)),
222        AvroValue::TimeMicros(time_micros) => Ok(VrlValue::from(time_micros)),
223        AvroValue::TimeMillis(_) => Err(vector_common::Error::from(
224            "AvroValue::TimeMillis is not supported",
225        )),
226        AvroValue::TimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
227        AvroValue::TimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
228        AvroValue::Union(_, v) => try_from(*v),
229        AvroValue::Uuid(uuid) => Ok(VrlValue::from(uuid.as_hyphenated().to_string())),
230        AvroValue::LocalTimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
231        AvroValue::LocalTimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
232        AvroValue::BigDecimal(_) => Err(vector_common::Error::from(
233            "AvroValue::BigDecimal is not supported",
234        )),
235        AvroValue::TimestampNanos(_) => Err(vector_common::Error::from(
236            "AvroValue::TimestampNanos is not supported",
237        )),
238        AvroValue::LocalTimestampNanos(_) => Err(vector_common::Error::from(
239            "AvroValue::LocalTimestampNanos is not supported",
240        )),
241    }
242}
243
#[cfg(test)]
mod tests {
    use apache_avro::Schema;
    use bytes::BytesMut;
    use uuid::Uuid;

    use super::*;

    /// Minimal record type matching the schema returned by `get_schema`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Log {
        message: String,
    }

    /// Schema of a record with a single string field named `message`.
    fn get_schema() -> Schema {
        let schema = String::from(
            r#"{
                "type": "record",
                "name": "log",
                "fields": [
                    {
                        "name": "message",
                        "type": "string"
                    }
                ]
            }
        "#,
        );

        Schema::parse_str(&schema).unwrap()
    }

    /// Encodes a `Log` carrying `message` as a bare Avro datum.
    fn encode_message(schema: &Schema, message: String) -> Vec<u8> {
        let record = apache_avro::to_value(Log { message }).unwrap();
        apache_avro::to_avro_datum(schema, record).unwrap()
    }

    /// Prepends the 5-byte prefix (0 prefix + 4 byte schema id) to `datum`.
    fn with_schema_id_prefix(datum: Vec<u8>) -> Bytes {
        let mut framed = BytesMut::new();
        framed.extend([0, 0, 0, 0, 0]);
        framed.extend(datum);
        framed.freeze()
    }

    /// Asserts that exactly one event was produced and that its `message` equals `expected`.
    fn assert_single_message(events: &[Event], expected: VrlValue) {
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log().get("message").unwrap(), &expected);
    }

    #[test]
    fn deserialize_avro() {
        let schema = get_schema();
        let payload = Bytes::from(encode_message(&schema, "hello from avro".to_owned()));

        let events = AvroDeserializer::new(schema, false)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from("hello from avro"));
    }

    #[test]
    fn deserialize_avro_strip_schema_id_prefix() {
        let schema = get_schema();
        let datum = encode_message(&schema, "hello from avro".to_owned());
        let payload = with_schema_id_prefix(datum);

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from("hello from avro"));
    }

    #[test]
    fn deserialize_avro_uuid() {
        let schema = get_schema();
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let datum = encode_message(&schema, uuid.clone());
        let payload = with_schema_id_prefix(datum);

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from(uuid));
    }
}