1use bytes::{Buf, Bytes};
2use chrono::Utc;
3use lookup::event_path;
4use serde::{Deserialize, Serialize};
5use smallvec::{SmallVec, smallvec};
6use vector_config::configurable_component;
7use vector_core::{
8 config::{DataType, LogNamespace, log_schema},
9 event::{Event, LogEvent},
10 schema,
11};
12use vrl::value::KeyString;
13
14use super::Deserializer;
15use crate::encoding::AvroSerializerOptions;
16
// Short local aliases so the two unrelated `Value` types used by this module
// (the VRL event value and the decoded Avro value) cannot be confused.
type VrlValue = vrl::value::Value;
type AvroValue = apache_avro::types::Value;
19
/// Value the first byte of the input must have when the schema id prefix is
/// being stripped (checked in `AvroDeserializer::parse`).
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Number of leading bytes removed from the input when the schema id prefix
/// is being stripped.
// NOTE(review): presumably one magic byte followed by a 4-byte schema id, as
// in the Confluent wire format — confirm against the Confluent documentation.
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;
22
/// Config used to build an `AvroDeserializer`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroDeserializerConfig {
    /// Options for the Avro deserializer: the schema text and whether a
    /// schema id prefix is stripped from the input.
    pub avro_options: AvroDeserializerOptions,
}
29
30impl AvroDeserializerConfig {
31 pub const fn new(schema: String, strip_schema_id_prefix: bool) -> Self {
33 Self {
34 avro_options: AvroDeserializerOptions {
35 schema,
36 strip_schema_id_prefix,
37 },
38 }
39 }
40
41 pub fn build(&self) -> vector_common::Result<AvroDeserializer> {
43 let schema = apache_avro::Schema::parse_str(&self.avro_options.schema)
44 .map_err(|error| format!("Failed building Avro serializer: {error}"))?;
45
46 Ok(AvroDeserializer {
47 schema,
48 strip_schema_id_prefix: self.avro_options.strip_schema_id_prefix,
49 })
50 }
51
52 pub fn output_type(&self) -> DataType {
54 DataType::Log
55 }
56
57 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59 match log_namespace {
60 LogNamespace::Legacy => {
61 let mut definition = schema::Definition::empty_legacy_namespace()
62 .unknown_fields(vrl::value::Kind::any());
63
64 if let Some(timestamp_key) = log_schema().timestamp_key() {
65 definition = definition.try_with_field(
66 timestamp_key,
67 vrl::value::Kind::any().or_timestamp(),
68 Some("timestamp"),
69 );
70 }
71 definition
72 }
73 LogNamespace::Vector => schema::Definition::new_with_default_metadata(
74 vrl::value::Kind::any(),
75 [log_namespace],
76 ),
77 }
78 }
79}
80
81impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
82 fn from(value: &AvroDeserializerOptions) -> Self {
83 Self {
84 schema: value.schema.clone(),
85 }
86 }
87}
/// Apache Avro deserializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroDeserializerOptions {
    /// The Avro schema definition, as schema JSON text.
    ///
    /// The text is parsed when the deserializer is built.
    // NOTE(review): the description string below lists "local-timestamp-millis"
    // and "local-timestamp-micros" as unsupported, yet `try_from` in this file
    // converts both to integers, while it rejects Date, Fixed, TimeMillis,
    // BigDecimal, TimestampNanos and LocalTimestampNanos, which the string
    // does not mention. Confirm which side is intended before changing either.
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#,
        docs::additional_props_description = r#"Supports most avro data types, unsupported data types includes
        ["decimal", "duration", "local-timestamp-millis", "local-timestamp-micros"]"#,
    ))]
    pub schema: String,

    /// Whether to strip the schema id prefix from each input before decoding.
    ///
    /// When `true`, every non-empty input must start with the expected prefix;
    /// inputs without it are rejected with an error.
    pub strip_schema_id_prefix: bool,
}
109
/// Deserializer that converts bytes holding a single Avro datum into a log
/// event.
#[derive(Debug, Clone)]
pub struct AvroDeserializer {
    // Parsed schema used to decode every datum.
    schema: apache_avro::Schema,
    // When set, `parse` requires and removes the schema id prefix first.
    strip_schema_id_prefix: bool,
}
116
117impl AvroDeserializer {
118 pub const fn new(schema: apache_avro::Schema, strip_schema_id_prefix: bool) -> Self {
120 Self {
121 schema,
122 strip_schema_id_prefix,
123 }
124 }
125}
126
127impl Deserializer for AvroDeserializer {
128 fn parse(
129 &self,
130 bytes: Bytes,
131 log_namespace: LogNamespace,
132 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
133 if bytes.is_empty() {
135 return Ok(smallvec![]);
136 }
137
138 let bytes = if self.strip_schema_id_prefix {
139 if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
140 bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
141 } else {
142 return Err(vector_common::Error::from(
143 "Expected avro datum to be prefixed with schema id",
144 ));
145 }
146 } else {
147 bytes
148 };
149
150 let value = apache_avro::from_avro_datum(&self.schema, &mut bytes.reader(), None)?;
151
152 let apache_avro::types::Value::Record(fields) = value else {
153 return Err(vector_common::Error::from("Expected an avro Record"));
154 };
155
156 let mut log = LogEvent::default();
157 for (k, v) in fields {
158 log.insert(event_path!(k.as_str()), try_from(v)?);
159 }
160
161 let mut event = Event::Log(log);
162 let event = match log_namespace {
163 LogNamespace::Vector => event,
164 LogNamespace::Legacy => {
165 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
166 let log = event.as_mut_log();
167 if !log.contains(timestamp_key) {
168 let timestamp = Utc::now();
169 log.insert(timestamp_key, timestamp);
170 }
171 }
172 event
173 }
174 };
175 Ok(smallvec![event])
176 }
177}
178
179pub fn try_from(value: AvroValue) -> vector_common::Result<VrlValue> {
181 match value {
184 AvroValue::Array(array) => {
185 let mut vector = Vec::new();
186 for item in array {
187 vector.push(try_from(item)?);
188 }
189 Ok(VrlValue::Array(vector))
190 }
191 AvroValue::Boolean(boolean) => Ok(VrlValue::from(boolean)),
192 AvroValue::Bytes(bytes) => Ok(VrlValue::from(bytes)),
193 AvroValue::Date(_) => Err(vector_common::Error::from(
194 "AvroValue::Date is not supported",
195 )),
196 AvroValue::Decimal(_) => Err(vector_common::Error::from(
197 "AvroValue::Decimal is not supported",
198 )),
199 AvroValue::Double(double) => Ok(VrlValue::from_f64_or_zero(double)),
200 AvroValue::Duration(_) => Err(vector_common::Error::from(
201 "AvroValue::Duration is not supported",
202 )),
203 AvroValue::Enum(_, string) => Ok(VrlValue::from(string)),
204 AvroValue::Fixed(_, _) => Err(vector_common::Error::from(
205 "AvroValue::Fixed is not supported",
206 )),
207 AvroValue::Float(float) => Ok(VrlValue::from_f64_or_zero(float as f64)),
208 AvroValue::Int(int) => Ok(VrlValue::from(int)),
209 AvroValue::Long(long) => Ok(VrlValue::from(long)),
210 AvroValue::Map(items) => items
211 .into_iter()
212 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
213 .collect::<Result<Vec<_>, _>>()
214 .map(|v| VrlValue::Object(v.into_iter().collect())),
215 AvroValue::Null => Ok(VrlValue::Null),
216 AvroValue::Record(items) => items
217 .into_iter()
218 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
219 .collect::<Result<Vec<_>, _>>()
220 .map(|v| VrlValue::Object(v.into_iter().collect())),
221 AvroValue::String(string) => Ok(VrlValue::from(string)),
222 AvroValue::TimeMicros(time_micros) => Ok(VrlValue::from(time_micros)),
223 AvroValue::TimeMillis(_) => Err(vector_common::Error::from(
224 "AvroValue::TimeMillis is not supported",
225 )),
226 AvroValue::TimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
227 AvroValue::TimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
228 AvroValue::Union(_, v) => try_from(*v),
229 AvroValue::Uuid(uuid) => Ok(VrlValue::from(uuid.as_hyphenated().to_string())),
230 AvroValue::LocalTimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
231 AvroValue::LocalTimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
232 AvroValue::BigDecimal(_) => Err(vector_common::Error::from(
233 "AvroValue::BigDecimal is not supported",
234 )),
235 AvroValue::TimestampNanos(_) => Err(vector_common::Error::from(
236 "AvroValue::TimestampNanos is not supported",
237 )),
238 AvroValue::LocalTimestampNanos(_) => Err(vector_common::Error::from(
239 "AvroValue::LocalTimestampNanos is not supported",
240 )),
241 }
242}
243
#[cfg(test)]
mod tests {
    use apache_avro::Schema;
    use bytes::BytesMut;
    use uuid::Uuid;

    use super::*;

    /// Serde model matching the single-field schema used by these tests.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Log {
        message: String,
    }

    /// Parses the one-field `log` record schema shared by every test.
    fn get_schema() -> Schema {
        let raw = r#"{
            "type": "record",
            "name": "log",
            "fields": [
                {
                    "name": "message",
                    "type": "string"
                }
            ]
        }
        "#;

        Schema::parse_str(raw).unwrap()
    }

    /// Encodes a `Log` carrying `message` as a bare Avro datum.
    fn encode_datum(schema: &Schema, message: &str) -> Vec<u8> {
        let record = apache_avro::to_value(Log {
            message: message.to_owned(),
        })
        .unwrap();
        apache_avro::to_avro_datum(schema, record).unwrap()
    }

    /// Puts five zero bytes in front of `datum`, the prefix the deserializer
    /// strips when `strip_schema_id_prefix` is enabled.
    fn with_schema_id_prefix(datum: Vec<u8>) -> Bytes {
        let mut framed = BytesMut::new();
        framed.extend([0, 0, 0, 0, 0]);
        framed.extend(datum);
        framed.freeze()
    }

    /// Asserts that exactly one event was produced and that its `message`
    /// field equals `expected`.
    fn assert_single_message(events: &[Event], expected: &str) {
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log().get("message").unwrap(),
            &VrlValue::from(expected)
        );
    }

    #[test]
    fn deserialize_avro() {
        let schema = get_schema();
        let input = Bytes::from(encode_datum(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, false)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_strip_schema_id_prefix() {
        let schema = get_schema();
        let input = with_schema_id_prefix(encode_datum(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, true)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_uuid() {
        let schema = get_schema();
        // The UUID travels as an ordinary string field of the record.
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let input = with_schema_id_prefix(encode_datum(&schema, &uuid));

        let events = AvroDeserializer::new(schema, true)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, &uuid);
    }
}