vector/sinks/axiom/
config.rs

1use vector_lib::{
2    codecs::{
3        MetricTagValues,
4        encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5    },
6    configurable::configurable_component,
7    sensitive_string::SensitiveString,
8};
9
10use crate::{
11    codecs::{EncodingConfigWithFraming, Transformer},
12    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::Auth as HttpAuthConfig,
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17        util::{
18            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
19            http::{RequestConfig, RetryStrategy},
20        },
21    },
22    tls::TlsConfig,
23};
24
/// Base URL of the Axiom cloud API, used when neither `url` nor `region` is configured.
const CLOUD_URL: &str = "https://api.axiom.co";
26
27/// Configuration of the URL/region to use when interacting with Axiom.
28#[configurable_component]
29#[derive(Clone, Debug, Default)]
30#[serde(default)]
31pub struct UrlOrRegion {
32    /// URI of the Axiom endpoint to send data to.
33    ///
34    /// If a path is provided, the URL is used as-is.
35    /// If no path (or only `/`) is provided, `/v1/datasets/{dataset}/ingest` is appended for backwards compatibility.
36    /// This takes precedence over `region` if both are set (but both should not be set).
37    #[configurable(validation(format = "uri"))]
38    #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
39    #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
40    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
41    pub url: Option<String>,
42
43    /// The Axiom regional edge domain to use for ingestion.
44    ///
45    /// Specify the domain name only (no scheme, no path).
46    /// When set, data is sent to `https://{region}/v1/ingest/{dataset}`.
47    /// Cannot be used together with `url`.
48    #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
49    #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
50    #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
51    pub region: Option<String>,
52}
53
54impl UrlOrRegion {
55    /// Validates that url and region are not both set.
56    fn validate(&self) -> crate::Result<()> {
57        if self.url.is_some() && self.region.is_some() {
58            return Err("Cannot set both `url` and `region`. Please use only one.".into());
59        }
60        Ok(())
61    }
62
63    /// Returns the url if set.
64    pub fn url(&self) -> Option<&str> {
65        self.url.as_deref()
66    }
67
68    /// Returns the region if set.
69    pub fn region(&self) -> Option<&str> {
70        self.region.as_deref()
71    }
72}
73
74/// Configuration for the `axiom` sink.
75#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
76#[derive(Clone, Debug, Default)]
77pub struct AxiomConfig {
78    /// The Axiom organization ID.
79    ///
80    /// Only required when using personal tokens.
81    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
82    #[configurable(metadata(docs::examples = "123abc"))]
83    pub org_id: Option<String>,
84
85    /// The Axiom API token.
86    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
87    #[configurable(metadata(docs::examples = "123abc"))]
88    pub token: SensitiveString,
89
90    /// The Axiom dataset to write to.
91    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
92    #[configurable(metadata(docs::examples = "vector_rocks"))]
93    pub dataset: String,
94
95    /// Configuration for the URL or regional edge endpoint.
96    #[serde(flatten)]
97    #[configurable(derived)]
98    pub endpoint: UrlOrRegion,
99
100    #[configurable(derived)]
101    #[serde(default)]
102    pub request: RequestConfig,
103
104    /// The compression algorithm to use.
105    #[configurable(derived)]
106    #[serde(default = "Compression::zstd_default")]
107    pub compression: Compression,
108
109    /// The TLS settings for the connection.
110    ///
111    /// Optional, constrains TLS settings for this sink.
112    #[configurable(derived)]
113    pub tls: Option<TlsConfig>,
114
115    /// The batch settings for the sink.
116    #[configurable(derived)]
117    #[serde(default)]
118    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
119
120    /// Controls how acknowledgements are handled for this sink.
121    #[configurable(derived)]
122    #[serde(
123        default,
124        deserialize_with = "crate::serde::bool_or_struct",
125        skip_serializing_if = "crate::serde::is_default"
126    )]
127    pub acknowledgements: AcknowledgementsConfig,
128
129    #[configurable(derived)]
130    #[serde(default)]
131    pub retry_strategy: RetryStrategy,
132}
133
134impl GenerateConfig for AxiomConfig {
135    fn generate_config() -> toml::Value {
136        toml::from_str(
137            r#"token = "${AXIOM_TOKEN}"
138            dataset = "${AXIOM_DATASET}"
139            url = "${AXIOM_URL}"
140            org_id = "${AXIOM_ORG_ID}""#,
141        )
142        .unwrap()
143    }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "axiom")]
148impl SinkConfig for AxiomConfig {
149    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
150        // Validate that url and region are not both set
151        self.endpoint.validate()?;
152
153        let mut request = self.request.clone();
154        if let Some(org_id) = &self.org_id {
155            // NOTE: Only add the org id header if an org id is provided
156            request
157                .headers
158                .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
159        }
160
161        // Axiom has a custom high-performance database that can be ingested
162        // into using the native HTTP ingest endpoint. This configuration wraps
163        // the vector HTTP sink with the necessary adjustments to send data
164        // to Axiom, whilst keeping the configuration simple and easy to use
165        // and maintenance of the vector axiom sink to a minimum.
166        //
167        let http_sink_config = HttpSinkConfig {
168            uri: self.build_endpoint().try_into()?,
169            compression: self.compression,
170            auth: Some(HttpAuthConfig::Bearer {
171                token: self.token.clone(),
172            }),
173            method: HttpMethod::Post,
174            tls: self.tls.clone(),
175            request,
176            acknowledgements: self.acknowledgements,
177            batch: self.batch,
178            encoding: EncodingConfigWithFraming::new(
179                Some(FramingConfig::NewlineDelimited),
180                SerializerConfig::Json(JsonSerializerConfig {
181                    metric_tag_values: MetricTagValues::Single,
182                    options: JsonSerializerOptions { pretty: false }, // Minified JSON
183                }),
184                Transformer::default(),
185            ),
186            payload_prefix: "".into(), // Always newline delimited JSON
187            payload_suffix: "".into(), // Always newline delimited JSON
188            retry_strategy: self.retry_strategy.clone(),
189        };
190
191        http_sink_config.build(cx).await
192    }
193
194    fn input(&self) -> Input {
195        Input::new(DataType::Metric | DataType::Log | DataType::Trace)
196    }
197
198    fn acknowledgements(&self) -> &AcknowledgementsConfig {
199        &self.acknowledgements
200    }
201}
202
203impl AxiomConfig {
204    fn build_endpoint(&self) -> String {
205        // Priority: url > region > default cloud endpoint
206
207        // If url is set, check if it has a path
208        if let Some(url) = self.endpoint.url() {
209            let url = url.trim_end_matches('/');
210
211            // Parse URL to check if path is provided
212            // If path is empty or just "/", append the legacy format for backwards compatibility
213            // Otherwise, use the URL as-is
214            if let Ok(parsed) = url::Url::parse(url) {
215                let path = parsed.path();
216                if path.is_empty() || path == "/" {
217                    // Backwards compatibility: append legacy path format
218                    return format!("{url}/v1/datasets/{}/ingest", self.dataset);
219                }
220            }
221
222            // URL has a custom path, use as-is
223            return url.to_string();
224        }
225
226        // If region is set, build the regional edge endpoint
227        if let Some(region) = self.endpoint.region() {
228            let region = region.trim_end_matches('/');
229            return format!("https://{region}/v1/ingest/{}", self.dataset);
230        }
231
232        // Default: use cloud endpoint with legacy path format
233        format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
234    }
235}
236
#[cfg(test)]
mod test {
    use super::{AxiomConfig, UrlOrRegion};

    /// Builds a config for `dataset` with the given endpoint settings; all other fields
    /// take their `Default` values.
    fn config_with(dataset: &str, url: Option<&str>, region: Option<&str>) -> AxiomConfig {
        AxiomConfig {
            dataset: dataset.to_owned(),
            endpoint: UrlOrRegion {
                url: url.map(str::to_owned),
                region: region.map(str::to_owned),
            },
            ..Default::default()
        }
    }

    /// Endpoint computed for a config that sets only `url`.
    fn endpoint_for_url(url: &str, dataset: &str) -> String {
        config_with(dataset, Some(url), None).build_endpoint()
    }

    /// Endpoint computed for a config that sets only `region`.
    fn endpoint_for_region(region: &str, dataset: &str) -> String {
        config_with(dataset, None, Some(region)).build_endpoint()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    #[test]
    fn test_region_domain_only() {
        // A bare domain is wrapped into the regional edge ingest URL.
        assert_eq!(
            endpoint_for_region("mumbai.axiomdomain.co", "test-3"),
            "https://mumbai.axiomdomain.co/v1/ingest/test-3"
        );
    }

    #[test]
    fn test_default_no_config() {
        // With neither `url` nor `region`, the cloud endpoint and legacy path are used.
        let config = AxiomConfig {
            dataset: "foo".to_string(),
            ..Default::default()
        };
        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/foo/ingest"
        );
    }

    #[test]
    fn test_url_with_custom_path() {
        // A URL that already has a path is passed through; the dataset is not appended.
        assert_eq!(
            endpoint_for_url("http://localhost:3400/ingest", "meh"),
            "http://localhost:3400/ingest"
        );
    }

    #[test]
    fn test_url_without_path_backwards_compat() {
        let expected = "https://api.eu.axiom.co/v1/datasets/qoo/ingest";

        // No path at all: the legacy ingest path is appended.
        assert_eq!(endpoint_for_url("https://api.eu.axiom.co", "qoo"), expected);

        // A lone trailing slash counts as "no path" too.
        assert_eq!(endpoint_for_url("https://api.eu.axiom.co/", "qoo"), expected);
    }

    #[test]
    fn test_both_url_and_region_fails_validation() {
        // Setting both options is a configuration error.
        let both = UrlOrRegion {
            url: Some("http://localhost:3400/ingest".to_string()),
            region: Some("mumbai.axiomdomain.co".to_string()),
        };

        let error = both
            .validate()
            .expect_err("validation must fail when both `url` and `region` are set");
        assert_eq!(
            error.to_string(),
            "Cannot set both `url` and `region`. Please use only one."
        );
    }

    #[test]
    fn test_url_or_region_deserialization_with_url() {
        // `url` is read from the top level of the sink config (the endpoint is flattened).
        let parsed: AxiomConfig = toml::from_str(
            r#"
            token = "test-token"
            dataset = "test-dataset"
            url = "https://api.eu.axiom.co"
            "#,
        )
        .unwrap();

        assert_eq!(parsed.endpoint.url(), Some("https://api.eu.axiom.co"));
        assert_eq!(parsed.endpoint.region(), None);
    }

    #[test]
    fn test_url_or_region_deserialization_with_region() {
        // `region` is read from the top level of the sink config (the endpoint is flattened).
        let parsed: AxiomConfig = toml::from_str(
            r#"
            token = "test-token"
            dataset = "test-dataset"
            region = "mumbai.axiom.co"
            "#,
        )
        .unwrap();

        assert_eq!(parsed.endpoint.url(), None);
        assert_eq!(parsed.endpoint.region(), Some("mumbai.axiom.co"));
    }

    #[test]
    fn test_production_regional_edges() {
        // Production AWS edge domain.
        assert_eq!(
            endpoint_for_region("eu-central-1.aws.edge.axiom.co", "my-dataset"),
            "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
        );
    }

    #[test]
    fn test_staging_environment_edges() {
        // Staging edge domain.
        assert_eq!(
            endpoint_for_region("us-east-1.edge.staging.axiomdomain.co", "test-dataset"),
            "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
        );
    }

    #[test]
    fn test_dev_environment_edges() {
        // Dev edge domain.
        assert_eq!(
            endpoint_for_region("eu-west-1.edge.dev.axiomdomain.co", "dev-dataset"),
            "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
        );
    }
}