vector/sinks/azure_monitor_logs/
config.rs1use openssl::{base64, pkey};
2use vector_lib::{
3 config::log_schema,
4 configurable::configurable_component,
5 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath},
6 schema,
7 sensitive_string::SensitiveString,
8};
9use vrl::value::Kind;
10
11use super::{
12 service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
13 sink::AzureMonitorLogsSink,
14};
15use crate::{
16 http::{HttpClient, get_http_scheme_from_uri},
17 sinks::{
18 prelude::*,
19 util::{
20 RealtimeSizeBasedDefaultBatchSettings, UriSerde,
21 http::{HttpStatusRetryLogic, RetryStrategy},
22 },
23 },
24};
25
26const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;
28
29pub(super) fn default_host() -> String {
30 "ods.opinsights.azure.com".into()
31}
32
33#[configurable_component(sink(
35 "azure_monitor_logs",
36 "Publish log events to the Azure Monitor Data Collector API."
37))]
38#[derive(Clone, Debug)]
39#[serde(deny_unknown_fields)]
40pub struct AzureMonitorLogsConfig {
41 #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
45 #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
46 pub customer_id: String,
47
48 #[configurable(metadata(
52 docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
53 ))]
54 #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
55 pub shared_key: SensitiveString,
56
57 #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
63 #[configurable(metadata(docs::examples = "MyTableName"))]
64 #[configurable(metadata(docs::examples = "MyRecordType"))]
65 pub log_type: String,
66
67 #[configurable(metadata(
71 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
72 ))]
73 #[configurable(metadata(
74 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
75 ))]
76 pub azure_resource_id: Option<String>,
77
78 #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
82 #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
83 #[serde(default = "default_host")]
84 pub(super) host: String,
85
86 #[configurable(derived)]
87 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
88 pub encoding: Transformer,
89
90 #[configurable(derived)]
91 #[serde(default)]
92 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
93
94 #[configurable(derived)]
95 #[serde(default)]
96 pub request: TowerRequestConfig,
97
98 #[configurable(metadata(docs::examples = "time_generated"))]
107 pub time_generated_key: Option<OptionalValuePath>,
108
109 #[configurable(derived)]
110 pub tls: Option<TlsConfig>,
111
112 #[configurable(derived)]
113 #[serde(
114 default,
115 deserialize_with = "crate::serde::bool_or_struct",
116 skip_serializing_if = "crate::serde::is_default"
117 )]
118 pub acknowledgements: AcknowledgementsConfig,
119
120 #[configurable(derived)]
121 #[serde(default)]
122 pub retry_strategy: RetryStrategy,
123}
124
125impl Default for AzureMonitorLogsConfig {
126 fn default() -> Self {
127 Self {
128 customer_id: "my-customer-id".to_string(),
129 shared_key: Default::default(),
130 log_type: "MyRecordType".to_string(),
131 azure_resource_id: None,
132 host: default_host(),
133 encoding: Default::default(),
134 batch: Default::default(),
135 request: Default::default(),
136 time_generated_key: None,
137 tls: None,
138 acknowledgements: Default::default(),
139 retry_strategy: Default::default(),
140 }
141 }
142}
143
144impl AzureMonitorLogsConfig {
145 pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
146 if self.shared_key.inner().is_empty() {
147 return Err("shared_key cannot be an empty string".into());
148 }
149 let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
150 let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
151 Ok(shared_key)
152 }
153
154 fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
155 self.time_generated_key
156 .clone()
157 .and_then(|k| k.path)
158 .or_else(|| log_schema().timestamp_key().cloned())
159 }
160
161 pub(super) async fn build_inner(
162 &self,
163 cx: SinkContext,
164 endpoint: UriSerde,
165 ) -> crate::Result<(VectorSink, Healthcheck)> {
166 let endpoint = endpoint.with_default_parts().uri;
167 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
168
169 let batch_settings = self
170 .batch
171 .validate()?
172 .limit_max_bytes(MAX_BATCH_SIZE)?
173 .into_batcher_settings()?;
174
175 let shared_key = self.build_shared_key()?;
176 let time_generated_key = self.get_time_generated_key();
177
178 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
179 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
180
181 let service = AzureMonitorLogsService::new(
182 client,
183 endpoint,
184 self.customer_id.clone(),
185 self.azure_resource_id.as_deref(),
186 &self.log_type,
187 time_generated_key.clone(),
188 shared_key,
189 )?;
190 let healthcheck = service.healthcheck();
191
192 let retry_logic = HttpStatusRetryLogic::new(
193 |res: &AzureMonitorLogsResponse| res.http_status,
194 self.retry_strategy.clone(),
195 );
196 let request_settings = self.request.into_settings();
197 let service = ServiceBuilder::new()
198 .settings(request_settings, retry_logic)
199 .service(service);
200
201 let sink = AzureMonitorLogsSink::new(
202 batch_settings,
203 self.encoding.clone(),
204 service,
205 time_generated_key,
206 protocol,
207 );
208
209 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
210 }
211}
212
213impl_generate_config_from_default!(AzureMonitorLogsConfig);
214
215#[async_trait::async_trait]
216#[typetag::serde(name = "azure_monitor_logs")]
217impl SinkConfig for AzureMonitorLogsConfig {
218 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
219 let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
220 self.build_inner(cx, endpoint).await
221 }
222
223 fn input(&self) -> Input {
224 let requirements =
225 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
226
227 Input::log().with_schema_requirement(requirements)
228 }
229
230 fn acknowledgements(&self) -> &AcknowledgementsConfig {
231 &self.acknowledgements
232 }
233}