1use vector_lib::{
2 codecs::{
3 MetricTagValues,
4 encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5 },
6 configurable::configurable_component,
7 sensitive_string::SensitiveString,
8};
9
10use crate::{
11 codecs::{EncodingConfigWithFraming, Transformer},
12 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::Auth as HttpAuthConfig,
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 util::{
18 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
19 http::{RequestConfig, RetryStrategy},
20 },
21 },
22 tls::TlsConfig,
23};
24
25static CLOUD_URL: &str = "https://api.axiom.co";
26
27#[configurable_component]
29#[derive(Clone, Debug, Default)]
30#[serde(default)]
31pub struct UrlOrRegion {
32 #[configurable(validation(format = "uri"))]
38 #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
39 #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
40 #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
41 pub url: Option<String>,
42
43 #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
49 #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
50 #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
51 pub region: Option<String>,
52}
53
54impl UrlOrRegion {
55 fn validate(&self) -> crate::Result<()> {
57 if self.url.is_some() && self.region.is_some() {
58 return Err("Cannot set both `url` and `region`. Please use only one.".into());
59 }
60 Ok(())
61 }
62
63 pub fn url(&self) -> Option<&str> {
65 self.url.as_deref()
66 }
67
68 pub fn region(&self) -> Option<&str> {
70 self.region.as_deref()
71 }
72}
73
74#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
76#[derive(Clone, Debug, Default)]
77pub struct AxiomConfig {
78 #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
82 #[configurable(metadata(docs::examples = "123abc"))]
83 pub org_id: Option<String>,
84
85 #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
87 #[configurable(metadata(docs::examples = "123abc"))]
88 pub token: SensitiveString,
89
90 #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
92 #[configurable(metadata(docs::examples = "vector_rocks"))]
93 pub dataset: String,
94
95 #[serde(flatten)]
97 #[configurable(derived)]
98 pub endpoint: UrlOrRegion,
99
100 #[configurable(derived)]
101 #[serde(default)]
102 pub request: RequestConfig,
103
104 #[configurable(derived)]
106 #[serde(default = "Compression::zstd_default")]
107 pub compression: Compression,
108
109 #[configurable(derived)]
113 pub tls: Option<TlsConfig>,
114
115 #[configurable(derived)]
117 #[serde(default)]
118 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
119
120 #[configurable(derived)]
122 #[serde(
123 default,
124 deserialize_with = "crate::serde::bool_or_struct",
125 skip_serializing_if = "crate::serde::is_default"
126 )]
127 pub acknowledgements: AcknowledgementsConfig,
128
129 #[configurable(derived)]
130 #[serde(default)]
131 pub retry_strategy: RetryStrategy,
132}
133
134impl GenerateConfig for AxiomConfig {
135 fn generate_config() -> toml::Value {
136 toml::from_str(
137 r#"token = "${AXIOM_TOKEN}"
138 dataset = "${AXIOM_DATASET}"
139 url = "${AXIOM_URL}"
140 org_id = "${AXIOM_ORG_ID}""#,
141 )
142 .unwrap()
143 }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "axiom")]
148impl SinkConfig for AxiomConfig {
149 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
150 self.endpoint.validate()?;
152
153 let mut request = self.request.clone();
154 if let Some(org_id) = &self.org_id {
155 request
157 .headers
158 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
159 }
160
161 let http_sink_config = HttpSinkConfig {
168 uri: self.build_endpoint().try_into()?,
169 compression: self.compression,
170 auth: Some(HttpAuthConfig::Bearer {
171 token: self.token.clone(),
172 }),
173 method: HttpMethod::Post,
174 tls: self.tls.clone(),
175 request,
176 acknowledgements: self.acknowledgements,
177 batch: self.batch,
178 encoding: EncodingConfigWithFraming::new(
179 Some(FramingConfig::NewlineDelimited),
180 SerializerConfig::Json(JsonSerializerConfig {
181 metric_tag_values: MetricTagValues::Single,
182 options: JsonSerializerOptions { pretty: false }, }),
184 Transformer::default(),
185 ),
186 payload_prefix: "".into(), payload_suffix: "".into(), retry_strategy: self.retry_strategy.clone(),
189 };
190
191 http_sink_config.build(cx).await
192 }
193
194 fn input(&self) -> Input {
195 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
196 }
197
198 fn acknowledgements(&self) -> &AcknowledgementsConfig {
199 &self.acknowledgements
200 }
201}
202
203impl AxiomConfig {
204 fn build_endpoint(&self) -> String {
205 if let Some(url) = self.endpoint.url() {
209 let url = url.trim_end_matches('/');
210
211 if let Ok(parsed) = url::Url::parse(url) {
215 let path = parsed.path();
216 if path.is_empty() || path == "/" {
217 return format!("{url}/v1/datasets/{}/ingest", self.dataset);
219 }
220 }
221
222 return url.to_string();
224 }
225
226 if let Some(region) = self.endpoint.region() {
228 let region = region.trim_end_matches('/');
229 return format!("https://{region}/v1/ingest/{}", self.dataset);
230 }
231
232 format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
234 }
235}
236
237#[cfg(test)]
238mod test {
239 #[test]
240 fn generate_config() {
241 crate::test_util::test_generate_config::<super::AxiomConfig>();
242 }
243
244 #[test]
245 fn test_region_domain_only() {
246 let config = super::AxiomConfig {
248 endpoint: super::UrlOrRegion {
249 region: Some("mumbai.axiomdomain.co".to_string()),
250 url: None,
251 },
252 dataset: "test-3".to_string(),
253 ..Default::default()
254 };
255 let endpoint = config.build_endpoint();
256 assert_eq!(endpoint, "https://mumbai.axiomdomain.co/v1/ingest/test-3");
257 }
258
259 #[test]
260 fn test_default_no_config() {
261 let config = super::AxiomConfig {
263 dataset: "foo".to_string(),
264 ..Default::default()
265 };
266 let endpoint = config.build_endpoint();
267 assert_eq!(endpoint, "https://api.axiom.co/v1/datasets/foo/ingest");
268 }
269
270 #[test]
271 fn test_url_with_custom_path() {
272 let config = super::AxiomConfig {
274 endpoint: super::UrlOrRegion {
275 url: Some("http://localhost:3400/ingest".to_string()),
276 region: None,
277 },
278 dataset: "meh".to_string(),
279 ..Default::default()
280 };
281 let endpoint = config.build_endpoint();
282 assert_eq!(endpoint, "http://localhost:3400/ingest");
283 }
284
285 #[test]
286 fn test_url_without_path_backwards_compat() {
287 let config = super::AxiomConfig {
289 endpoint: super::UrlOrRegion {
290 url: Some("https://api.eu.axiom.co".to_string()),
291 region: None,
292 },
293 dataset: "qoo".to_string(),
294 ..Default::default()
295 };
296 let endpoint = config.build_endpoint();
297 assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
298
299 let config = super::AxiomConfig {
301 endpoint: super::UrlOrRegion {
302 url: Some("https://api.eu.axiom.co/".to_string()),
303 region: None,
304 },
305 dataset: "qoo".to_string(),
306 ..Default::default()
307 };
308 let endpoint = config.build_endpoint();
309 assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
310 }
311
312 #[test]
313 fn test_both_url_and_region_fails_validation() {
314 let endpoint = super::UrlOrRegion {
316 url: Some("http://localhost:3400/ingest".to_string()),
317 region: Some("mumbai.axiomdomain.co".to_string()),
318 };
319
320 let result = endpoint.validate();
321 assert!(result.is_err());
322 assert_eq!(
323 result.unwrap_err().to_string(),
324 "Cannot set both `url` and `region`. Please use only one."
325 );
326 }
327
328 #[test]
329 fn test_url_or_region_deserialization_with_url() {
330 let config: super::AxiomConfig = toml::from_str(
332 r#"
333 token = "test-token"
334 dataset = "test-dataset"
335 url = "https://api.eu.axiom.co"
336 "#,
337 )
338 .unwrap();
339
340 assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
341 assert_eq!(config.endpoint.region(), None);
342 }
343
344 #[test]
345 fn test_url_or_region_deserialization_with_region() {
346 let config: super::AxiomConfig = toml::from_str(
348 r#"
349 token = "test-token"
350 dataset = "test-dataset"
351 region = "mumbai.axiom.co"
352 "#,
353 )
354 .unwrap();
355
356 assert_eq!(config.endpoint.url(), None);
357 assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
358 }
359
360 #[test]
361 fn test_production_regional_edges() {
362 let config = super::AxiomConfig {
364 endpoint: super::UrlOrRegion {
365 region: Some("eu-central-1.aws.edge.axiom.co".to_string()),
366 url: None,
367 },
368 dataset: "my-dataset".to_string(),
369 ..Default::default()
370 };
371 let endpoint = config.build_endpoint();
372 assert_eq!(
373 endpoint,
374 "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
375 );
376 }
377
378 #[test]
379 fn test_staging_environment_edges() {
380 let config = super::AxiomConfig {
382 endpoint: super::UrlOrRegion {
383 region: Some("us-east-1.edge.staging.axiomdomain.co".to_string()),
384 url: None,
385 },
386 dataset: "test-dataset".to_string(),
387 ..Default::default()
388 };
389 let endpoint = config.build_endpoint();
390 assert_eq!(
391 endpoint,
392 "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
393 );
394 }
395
396 #[test]
397 fn test_dev_environment_edges() {
398 let config = super::AxiomConfig {
400 endpoint: super::UrlOrRegion {
401 region: Some("eu-west-1.edge.dev.axiomdomain.co".to_string()),
402 url: None,
403 },
404 dataset: "dev-dataset".to_string(),
405 ..Default::default()
406 };
407 let endpoint = config.build_endpoint();
408 assert_eq!(
409 endpoint,
410 "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
411 );
412 }
413}