codecs/encoding/format/
otlp.rs

1use crate::encoding::ProtobufSerializer;
2use bytes::BytesMut;
3use opentelemetry_proto::proto::{
4    DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
5    RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
6    TRACES_REQUEST_MESSAGE_TYPE,
7};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::Options;
12
13/// Config used to build an `OtlpSerializer`.
14#[configurable_component]
15#[derive(Debug, Clone, Default)]
16pub struct OtlpSerializerConfig {
17    // No configuration options needed - OTLP serialization is opinionated
18}
19
20impl OtlpSerializerConfig {
21    /// Build the `OtlpSerializer` from this configuration.
22    pub fn build(&self) -> Result<OtlpSerializer, crate::encoding::BuildError> {
23        OtlpSerializer::new()
24    }
25
26    /// The data type of events that are accepted by `OtlpSerializer`.
27    pub fn input_type(&self) -> DataType {
28        DataType::Log | DataType::Trace
29    }
30
31    /// The schema required by the serializer.
32    pub fn schema_requirement(&self) -> schema::Requirement {
33        schema::Requirement::empty()
34    }
35}
36
37/// Serializer that converts an `Event` to bytes using the OTLP (OpenTelemetry Protocol) protobuf format.
38///
39/// This serializer encodes events using the OTLP protobuf specification, which is the recommended
40/// encoding format for OpenTelemetry data. The output is suitable for sending to OTLP-compatible
41/// endpoints with `content-type: application/x-protobuf`.
42///
43/// # Implementation approach
44///
45/// This serializer converts Vector's internal event representation to the appropriate OTLP message type
46/// based on the top-level field in the event:
47/// - `resourceLogs` → `ExportLogsServiceRequest`
48/// - `resourceMetrics` → `ExportMetricsServiceRequest`
49/// - `resourceSpans` → `ExportTraceServiceRequest`
50///
51/// The implementation is the inverse of what the `opentelemetry` source does when decoding,
52/// ensuring round-trip compatibility.
53#[derive(Debug, Clone)]
54#[allow(dead_code)] // Fields will be used once encoding is implemented
55pub struct OtlpSerializer {
56    logs_descriptor: ProtobufSerializer,
57    metrics_descriptor: ProtobufSerializer,
58    traces_descriptor: ProtobufSerializer,
59    options: Options,
60}
61
62impl OtlpSerializer {
63    /// Creates a new OTLP serializer with the appropriate message descriptors.
64    pub fn new() -> vector_common::Result<Self> {
65        let options = Options {
66            use_json_names: true,
67            allow_lossy_string_coercion: true,
68        };
69
70        let logs_descriptor = ProtobufSerializer::new_from_bytes(
71            DESCRIPTOR_BYTES,
72            LOGS_REQUEST_MESSAGE_TYPE,
73            &options,
74        )?;
75
76        let metrics_descriptor = ProtobufSerializer::new_from_bytes(
77            DESCRIPTOR_BYTES,
78            METRICS_REQUEST_MESSAGE_TYPE,
79            &options,
80        )?;
81
82        let traces_descriptor = ProtobufSerializer::new_from_bytes(
83            DESCRIPTOR_BYTES,
84            TRACES_REQUEST_MESSAGE_TYPE,
85            &options,
86        )?;
87
88        Ok(Self {
89            logs_descriptor,
90            metrics_descriptor,
91            traces_descriptor,
92            options,
93        })
94    }
95}
96
97impl Encoder<Event> for OtlpSerializer {
98    type Error = vector_common::Error;
99
100    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
101        // Determine which descriptor to use based on top-level OTLP fields
102        // This handles events that were decoded with use_otlp_decoding enabled
103        // The deserializer uses use_json_names: true, so fields are in camelCase
104        match &event {
105            Event::Log(log) => {
106                if log.contains(RESOURCE_LOGS_JSON_FIELD) {
107                    self.logs_descriptor.encode(event, buffer)
108                } else if log.contains(RESOURCE_METRICS_JSON_FIELD) {
109                    // Currently the OTLP metrics are Vector logs (not metrics).
110                    self.metrics_descriptor.encode(event, buffer)
111                } else {
112                    Err(format!(
113                        "Log event does not contain OTLP top-level fields ({RESOURCE_LOGS_JSON_FIELD} or {RESOURCE_METRICS_JSON_FIELD})",
114                    )
115                        .into())
116                }
117            }
118            Event::Trace(trace) => {
119                if trace.contains(RESOURCE_SPANS_JSON_FIELD) {
120                    self.traces_descriptor.encode(event, buffer)
121                } else {
122                    Err(format!(
123                        "Trace event does not contain OTLP top-level field ({RESOURCE_SPANS_JSON_FIELD})",
124                    )
125                        .into())
126                }
127            }
128            Event::Metric(_) => {
129                Err("OTLP serializer does not support native Vector metrics yet.".into())
130            }
131        }
132    }
133}