vector/sinks/azure_logs_ingestion/
config.rs1use std::sync::Arc;
2
3use azure_core::credentials::TokenCredential;
4
5use vector_lib::{configurable::configurable_component, schema};
6use vrl::value::Kind;
7
8use crate::{
9 http::{HttpClient, get_http_scheme_from_uri},
10 sinks::{
11 azure_common::config::AzureAuthentication,
12 prelude::*,
13 util::{
14 RealtimeSizeBasedDefaultBatchSettings, UriSerde,
15 http::{HttpStatusRetryLogic, RetryStrategy},
16 },
17 },
18};
19
20use super::{
21 service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
22 sink::AzureLogsIngestionSink,
23};
24
25const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;
27
28pub(super) fn default_scope() -> String {
29 "https://monitor.azure.com/.default".into()
30}
31
32pub(super) fn default_timestamp_field() -> String {
33 "TimeGenerated".into()
34}
35
36#[configurable_component(sink(
38 "azure_logs_ingestion",
39 "Publish log events to the Azure Monitor Logs Ingestion API."
40))]
41#[derive(Clone, Debug)]
42#[serde(deny_unknown_fields)]
43pub struct AzureLogsIngestionConfig {
44 #[configurable(metadata(
48 docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
49 ))]
50 pub endpoint: String,
51
52 #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
56 pub dcr_immutable_id: String,
57
58 #[configurable(metadata(docs::examples = "Custom-MyTable"))]
62 pub stream_name: String,
63
64 #[configurable(derived)]
65 pub auth: AzureAuthentication,
66
67 #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
71 #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
72 #[serde(default = "default_scope")]
73 pub(super) token_scope: String,
74
75 #[configurable(metadata(docs::examples = "EventStartTime"))]
82 #[configurable(metadata(docs::examples = "Timestamp"))]
83 #[serde(default = "default_timestamp_field")]
84 pub timestamp_field: String,
85
86 #[configurable(derived)]
87 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
88 pub encoding: Transformer,
89
90 #[configurable(derived)]
91 #[serde(default)]
92 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
93
94 #[configurable(derived)]
95 #[serde(default)]
96 pub request: TowerRequestConfig,
97
98 #[configurable(derived)]
99 pub tls: Option<TlsConfig>,
100
101 #[configurable(derived)]
102 #[serde(
103 default,
104 deserialize_with = "crate::serde::bool_or_struct",
105 skip_serializing_if = "crate::serde::is_default"
106 )]
107 pub acknowledgements: AcknowledgementsConfig,
108
109 #[configurable(derived)]
110 #[serde(default)]
111 pub retry_strategy: RetryStrategy,
112}
113
114impl Default for AzureLogsIngestionConfig {
115 fn default() -> Self {
116 Self {
117 endpoint: Default::default(),
118 dcr_immutable_id: Default::default(),
119 stream_name: Default::default(),
120 auth: Default::default(),
121 token_scope: default_scope(),
122 timestamp_field: default_timestamp_field(),
123 encoding: Default::default(),
124 batch: Default::default(),
125 request: Default::default(),
126 tls: None,
127 acknowledgements: Default::default(),
128 retry_strategy: Default::default(),
129 }
130 }
131}
132
133impl AzureLogsIngestionConfig {
134 #[allow(clippy::too_many_arguments)]
135 pub(super) async fn build_inner(
136 &self,
137 cx: SinkContext,
138 endpoint: UriSerde,
139 dcr_immutable_id: String,
140 stream_name: String,
141 credential: Arc<dyn TokenCredential>,
142 token_scope: String,
143 timestamp_field: String,
144 ) -> crate::Result<(VectorSink, Healthcheck)> {
145 let endpoint = endpoint.with_default_parts().uri;
146 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
147
148 let batch_settings = self
149 .batch
150 .validate()?
151 .limit_max_bytes(MAX_BATCH_SIZE)?
152 .into_batcher_settings()?;
153
154 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
155 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
156
157 let service = AzureLogsIngestionService::new(
158 client,
159 endpoint,
160 dcr_immutable_id,
161 stream_name,
162 credential,
163 token_scope,
164 )?;
165 let healthcheck = service.healthcheck();
166
167 let retry_logic = HttpStatusRetryLogic::new(
168 |res: &AzureLogsIngestionResponse| res.http_status,
169 self.retry_strategy.clone(),
170 );
171 let request_settings = self.request.into_settings();
172 let service = ServiceBuilder::new()
173 .settings(request_settings, retry_logic)
174 .service(service);
175
176 let sink = AzureLogsIngestionSink::new(
177 batch_settings,
178 self.encoding.clone(),
179 service,
180 timestamp_field,
181 protocol,
182 );
183
184 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
185 }
186}
187
188impl_generate_config_from_default!(AzureLogsIngestionConfig);
189
190#[async_trait::async_trait]
191#[typetag::serde(name = "azure_logs_ingestion")]
192impl SinkConfig for AzureLogsIngestionConfig {
193 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
194 let endpoint: UriSerde = self.endpoint.parse()?;
195
196 let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
197
198 self.build_inner(
199 cx,
200 endpoint,
201 self.dcr_immutable_id.clone(),
202 self.stream_name.clone(),
203 credential,
204 self.token_scope.clone(),
205 self.timestamp_field.clone(),
206 )
207 .await
208 }
209
210 fn input(&self) -> Input {
211 let requirements =
212 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
213
214 Input::log().with_schema_requirement(requirements)
215 }
216
217 fn acknowledgements(&self) -> &AcknowledgementsConfig {
218 &self.acknowledgements
219 }
220}