vector/sinks/datadog/events/
config.rs

1use indoc::indoc;
2use tower::ServiceBuilder;
3use vector_lib::{config::proxy::ProxyConfig, configurable::configurable_component, schema};
4use vrl::value::Kind;
5
6use super::{
7    service::{DatadogEventsResponse, DatadogEventsService},
8    sink::DatadogEventsSink,
9};
10use crate::{
11    common::datadog,
12    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::HttpClient,
14    sinks::{
15        Healthcheck, VectorSink,
16        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
17        util::{
18            ServiceBuilderExt, TowerRequestConfig,
19            http::{HttpStatusRetryLogic, RetryStrategy},
20        },
21    },
22    tls::MaybeTlsSettings,
23};
24
25/// Configuration for the `datadog_events` sink.
26#[configurable_component(sink(
27    "datadog_events",
28    "Publish observability events to the Datadog Events API."
29))]
30#[derive(Clone, Debug, Default)]
31#[serde(deny_unknown_fields)]
32pub struct DatadogEventsConfig {
33    #[serde(flatten)]
34    pub dd_common: LocalDatadogCommonConfig,
35
36    #[configurable(derived)]
37    #[serde(default)]
38    pub request: TowerRequestConfig,
39
40    #[configurable(derived)]
41    #[serde(default)]
42    pub retry_strategy: RetryStrategy,
43}
44
45impl GenerateConfig for DatadogEventsConfig {
46    fn generate_config() -> toml::Value {
47        toml::from_str(indoc! {r#"
48            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
49        "#})
50        .unwrap()
51    }
52}
53
54impl DatadogEventsConfig {
55    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
56        let tls = MaybeTlsSettings::from_config(self.dd_common.tls.as_ref(), false)?;
57        let client = HttpClient::new(tls, proxy)?;
58        Ok(client)
59    }
60
61    fn build_sink(
62        &self,
63        dd_common: &DatadogCommonConfig,
64        client: HttpClient,
65    ) -> crate::Result<VectorSink> {
66        let service = DatadogEventsService::new(
67            dd_common.get_api_endpoint("/api/v1/events")?,
68            dd_common.default_api_key.clone(),
69            client,
70        );
71
72        let request_opts = self.request;
73        let request_settings = request_opts.into_settings();
74        let retry_logic = HttpStatusRetryLogic::new(
75            |req: &DatadogEventsResponse| req.http_status,
76            self.retry_strategy.clone(),
77        );
78
79        let service = ServiceBuilder::new()
80            .settings(request_settings, retry_logic)
81            .service(service);
82
83        let sink = DatadogEventsSink { service };
84
85        Ok(VectorSink::from_event_streamsink(sink))
86    }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "datadog_events")]
91impl SinkConfig for DatadogEventsConfig {
92    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
93        let client = self.build_client(cx.proxy())?;
94        let global = cx.extra_context.get_or_default::<datadog::Options>();
95        let dd_common = self.dd_common.with_globals(global)?;
96        let healthcheck = dd_common.build_healthcheck(client.clone())?;
97        let sink = self.build_sink(&dd_common, client)?;
98
99        Ok((sink, healthcheck))
100    }
101
102    fn input(&self) -> Input {
103        let requirement = schema::Requirement::empty()
104            .required_meaning("message", Kind::bytes())
105            .optional_meaning("host", Kind::bytes())
106            .optional_meaning("timestamp", Kind::timestamp());
107
108        Input::log().with_schema_requirement(requirement)
109    }
110
111    fn acknowledgements(&self) -> &AcknowledgementsConfig {
112        &self.dd_common.acknowledgements
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::DatadogEventsConfig;

    /// Runs the shared `test_generate_config` helper against this sink's
    /// configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogEventsConfig>();
    }
}