1mod config;
5mod decoder;
6mod error;
7pub mod format;
8pub mod framing;
9
10use std::fmt::Debug;
11
12use bytes::{Bytes, BytesMut};
13pub use config::DecodingConfig;
14pub use decoder::Decoder;
15pub use error::StreamDecodingError;
16pub use format::{
17 BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
18 GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
19 InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
20 NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
21 NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
22 ProtobufDeserializerConfig, ProtobufDeserializerOptions,
23};
24#[cfg(feature = "opentelemetry")]
25pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
26#[cfg(feature = "syslog")]
27pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
28pub use framing::{
29 BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
30 CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
31 ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
32 LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
33 NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
34 OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
35};
36use smallvec::SmallVec;
37use vector_config::configurable_component;
38use vector_core::{
39 config::{DataType, LogNamespace},
40 event::{Event, EventMetadata},
41 schema,
42};
43
44use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
45use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
46
/// An error produced while decoding.
///
/// The two variants separate failures by the value they wrap: a boxed framing
/// error, or the `vector_common::Error` type that deserializer `parse` calls
/// in this module return.
#[derive(Debug)]
pub enum Error {
    /// Wraps a boxed framing error; whether decoding may continue is delegated
    /// to the wrapped error's `can_continue`.
    FramingError(BoxedFramingError),
    /// Wraps a `vector_common::Error`; this variant always reports that
    /// decoding can continue.
    ParsingError(vector_common::Error),
}
57
58impl std::fmt::Display for Error {
59 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 match self {
61 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
62 Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
63 }
64 }
65}
66
// Empty impl: `source()` is not overridden, so the trait's default applies and
// the wrapped error is only visible through `Display`/`Debug` or by matching.
impl std::error::Error for Error {}
68
69impl From<std::io::Error> for Error {
70 fn from(error: std::io::Error) -> Self {
71 Self::FramingError(Box::new(error))
72 }
73}
74
75impl StreamDecodingError for Error {
76 fn can_continue(&self) -> bool {
77 match self {
78 Self::FramingError(error) => error.can_continue(),
79 Self::ParsingError(_) => true,
80 }
81 }
82}
83
/// Framing configuration.
///
/// Selects the framing method; [`FramingConfig::build`] turns the selection
/// into the corresponding [`Framer`] variant. Per the `serde` attribute below,
/// this is an internally tagged enum keyed by `method` with snake_case names.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Bytes framing; takes no options and is built from [`BytesDecoderConfig`].
    Bytes,

    /// Character-delimited framing, configured by [`CharacterDelimitedDecoderConfig`].
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Length-delimited framing, configured by [`LengthDelimitedDecoderConfig`].
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Newline-delimited framing, configured by [`NewlineDelimitedDecoderConfig`].
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Octet-counting framing, configured by [`OctetCountingDecoderConfig`].
    OctetCounting(OctetCountingDecoderConfig),

    /// Chunked GELF framing, configured by [`ChunkedGelfDecoderConfig`].
    ///
    /// This is the default message-based framing for the GELF codec.
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Varint length-delimited framing, configured by
    /// [`VarintLengthDelimitedDecoderConfig`].
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
120
121impl From<BytesDecoderConfig> for FramingConfig {
122 fn from(_: BytesDecoderConfig) -> Self {
123 Self::Bytes
124 }
125}
126
127impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
128 fn from(config: CharacterDelimitedDecoderConfig) -> Self {
129 Self::CharacterDelimited(config)
130 }
131}
132
133impl From<LengthDelimitedDecoderConfig> for FramingConfig {
134 fn from(config: LengthDelimitedDecoderConfig) -> Self {
135 Self::LengthDelimited(config)
136 }
137}
138
139impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
140 fn from(config: NewlineDelimitedDecoderConfig) -> Self {
141 Self::NewlineDelimited(config)
142 }
143}
144
145impl From<OctetCountingDecoderConfig> for FramingConfig {
146 fn from(config: OctetCountingDecoderConfig) -> Self {
147 Self::OctetCounting(config)
148 }
149}
150
151impl From<ChunkedGelfDecoderConfig> for FramingConfig {
152 fn from(config: ChunkedGelfDecoderConfig) -> Self {
153 Self::ChunkedGelf(config)
154 }
155}
156
157impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
158 fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
159 Self::VarintLengthDelimited(config)
160 }
161}
162
163impl FramingConfig {
164 pub fn build(&self) -> Framer {
166 match self {
167 FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
168 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
169 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
170 FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
171 FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
172 FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
173 FramingConfig::VarintLengthDelimited(config) => {
174 Framer::VarintLengthDelimited(config.build())
175 }
176 }
177 }
178}
179
/// A framer that wraps one concrete decoder.
///
/// Its `tokio_util::codec::Decoder` implementation forwards `decode` and
/// `decode_eof` to the wrapped decoder, yielding `Bytes` frames.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Frames with a [`BytesDecoder`].
    Bytes(BytesDecoder),
    /// Frames with a [`CharacterDelimitedDecoder`].
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Frames with a [`LengthDelimitedDecoder`].
    LengthDelimited(LengthDelimitedDecoder),
    /// Frames with a [`NewlineDelimitedDecoder`].
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Frames with an [`OctetCountingDecoder`].
    OctetCounting(OctetCountingDecoder),
    /// Frames with a caller-supplied boxed framer; `FramingConfig::build`
    /// never produces this variant.
    Boxed(BoxedFramer),
    /// Frames with a [`ChunkedGelfDecoder`].
    ChunkedGelf(ChunkedGelfDecoder),
    /// Frames with a [`VarintLengthDelimitedDecoder`].
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
200
201impl tokio_util::codec::Decoder for Framer {
202 type Item = Bytes;
203 type Error = BoxedFramingError;
204
205 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
206 match self {
207 Framer::Bytes(framer) => framer.decode(src),
208 Framer::CharacterDelimited(framer) => framer.decode(src),
209 Framer::LengthDelimited(framer) => framer.decode(src),
210 Framer::NewlineDelimited(framer) => framer.decode(src),
211 Framer::OctetCounting(framer) => framer.decode(src),
212 Framer::Boxed(framer) => framer.decode(src),
213 Framer::ChunkedGelf(framer) => framer.decode(src),
214 Framer::VarintLengthDelimited(framer) => framer.decode(src),
215 }
216 }
217
218 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
219 match self {
220 Framer::Bytes(framer) => framer.decode_eof(src),
221 Framer::CharacterDelimited(framer) => framer.decode_eof(src),
222 Framer::LengthDelimited(framer) => framer.decode_eof(src),
223 Framer::NewlineDelimited(framer) => framer.decode_eof(src),
224 Framer::OctetCounting(framer) => framer.decode_eof(src),
225 Framer::Boxed(framer) => framer.decode_eof(src),
226 Framer::ChunkedGelf(framer) => framer.decode_eof(src),
227 Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
228 }
229 }
230}
231
/// Configuration selecting the codec used for decoding events.
///
/// Per the `serde` attribute below, this is an internally tagged enum keyed by
/// `codec` with snake_case names. [`DeserializerConfig::build`] turns the
/// selection into a [`Deserializer`].
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Selects the bytes deserializer; takes no options and is built from
    /// [`BytesDeserializerConfig`].
    Bytes,

    /// Selects the JSON deserializer, configured by [`JsonDeserializerConfig`].
    Json(JsonDeserializerConfig),

    /// Selects the Protobuf deserializer, configured by
    /// [`ProtobufDeserializerConfig`].
    Protobuf(ProtobufDeserializerConfig),

    /// Selects the OTLP deserializer; only compiled with the `opentelemetry`
    /// feature.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializerConfig),

    /// Selects the Syslog deserializer; only compiled with the `syslog`
    /// feature.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializerConfig),

    /// Selects the native deserializer; takes no options and is built from
    /// [`NativeDeserializerConfig`].
    Native,

    /// Selects the native JSON deserializer, configured by
    /// [`NativeJsonDeserializerConfig`].
    NativeJson(NativeJsonDeserializerConfig),

    /// Selects the GELF deserializer, configured by [`GelfDeserializerConfig`].
    Gelf(GelfDeserializerConfig),

    /// Selects the InfluxDB deserializer, configured by
    /// [`InfluxdbDeserializerConfig`].
    Influxdb(InfluxdbDeserializerConfig),

    /// Selects the Avro deserializer.
    Avro {
        /// Options handed to `AvroDeserializerConfig` as its `avro_options`.
        avro: AvroDeserializerOptions,
    },

    /// Selects the VRL deserializer, configured by [`VrlDeserializerConfig`].
    Vrl(VrlDeserializerConfig),
}
327
328impl From<BytesDeserializerConfig> for DeserializerConfig {
329 fn from(_: BytesDeserializerConfig) -> Self {
330 Self::Bytes
331 }
332}
333
334impl From<JsonDeserializerConfig> for DeserializerConfig {
335 fn from(config: JsonDeserializerConfig) -> Self {
336 Self::Json(config)
337 }
338}
339
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    /// Wraps the deserializer configuration in the `Syslog` variant.
    fn from(options: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(options)
    }
}
346
347impl From<GelfDeserializerConfig> for DeserializerConfig {
348 fn from(config: GelfDeserializerConfig) -> Self {
349 Self::Gelf(config)
350 }
351}
352
353impl From<NativeDeserializerConfig> for DeserializerConfig {
354 fn from(_: NativeDeserializerConfig) -> Self {
355 Self::Native
356 }
357}
358
359impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
360 fn from(config: NativeJsonDeserializerConfig) -> Self {
361 Self::NativeJson(config)
362 }
363}
364
365impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
366 fn from(config: InfluxdbDeserializerConfig) -> Self {
367 Self::Influxdb(config)
368 }
369}
370
371impl DeserializerConfig {
372 pub fn build(&self) -> vector_common::Result<Deserializer> {
374 match self {
375 DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
376 AvroDeserializerConfig {
377 avro_options: avro.clone(),
378 }
379 .build()?,
380 )),
381 DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
382 DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
383 DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
384 #[cfg(feature = "opentelemetry")]
385 DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
386 #[cfg(feature = "syslog")]
387 DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
388 DeserializerConfig::Native => {
389 Ok(Deserializer::Native(NativeDeserializerConfig.build()))
390 }
391 DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
392 DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
393 DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
394 DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
395 }
396 }
397
398 pub fn default_stream_framing(&self) -> FramingConfig {
400 match self {
401 DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
402 DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
403 DeserializerConfig::Bytes
404 | DeserializerConfig::Json(_)
405 | DeserializerConfig::Influxdb(_)
406 | DeserializerConfig::NativeJson(_) => {
407 FramingConfig::NewlineDelimited(Default::default())
408 }
409 DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
410 #[cfg(feature = "opentelemetry")]
411 DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
412 #[cfg(feature = "syslog")]
413 DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
414 DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
415 DeserializerConfig::Gelf(_) => {
416 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
417 }
418 }
419 }
420
421 pub fn default_message_based_framing(&self) -> FramingConfig {
423 match self {
424 DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
425 _ => FramingConfig::Bytes,
426 }
427 }
428
429 pub fn is_vrl(&self) -> bool {
432 matches!(self, DeserializerConfig::Vrl(_))
433 }
434
435 pub fn output_type(&self) -> DataType {
437 match self {
438 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
439 avro_options: avro.clone(),
440 }
441 .output_type(),
442 DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
443 DeserializerConfig::Json(config) => config.output_type(),
444 DeserializerConfig::Protobuf(config) => config.output_type(),
445 #[cfg(feature = "opentelemetry")]
446 DeserializerConfig::Otlp(config) => config.output_type(),
447 #[cfg(feature = "syslog")]
448 DeserializerConfig::Syslog(config) => config.output_type(),
449 DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
450 DeserializerConfig::NativeJson(config) => config.output_type(),
451 DeserializerConfig::Gelf(config) => config.output_type(),
452 DeserializerConfig::Vrl(config) => config.output_type(),
453 DeserializerConfig::Influxdb(config) => config.output_type(),
454 }
455 }
456
457 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
459 match self {
460 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
461 avro_options: avro.clone(),
462 }
463 .schema_definition(log_namespace),
464 DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
465 DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
466 DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
467 #[cfg(feature = "opentelemetry")]
468 DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
469 #[cfg(feature = "syslog")]
470 DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
471 DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
472 DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
473 DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
474 DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
475 DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
476 }
477 }
478
479 pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
481 match (&self, framer) {
482 (
483 DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
484 FramingConfig::NewlineDelimited(_),
485 ) => "application/x-ndjson",
486 (
487 DeserializerConfig::Gelf(_)
488 | DeserializerConfig::Json(_)
489 | DeserializerConfig::NativeJson(_),
490 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
491 character_delimited:
492 CharacterDelimitedDecoderOptions {
493 delimiter: b',',
494 max_length: Some(usize::MAX),
495 },
496 }),
497 ) => "application/json",
498 (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
499 "application/octet-stream"
500 }
501 (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
502 #[cfg(feature = "opentelemetry")]
503 (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
504 (
505 DeserializerConfig::Json(_)
506 | DeserializerConfig::NativeJson(_)
507 | DeserializerConfig::Bytes
508 | DeserializerConfig::Gelf(_)
509 | DeserializerConfig::Influxdb(_)
510 | DeserializerConfig::Vrl(_),
511 _,
512 ) => "text/plain",
513 #[cfg(feature = "syslog")]
514 (DeserializerConfig::Syslog(_), _) => "text/plain",
515 }
516 }
517}
518
// Suppresses clippy's warning about size differences between the variants.
// NOTE(review): presumably the variants are kept unboxed on purpose — confirm.
#[allow(clippy::large_enum_variant)]
/// A deserializer that wraps one concrete codec implementation.
///
/// Its `format::Deserializer` implementation forwards `parse` to the wrapped
/// deserializer.
#[derive(Clone)]
pub enum Deserializer {
    /// Parses with an [`AvroDeserializer`].
    Avro(AvroDeserializer),
    /// Parses with a [`BytesDeserializer`].
    Bytes(BytesDeserializer),
    /// Parses with a [`JsonDeserializer`].
    Json(JsonDeserializer),
    /// Parses with a [`ProtobufDeserializer`].
    Protobuf(ProtobufDeserializer),
    /// Parses with an [`OtlpDeserializer`]; only compiled with the
    /// `opentelemetry` feature.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializer),
    /// Parses with a [`SyslogDeserializer`]; only compiled with the `syslog`
    /// feature.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializer),
    /// Parses with a [`NativeDeserializer`].
    Native(NativeDeserializer),
    /// Parses with a [`NativeJsonDeserializer`].
    NativeJson(NativeJsonDeserializer),
    /// Parses with a caller-supplied boxed deserializer;
    /// `DeserializerConfig::build` never produces this variant.
    Boxed(BoxedDeserializer),
    /// Parses with a [`GelfDeserializer`].
    Gelf(GelfDeserializer),
    /// Parses with an [`InfluxdbDeserializer`].
    Influxdb(InfluxdbDeserializer),
    /// Parses with a [`VrlDeserializer`].
    Vrl(VrlDeserializer),
}
550
551impl Deserializer {
552 pub fn with_metadata_template(self, metadata: EventMetadata) -> Self {
555 match self {
556 Deserializer::Vrl(d) => Deserializer::Vrl(d.with_metadata_template(metadata)),
557 other => other,
558 }
559 }
560}
561
562impl format::Deserializer for Deserializer {
563 fn parse(
564 &self,
565 bytes: Bytes,
566 log_namespace: LogNamespace,
567 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
568 match self {
569 Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
570 Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
571 Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
572 Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
573 #[cfg(feature = "opentelemetry")]
574 Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
575 #[cfg(feature = "syslog")]
576 Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
577 Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
578 Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
579 Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
580 Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
581 Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
582 Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
583 }
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for GELF must be character-delimited with
    /// delimiter byte `0` and no `max_length` set.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf: DeserializerConfig = GelfDeserializerConfig::default().into();

        assert!(matches!(
            gelf.default_stream_framing(),
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        ));
    }
}