1use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "parquet")]
12use super::format::{ParquetSerializer, ParquetSerializerConfig};
13use super::format::{ProtoBatchSerializer, ProtoBatchSerializerConfig};
14#[cfg(feature = "syslog")]
15use super::format::{SyslogSerializer, SyslogSerializerConfig};
16use super::{
17 chunking::Chunker,
18 format::{
19 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
20 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
21 GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
22 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
23 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
24 RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
25 },
26 framing::{
27 CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
28 VarintLengthDelimitedEncoderConfig,
29 },
30};
31
/// Configuration that selects the serializer used to encode events.
///
/// This is an internally tagged enum: the `codec` field holds the variant name in
/// `snake_case`, and the options of the selected variant (if any) sit alongside it.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Avro codec (`codec = "avro"`).
    Avro {
        /// Avro encoder options; the serializer is built from the schema stored here.
        avro: AvroSerializerOptions,
    },

    /// CEF codec (`codec = "cef"`).
    Cef(
        /// Options for the CEF serializer.
        CefSerializerConfig,
    ),

    /// CSV codec (`codec = "csv"`), configured by [`CsvSerializerConfig`].
    Csv(CsvSerializerConfig),

    /// GELF codec (`codec = "gelf"`), configured by [`GelfSerializerConfig`].
    Gelf(GelfSerializerConfig),

    /// JSON codec (`codec = "json"`), configured by [`JsonSerializerConfig`].
    ///
    /// This is the variant produced by `SerializerConfig::default()`.
    Json(JsonSerializerConfig),

    /// Logfmt codec (`codec = "logfmt"`); takes no options.
    Logfmt,

    /// Native codec (`codec = "native"`); takes no options.
    Native,

    /// Native JSON codec (`codec = "native_json"`); takes no options.
    NativeJson,

    /// OTLP codec (`codec = "otlp"`); takes no options.
    ///
    /// Only compiled in when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Protobuf codec (`codec = "protobuf"`), configured by [`ProtobufSerializerConfig`].
    Protobuf(ProtobufSerializerConfig),

    /// Raw message codec (`codec = "raw_message"`); takes no options.
    RawMessage,

    /// Text codec (`codec = "text"`), configured by [`TextSerializerConfig`].
    Text(TextSerializerConfig),

    /// Syslog codec (`codec = "syslog"`), configured by [`SyslogSerializerConfig`].
    ///
    /// Only compiled in when the `syslog` feature is enabled.
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializerConfig),
}
142
143impl Default for SerializerConfig {
144 fn default() -> Self {
145 Self::Json(JsonSerializerConfig::default())
146 }
147}
148
/// Configuration that selects the serializer used to encode events in batches.
///
/// This is an internally tagged enum: the `codec` field selects the variant and the
/// options of that variant sit alongside it.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum BatchSerializerConfig {
    /// Arrow stream batch codec (`codec = "arrow_stream"`).
    ///
    /// Only compiled in when the `arrow` feature is enabled.
    #[cfg(feature = "arrow")]
    #[serde(rename = "arrow_stream")]
    ArrowStream(ArrowStreamSerializerConfig),
    /// Parquet batch codec (`codec = "parquet"`).
    ///
    /// Only compiled in when the `parquet` feature is enabled.
    #[cfg(feature = "parquet")]
    #[serde(rename = "parquet")]
    Parquet(ParquetSerializerConfig),

    /// Proto batch codec (`codec = "proto_batch"`); always available.
    #[serde(rename = "proto_batch")]
    ProtoBatch(ProtoBatchSerializerConfig),
}
182
183impl BatchSerializerConfig {
184 pub fn build_batch_serializer(
186 &self,
187 ) -> Result<super::BatchSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
188 match self {
189 #[cfg(feature = "arrow")]
190 BatchSerializerConfig::ArrowStream(arrow_config) => {
191 let serializer = ArrowStreamSerializer::new(arrow_config.clone())?;
192 Ok(super::BatchSerializer::Arrow(serializer))
193 }
194 #[cfg(feature = "parquet")]
195 BatchSerializerConfig::Parquet(parquet_config) => {
196 let serializer = ParquetSerializer::new(parquet_config.clone())?;
197 Ok(super::BatchSerializer::Parquet(Box::new(serializer)))
198 }
199 BatchSerializerConfig::ProtoBatch(proto_config) => {
200 let serializer = ProtoBatchSerializer::new(proto_config.clone())?;
201 Ok(super::BatchSerializer::ProtoBatch(serializer))
202 }
203 }
204 }
205
206 pub fn input_type(&self) -> DataType {
208 match self {
209 #[cfg(feature = "arrow")]
210 BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
211 #[cfg(feature = "parquet")]
212 BatchSerializerConfig::Parquet(parquet_config) => parquet_config.input_type(),
213 BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.input_type(),
214 }
215 }
216
217 pub fn schema_requirement(&self) -> schema::Requirement {
219 match self {
220 #[cfg(feature = "arrow")]
221 BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
222 #[cfg(feature = "parquet")]
223 BatchSerializerConfig::Parquet(parquet_config) => parquet_config.schema_requirement(),
224 BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.schema_requirement(),
225 }
226 }
227}
228
229impl From<AvroSerializerConfig> for SerializerConfig {
230 fn from(config: AvroSerializerConfig) -> Self {
231 Self::Avro { avro: config.avro }
232 }
233}
234
235impl From<CefSerializerConfig> for SerializerConfig {
236 fn from(config: CefSerializerConfig) -> Self {
237 Self::Cef(config)
238 }
239}
240
241impl From<CsvSerializerConfig> for SerializerConfig {
242 fn from(config: CsvSerializerConfig) -> Self {
243 Self::Csv(config)
244 }
245}
246
247impl From<GelfSerializerConfig> for SerializerConfig {
248 fn from(config: GelfSerializerConfig) -> Self {
249 Self::Gelf(config)
250 }
251}
252
253impl From<JsonSerializerConfig> for SerializerConfig {
254 fn from(config: JsonSerializerConfig) -> Self {
255 Self::Json(config)
256 }
257}
258
259impl From<LogfmtSerializerConfig> for SerializerConfig {
260 fn from(_: LogfmtSerializerConfig) -> Self {
261 Self::Logfmt
262 }
263}
264
265impl From<NativeSerializerConfig> for SerializerConfig {
266 fn from(_: NativeSerializerConfig) -> Self {
267 Self::Native
268 }
269}
270
271impl From<NativeJsonSerializerConfig> for SerializerConfig {
272 fn from(_: NativeJsonSerializerConfig) -> Self {
273 Self::NativeJson
274 }
275}
276
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    /// Maps the OTLP serializer config to the `Otlp` variant; the argument is not read.
    fn from(_config: OtlpSerializerConfig) -> Self {
        SerializerConfig::Otlp
    }
}
283
284impl From<ProtobufSerializerConfig> for SerializerConfig {
285 fn from(config: ProtobufSerializerConfig) -> Self {
286 Self::Protobuf(config)
287 }
288}
289
290impl From<RawMessageSerializerConfig> for SerializerConfig {
291 fn from(_: RawMessageSerializerConfig) -> Self {
292 Self::RawMessage
293 }
294}
295
296impl From<TextSerializerConfig> for SerializerConfig {
297 fn from(config: TextSerializerConfig) -> Self {
298 Self::Text(config)
299 }
300}
301
302impl SerializerConfig {
303 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
305 match self {
306 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
307 AvroSerializerConfig::new(avro.schema.clone()).build()?,
308 )),
309 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
310 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
311 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
312 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
313 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
314 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
315 SerializerConfig::NativeJson => {
316 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
317 }
318 #[cfg(feature = "opentelemetry")]
319 SerializerConfig::Otlp => {
320 Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
321 }
322 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
323 SerializerConfig::RawMessage => {
324 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
325 }
326 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
327 #[cfg(feature = "syslog")]
328 SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
329 }
330 }
331
332 pub fn default_stream_framing(&self) -> FramingConfig {
334 match self {
335 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
347 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
348 }
349 #[cfg(feature = "opentelemetry")]
350 SerializerConfig::Otlp => FramingConfig::Bytes,
351 SerializerConfig::Protobuf(_) => {
352 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
353 }
354 SerializerConfig::Cef(_)
355 | SerializerConfig::Csv(_)
356 | SerializerConfig::Json(_)
357 | SerializerConfig::Logfmt
358 | SerializerConfig::NativeJson
359 | SerializerConfig::RawMessage
360 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
361 #[cfg(feature = "syslog")]
362 SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
363 SerializerConfig::Gelf(_) => {
364 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
365 }
366 }
367 }
368
369 pub fn input_type(&self) -> DataType {
371 match self {
372 SerializerConfig::Avro { avro } => {
373 AvroSerializerConfig::new(avro.schema.clone()).input_type()
374 }
375 SerializerConfig::Cef(config) => config.input_type(),
376 SerializerConfig::Csv(config) => config.input_type(),
377 SerializerConfig::Gelf(config) => config.input_type(),
378 SerializerConfig::Json(config) => config.input_type(),
379 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
380 SerializerConfig::Native => NativeSerializerConfig.input_type(),
381 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
382 #[cfg(feature = "opentelemetry")]
383 SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
384 SerializerConfig::Protobuf(config) => config.input_type(),
385 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
386 SerializerConfig::Text(config) => config.input_type(),
387 #[cfg(feature = "syslog")]
388 SerializerConfig::Syslog(config) => config.input_type(),
389 }
390 }
391
392 pub fn schema_requirement(&self) -> schema::Requirement {
394 match self {
395 SerializerConfig::Avro { avro } => {
396 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
397 }
398 SerializerConfig::Cef(config) => config.schema_requirement(),
399 SerializerConfig::Csv(config) => config.schema_requirement(),
400 SerializerConfig::Gelf(config) => config.schema_requirement(),
401 SerializerConfig::Json(config) => config.schema_requirement(),
402 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
403 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
404 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
405 #[cfg(feature = "opentelemetry")]
406 SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
407 SerializerConfig::Protobuf(config) => config.schema_requirement(),
408 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
409 SerializerConfig::Text(config) => config.schema_requirement(),
410 #[cfg(feature = "syslog")]
411 SerializerConfig::Syslog(config) => config.schema_requirement(),
412 }
413 }
414}
415
/// Serializer that encodes events by delegating to one codec-specific implementation.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an [`AvroSerializer`] for serialization.
    Avro(AvroSerializer),
    /// Uses a [`CefSerializer`] for serialization.
    Cef(CefSerializer),
    /// Uses a [`CsvSerializer`] for serialization.
    Csv(CsvSerializer),
    /// Uses a [`GelfSerializer`] for serialization.
    Gelf(GelfSerializer),
    /// Uses a [`JsonSerializer`] for serialization.
    Json(JsonSerializer),
    /// Uses a [`LogfmtSerializer`] for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a [`NativeSerializer`] for serialization.
    Native(NativeSerializer),
    /// Uses a [`NativeJsonSerializer`] for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses an [`OtlpSerializer`] for serialization (requires the `opentelemetry` feature).
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Uses a [`ProtobufSerializer`] for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a [`RawMessageSerializer`] for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a [`TextSerializer`] for serialization.
    Text(TextSerializer),
    /// Uses a [`SyslogSerializer`] for serialization (requires the `syslog` feature).
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializer),
}
448
449impl Serializer {
450 pub fn supports_json(&self) -> bool {
452 match self {
453 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
454 Serializer::Avro(_)
455 | Serializer::Cef(_)
456 | Serializer::Csv(_)
457 | Serializer::Logfmt(_)
458 | Serializer::Text(_)
459 | Serializer::Native(_)
460 | Serializer::Protobuf(_)
461 | Serializer::RawMessage(_) => false,
462 #[cfg(feature = "syslog")]
463 Serializer::Syslog(_) => false,
464 #[cfg(feature = "opentelemetry")]
465 Serializer::Otlp(_) => false,
466 }
467 }
468
469 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
476 match self {
477 Serializer::Gelf(serializer) => serializer.to_json_value(event),
478 Serializer::Json(serializer) => serializer.to_json_value(event),
479 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
480 Serializer::Avro(_)
481 | Serializer::Cef(_)
482 | Serializer::Csv(_)
483 | Serializer::Logfmt(_)
484 | Serializer::Text(_)
485 | Serializer::Native(_)
486 | Serializer::Protobuf(_)
487 | Serializer::RawMessage(_) => {
488 panic!("Serializer does not support JSON")
489 }
490 #[cfg(feature = "syslog")]
491 Serializer::Syslog(_) => {
492 panic!("Serializer does not support JSON")
493 }
494 #[cfg(feature = "opentelemetry")]
495 Serializer::Otlp(_) => {
496 panic!("Serializer does not support JSON")
497 }
498 }
499 }
500
501 pub fn chunker(&self) -> Option<Chunker> {
503 match self {
504 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
505 _ => None,
506 }
507 }
508
509 pub const fn is_binary(&self) -> bool {
514 match self {
515 Serializer::RawMessage(_)
516 | Serializer::Avro(_)
517 | Serializer::Native(_)
518 | Serializer::Protobuf(_) => true,
519 #[cfg(feature = "opentelemetry")]
520 Serializer::Otlp(_) => true,
521 #[cfg(feature = "syslog")]
522 Serializer::Syslog(_) => false,
523 Serializer::Cef(_)
524 | Serializer::Csv(_)
525 | Serializer::Logfmt(_)
526 | Serializer::Gelf(_)
527 | Serializer::Json(_)
528 | Serializer::Text(_)
529 | Serializer::NativeJson(_) => false,
530 }
531 }
532}
533
534impl From<AvroSerializer> for Serializer {
535 fn from(serializer: AvroSerializer) -> Self {
536 Self::Avro(serializer)
537 }
538}
539
540impl From<CefSerializer> for Serializer {
541 fn from(serializer: CefSerializer) -> Self {
542 Self::Cef(serializer)
543 }
544}
545
546impl From<CsvSerializer> for Serializer {
547 fn from(serializer: CsvSerializer) -> Self {
548 Self::Csv(serializer)
549 }
550}
551
552impl From<GelfSerializer> for Serializer {
553 fn from(serializer: GelfSerializer) -> Self {
554 Self::Gelf(serializer)
555 }
556}
557
558impl From<JsonSerializer> for Serializer {
559 fn from(serializer: JsonSerializer) -> Self {
560 Self::Json(serializer)
561 }
562}
563
564impl From<LogfmtSerializer> for Serializer {
565 fn from(serializer: LogfmtSerializer) -> Self {
566 Self::Logfmt(serializer)
567 }
568}
569
570impl From<NativeSerializer> for Serializer {
571 fn from(serializer: NativeSerializer) -> Self {
572 Self::Native(serializer)
573 }
574}
575
576impl From<NativeJsonSerializer> for Serializer {
577 fn from(serializer: NativeJsonSerializer) -> Self {
578 Self::NativeJson(serializer)
579 }
580}
581
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    /// Wraps an OTLP serializer in the `Otlp` variant.
    fn from(otlp: OtlpSerializer) -> Self {
        Serializer::Otlp(otlp)
    }
}
588
589impl From<ProtobufSerializer> for Serializer {
590 fn from(serializer: ProtobufSerializer) -> Self {
591 Self::Protobuf(serializer)
592 }
593}
594
595impl From<RawMessageSerializer> for Serializer {
596 fn from(serializer: RawMessageSerializer) -> Self {
597 Self::RawMessage(serializer)
598 }
599}
600
601impl From<TextSerializer> for Serializer {
602 fn from(serializer: TextSerializer) -> Self {
603 Self::Text(serializer)
604 }
605}
#[cfg(feature = "syslog")]
impl From<SyslogSerializer> for Serializer {
    /// Wraps a syslog serializer in the `Syslog` variant.
    fn from(syslog: SyslogSerializer) -> Self {
        Serializer::Syslog(syslog)
    }
}
612
613impl tokio_util::codec::Encoder<Event> for Serializer {
614 type Error = vector_common::Error;
615
616 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
617 match self {
618 Serializer::Avro(serializer) => serializer.encode(event, buffer),
619 Serializer::Cef(serializer) => serializer.encode(event, buffer),
620 Serializer::Csv(serializer) => serializer.encode(event, buffer),
621 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
622 Serializer::Json(serializer) => serializer.encode(event, buffer),
623 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
624 Serializer::Native(serializer) => serializer.encode(event, buffer),
625 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
626 #[cfg(feature = "opentelemetry")]
627 Serializer::Otlp(serializer) => serializer.encode(event, buffer),
628 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
629 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
630 Serializer::Text(serializer) => serializer.encode(event, buffer),
631 #[cfg(feature = "syslog")]
632 Serializer::Syslog(serializer) => serializer.encode(event, buffer),
633 }
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serializer_config_default() {
        // The default configuration selects the JSON codec.
        assert!(matches!(
            SerializerConfig::default(),
            SerializerConfig::Json(_)
        ));
    }

    #[test]
    fn test_serializer_is_binary() {
        // JSON is not classified as binary.
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(!json.is_binary());

        // Native and raw message are classified as binary.
        let native = Serializer::Native(NativeSerializerConfig.build());
        assert!(native.is_binary());

        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());
        assert!(raw_message.is_binary());
    }

    #[test]
    fn test_serializer_supports_json() {
        // The JSON serializer can produce a JSON value.
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(json.supports_json());

        // The text serializer cannot.
        let text = Serializer::Text(TextSerializerConfig::default().build());
        assert!(!text.supports_json());
    }

    #[test]
    fn test_serializer_config_build() {
        // Building a JSON config succeeds and yields the JSON serializer variant.
        let built = SerializerConfig::Json(JsonSerializerConfig::default()).build();
        assert!(built.is_ok());
        assert!(matches!(built.unwrap(), Serializer::Json(_)));
    }

    #[test]
    fn test_serializer_config_default_framing() {
        // JSON defaults to newline-delimited framing.
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        // Native defaults to length-delimited framing.
        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}