vector/sinks/opentelemetry/
mod.rs1use indoc::indoc;
2use vector_config::component::GenerateConfig;
3use vector_lib::{
4 codecs::{
5 JsonSerializerConfig,
6 encoding::{FramingConfig, SerializerConfig},
7 },
8 configurable::configurable_component,
9};
10
11use crate::{
12 codecs::{EncodingConfigWithFraming, Transformer},
13 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 },
18};
19
20#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
22#[derive(Clone, Debug, Default)]
23pub struct OpenTelemetryConfig {
24 #[configurable(derived)]
26 protocol: Protocol,
27}
28
29#[configurable_component]
33#[derive(Clone, Debug)]
34#[serde(rename_all = "snake_case", tag = "type")]
35#[configurable(metadata(docs::enum_tag_description = "The communication protocol."))]
36pub enum Protocol {
37 Http(HttpSinkConfig),
39}
40
41impl Default for Protocol {
42 fn default() -> Self {
43 Protocol::Http(HttpSinkConfig {
44 encoding: EncodingConfigWithFraming::new(
45 Some(FramingConfig::NewlineDelimited),
46 SerializerConfig::Json(JsonSerializerConfig::default()),
47 Transformer::default(),
48 ),
49 uri: Default::default(),
50 method: HttpMethod::Post,
51 auth: Default::default(),
52 compression: Default::default(),
53 payload_prefix: Default::default(),
54 payload_suffix: Default::default(),
55 batch: Default::default(),
56 request: Default::default(),
57 tls: Default::default(),
58 acknowledgements: Default::default(),
59 retry_strategy: Default::default(),
60 })
61 }
62}
63
64impl GenerateConfig for OpenTelemetryConfig {
65 fn generate_config() -> toml::Value {
66 toml::from_str(indoc! {r#"
67 [protocol]
68 type = "http"
69 uri = "http://localhost:5318/v1/logs"
70 encoding.codec = "json"
71 "#})
72 .unwrap()
73 }
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "opentelemetry")]
78impl SinkConfig for OpenTelemetryConfig {
79 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
80 match &self.protocol {
81 Protocol::Http(config) => {
82 warn_on_invalid_otlp_batching(config);
83 config.build(cx).await
84 }
85 }
86 }
87
88 fn input(&self) -> Input {
89 match &self.protocol {
90 Protocol::Http(config) => config.input(),
91 }
92 }
93
94 fn acknowledgements(&self) -> &AcknowledgementsConfig {
95 match self.protocol {
96 Protocol::Http(ref config) => config.acknowledgements(),
97 }
98 }
99}
100
101fn warn_on_invalid_otlp_batching(config: &HttpSinkConfig) {
102 let (_, serializer) = config.encoding.config();
103 let is_json = matches!(serializer, SerializerConfig::Json(_));
104 let batches_more_than_one = !matches!(config.batch.max_events, Some(1));
105 if is_json && batches_more_than_one {
106 tracing::warn!(
107 message = "`opentelemetry` sink is configured with `encoding.codec = json` and \
108 `batch.max_events` greater than 1. This produces invalid OTLP request \
109 bodies that receivers reject with HTTP 400. Use `encoding.codec = otlp` \
110 (recommended) or set `batch.max_events = 1`. See \
111 https://github.com/vectordotdev/vector/issues/22054.",
112 );
113 }
114}
115
116#[cfg(test)]
117mod test {
118 #[test]
119 fn generate_config() {
120 crate::test_util::test_generate_config::<super::OpenTelemetryConfig>();
121 }
122}