1use std::fs::File;
2use std::io::Read;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6#[cfg(test)]
7use base64::prelude::*;
8
9use azure_core::error::Error as AzureCoreError;
10
11use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
12use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
13use azure_core::http::{ClientMethodOptions, StatusCode, Url};
14
15use azure_core::credentials::{TokenCredential, TokenRequestOptions};
16use azure_core::{Error, error::ErrorKind};
17
18use azure_identity::{
19 AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientCertificateCredential,
20 ClientCertificateCredentialOptions, ClientSecretCredential, ManagedIdentityCredential,
21 ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential,
22 WorkloadIdentityCredentialOptions,
23};
24
25use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
26
27use bytes::Bytes;
28use futures::FutureExt;
29use snafu::Snafu;
30use vector_lib::{
31 configurable::configurable_component,
32 json_size::JsonSize,
33 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
34 sensitive_string::SensitiveString,
35 stream::DriverResponse,
36};
37
38use crate::{
39 event::{EventFinalizers, EventStatus, Finalizable},
40 sinks::{Healthcheck, util::retries::RetryLogic},
41};
42
/// TLS options for the HTTP client that talks to Azure Blob Storage.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AzureBlobTlsConfig {
    /// Path to an additional certificate authority (CA) file.
    ///
    /// The file is parsed as PEM and added to the client's trusted root certificates.
    /// The legacy key `ca_path` is accepted as an alias.
    #[serde(alias = "ca_path")]
    #[configurable(metadata(docs::examples = "/path/to/certificate_authority.crt"))]
    #[configurable(metadata(docs::human_name = "CA File Path"))]
    pub ca_file: Option<PathBuf>,
}
57
/// Azure credential configuration used to authenticate requests.
///
/// The enum is `untagged`; the concrete credential is chosen by the
/// `azure_credential_kind` tag of the wrapped [`SpecificAzureCredential`].
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, untagged)]
pub enum AzureAuthentication {
    /// Use an explicitly selected Azure credential kind.
    #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
    Specific(SpecificAzureCredential),

    /// Mock credential, compiled only in test builds and never (de)serialized.
    #[cfg(test)]
    #[serde(skip)]
    MockCredential,
}
71
72impl Default for AzureAuthentication {
73 fn default() -> Self {
77 Self::Specific(SpecificAzureCredential::ManagedIdentity {
78 user_assigned_managed_identity_id: None,
79 user_assigned_managed_identity_id_type: None,
80 })
81 }
82}
83
/// The kind of identifier used to select a user-assigned managed identity.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[derive(Default)]
pub enum UserAssignedManagedIdentityIdType {
    /// The identifier is the managed identity's client ID (the default).
    #[default]
    ClientId,
    /// The identifier is the managed identity's object ID.
    ObjectId,
    /// The identifier is the managed identity's Azure resource ID.
    ResourceId,
}
98
/// A specific Azure credential kind, selected by the `azure_credential_kind` tag.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(
    tag = "azure_credential_kind",
    rename_all = "snake_case",
    deny_unknown_fields
)]
pub enum SpecificAzureCredential {
    /// Authenticate with the Azure CLI credential. Not available on `wasm32` targets.
    #[cfg(not(target_arch = "wasm32"))]
    AzureCli {},

    /// Authenticate as an application using a client certificate.
    ClientCertificateCredential {
        /// The Azure tenant ID of the application.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
        azure_tenant_id: String,

        /// The client (application) ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
        azure_client_id: String,

        /// Path to the certificate file; its raw bytes are read and handed to the credential.
        #[configurable(metadata(docs::examples = "path/to/certificate.pfx"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PATH:?err}"))]
        certificate_path: PathBuf,

        /// Optional password for the certificate.
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PASSWORD}"))]
        certificate_password: Option<SensitiveString>,
    },

    /// Authenticate as an application using a client secret.
    ///
    /// The tenant ID, client ID, and secret must all be non-empty.
    ClientSecretCredential {
        /// The Azure tenant ID of the application.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
        azure_tenant_id: String,

        /// The client (application) ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
        azure_client_id: String,

        /// The client secret.
        #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET:?err}"))]
        azure_client_secret: SensitiveString,
    },

    /// Authenticate with a managed identity.
    ManagedIdentity {
        /// Identifier of a user-assigned managed identity to use.
        ///
        /// When omitted, no user-assigned identity is selected.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,

        /// How `user_assigned_managed_identity_id` is interpreted.
        ///
        /// Defaults to `client_id`. Ignored when no identifier is set.
        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
    },

    /// Authenticate as an application, using a managed identity token as the client assertion.
    ///
    /// The managed identity token is requested for the
    /// `api://AzureADTokenExchange/.default` scope.
    ManagedIdentityClientAssertion {
        /// Identifier of a user-assigned managed identity used to obtain the assertion.
        ///
        /// When omitted, no user-assigned identity is selected.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(
            docs::examples = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/rg-vector/providers/Microsoft.ManagedIdentity/userAssignedIdentities/id-vector-uami"
        ))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,

        /// How `user_assigned_managed_identity_id` is interpreted.
        ///
        /// Defaults to `client_id`. Ignored when no identifier is set.
        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,

        /// The tenant ID passed to the client assertion credential.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_tenant_id: String,

        /// The client ID passed to the client assertion credential.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_client_id: String,
    },

    /// Authenticate with a workload identity credential.
    ///
    /// Every field is optional and is forwarded unchanged to the credential options.
    // NOTE(review): omitted values presumably fall back to the `AZURE_*` environment
    // variables shown in the examples — confirm against `azure_identity`.
    WorkloadIdentity {
        /// The Azure tenant ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))]
        tenant_id: Option<String>,

        /// The client ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))]
        client_id: Option<String>,

        /// Path to the file containing the federated identity token.
        #[configurable(metadata(
            docs::examples = "/var/run/secrets/azure/tokens/azure-identity-token"
        ))]
        #[configurable(metadata(docs::examples = "${AZURE_FEDERATED_TOKEN_FILE}"))]
        token_file_path: Option<PathBuf>,
    },
}
220
/// Client assertion source backed by another token credential.
///
/// The assertion is the token obtained from `credential` for `scope`.
#[derive(Debug)]
struct ManagedIdentityClientAssertion {
    /// Credential used to request the assertion token.
    credential: Arc<dyn TokenCredential>,
    /// Scope requested from `credential` when producing the assertion.
    scope: String,
}
226
227#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
228#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
229impl ClientAssertion for ManagedIdentityClientAssertion {
230 async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
231 Ok(self
232 .credential
233 .get_token(
234 &[&self.scope],
235 Some(TokenRequestOptions {
236 method_options: options.unwrap_or_default(),
237 }),
238 )
239 .await?
240 .token
241 .secret()
242 .to_string())
243 }
244}
245
246impl AzureAuthentication {
247 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
249 match self {
250 Self::Specific(specific) => specific.credential().await,
251
252 #[cfg(test)]
253 Self::MockCredential => Ok(Arc::new(MockTokenCredential) as Arc<dyn TokenCredential>),
254 }
255 }
256}
257
258impl SpecificAzureCredential {
259 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
261 let credential: Arc<dyn TokenCredential> = match self {
262 #[cfg(not(target_arch = "wasm32"))]
263 Self::AzureCli {} => AzureCliCredential::new(None)?,
264
265 Self::ClientCertificateCredential {
267 azure_tenant_id,
268 azure_client_id,
269 certificate_path,
270 certificate_password,
271 } => {
272 let certificate_bytes: Vec<u8> = std::fs::read(certificate_path).map_err(|e| {
273 Error::with_message(
274 ErrorKind::Credential,
275 format!(
276 "Failed to read certificate file {}: {e}",
277 certificate_path.display()
278 ),
279 )
280 })?;
281
282 let mut options: ClientCertificateCredentialOptions =
283 ClientCertificateCredentialOptions::default();
284 if let Some(password) = certificate_password {
285 options.password = Some(password.inner().to_string().into());
286 }
287
288 ClientCertificateCredential::new(
289 azure_tenant_id.clone(),
290 azure_client_id.clone(),
291 certificate_bytes.into(),
292 Some(options),
293 )?
294 }
295
296 Self::ClientSecretCredential {
297 azure_tenant_id,
298 azure_client_id,
299 azure_client_secret,
300 } => {
301 if azure_tenant_id.is_empty() {
302 return Err(Error::with_message(ErrorKind::Credential,
303 "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
304 ));
305 }
306 if azure_client_id.is_empty() {
307 return Err(Error::with_message(ErrorKind::Credential,
308 "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
309 ));
310 }
311 if azure_client_secret.inner().is_empty() {
312 return Err(Error::with_message(ErrorKind::Credential,
313 "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
314 ));
315 }
316
317 let secret: String = azure_client_secret.inner().into();
318 ClientSecretCredential::new(
319 &azure_tenant_id.clone(),
320 azure_client_id.clone(),
321 secret.into(),
322 None,
323 )?
324 }
325
326 Self::ManagedIdentity {
327 user_assigned_managed_identity_id,
328 user_assigned_managed_identity_id_type,
329 } => {
330 let mut options = ManagedIdentityCredentialOptions::default();
331 if let Some(id) = user_assigned_managed_identity_id {
332 options.user_assigned_id = match user_assigned_managed_identity_id_type
333 .as_ref()
334 .unwrap_or(&Default::default())
335 {
336 UserAssignedManagedIdentityIdType::ClientId => {
337 Some(UserAssignedId::ClientId(id.clone()))
338 }
339 UserAssignedManagedIdentityIdType::ObjectId => {
340 Some(UserAssignedId::ObjectId(id.clone()))
341 }
342 UserAssignedManagedIdentityIdType::ResourceId => {
343 Some(UserAssignedId::ResourceId(id.clone()))
344 }
345 };
346 }
347 ManagedIdentityCredential::new(Some(options))?
348 }
349
350 Self::ManagedIdentityClientAssertion {
351 user_assigned_managed_identity_id,
352 user_assigned_managed_identity_id_type,
353 client_assertion_tenant_id,
354 client_assertion_client_id,
355 } => {
356 let mut options = ManagedIdentityCredentialOptions::default();
357 if let Some(id) = user_assigned_managed_identity_id {
358 options.user_assigned_id = match user_assigned_managed_identity_id_type
359 .as_ref()
360 .unwrap_or(&Default::default())
361 {
362 UserAssignedManagedIdentityIdType::ClientId => {
363 Some(UserAssignedId::ClientId(id.clone()))
364 }
365 UserAssignedManagedIdentityIdType::ObjectId => {
366 Some(UserAssignedId::ObjectId(id.clone()))
367 }
368 UserAssignedManagedIdentityIdType::ResourceId => {
369 Some(UserAssignedId::ResourceId(id.clone()))
370 }
371 };
372 }
373 let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
374 let assertion = ManagedIdentityClientAssertion {
375 credential: msi,
376 scope: "api://AzureADTokenExchange/.default".to_string(),
378 };
379
380 ClientAssertionCredential::new(
381 client_assertion_tenant_id.clone(),
382 client_assertion_client_id.clone(),
383 assertion,
384 None,
385 )?
386 }
387
388 Self::WorkloadIdentity {
389 tenant_id,
390 client_id,
391 token_file_path,
392 } => {
393 let options = WorkloadIdentityCredentialOptions {
394 tenant_id: tenant_id.clone(),
395 client_id: client_id.clone(),
396 token_file_path: token_file_path.clone(),
397 ..Default::default()
398 };
399
400 WorkloadIdentityCredential::new(Some(options))?
401 }
402 };
403 Ok(credential)
404 }
405}
406
/// A single request to the Azure Blob sink, together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct AzureBlobRequest {
    /// The bytes of the blob body.
    pub blob_data: Bytes,
    /// Content encoding label for the blob, if any.
    pub content_encoding: Option<&'static str>,
    /// Content type label for the blob.
    pub content_type: &'static str,
    /// Sink-specific metadata, including the event finalizers.
    pub metadata: AzureBlobMetadata,
    /// Request metadata exposed through `MetaDescriptive`.
    pub request_metadata: RequestMetadata,
}
415
416impl Finalizable for AzureBlobRequest {
417 fn take_finalizers(&mut self) -> EventFinalizers {
418 std::mem::take(&mut self.metadata.finalizers)
419 }
420}
421
422impl MetaDescriptive for AzureBlobRequest {
423 fn get_metadata(&self) -> &RequestMetadata {
424 &self.request_metadata
425 }
426
427 fn metadata_mut(&mut self) -> &mut RequestMetadata {
428 &mut self.request_metadata
429 }
430}
431
/// Metadata carried alongside an [`AzureBlobRequest`].
#[derive(Clone, Debug)]
pub struct AzureBlobMetadata {
    /// Partition key associated with the request.
    // NOTE(review): presumably used to derive the blob name — confirm against the request builder.
    pub partition_key: String,
    /// A count associated with the request (presumably the number of events — TODO confirm).
    pub count: usize,
    /// A JSON size associated with the request (presumably of the batched events — TODO confirm).
    pub byte_size: JsonSize,
    /// Finalizers handed over by `Finalizable::take_finalizers`.
    pub finalizers: EventFinalizers,
}
439
/// Retry policy for Azure Blob requests; see its `RetryLogic` implementation.
#[derive(Debug, Clone)]
pub struct AzureBlobRetryLogic;
442
443impl RetryLogic for AzureBlobRetryLogic {
444 type Error = AzureCoreError;
445 type Request = AzureBlobRequest;
446 type Response = AzureBlobResponse;
447
448 fn is_retriable_error(&self, error: &Self::Error) -> bool {
449 match error.http_status() {
450 Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
451 None => false,
452 }
453 }
454}
455
/// Response value for an Azure Blob request.
#[derive(Debug)]
pub struct AzureBlobResponse {
    /// Grouped event count and byte size, reported through `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Number of bytes, reported through `DriverResponse::bytes_sent`.
    pub byte_size: usize,
}
461
462impl DriverResponse for AzureBlobResponse {
463 fn event_status(&self) -> EventStatus {
464 EventStatus::Delivered
465 }
466
467 fn events_sent(&self) -> &GroupedCountByteSize {
468 &self.events_byte_size
469 }
470
471 fn bytes_sent(&self) -> Option<usize> {
472 Some(self.byte_size)
473 }
474}
475
/// Errors reported by the healthcheck built in `build_healthcheck`.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// Returned when the service answers with `Forbidden`.
    // NOTE(review): the message only mentions the connection string, but this variant is
    // also produced for `Forbidden` responses when token-based authentication is in use.
    #[snafu(display("Invalid connection string specified"))]
    InvalidCredentials,
    /// Returned when the service answers with `NotFound` for the container.
    #[snafu(display("Container: {:?} not found", container))]
    UnknownContainer { container: String },
    /// Returned for any other HTTP status.
    #[snafu(display("Unknown status code: {}", status))]
    Unknown { status: StatusCode },
}
485
486pub fn build_healthcheck(
487 container_name: String,
488 client: Arc<BlobContainerClient>,
489) -> crate::Result<Healthcheck> {
490 let healthcheck = async move {
491 let resp: crate::Result<()> = match client.get_properties(None).await {
492 Ok(_) => Ok(()),
493 Err(error) => {
494 let code = error.http_status();
495 Err(match code {
496 Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
497 Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
498 container: container_name,
499 }),
500 Some(status) => Box::new(HealthcheckError::Unknown { status }),
501 None => "unknown status code".into(),
502 })
503 }
504 };
505 resp
506 };
507
508 Ok(healthcheck.boxed())
509}
510
511pub async fn build_client(
512 auth: Option<AzureAuthentication>,
513 connection_string: String,
514 container_name: String,
515 proxy: &crate::config::ProxyConfig,
516 tls: Option<AzureBlobTlsConfig>,
517) -> crate::Result<Arc<BlobContainerClient>> {
518 let parsed = ParsedConnectionString::parse(&connection_string)
520 .map_err(|e| format!("Invalid connection string: {e}"))?;
521 let container_url = parsed
523 .container_url(&container_name)
524 .map_err(|e| format!("Failed to build container URL: {e}"))?;
525 let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
526
527 let mut credential: Option<Arc<dyn TokenCredential>> = None;
528
529 let mut options = BlobContainerClientOptions::default();
531 match (parsed.auth(), &auth) {
532 (Auth::None, None) => {
533 warn!("No authentication method provided, requests will be anonymous.");
534 }
535 (Auth::Sas { .. }, None) => {
536 info!("Using SAS token authentication.");
537 }
538 (
539 Auth::SharedKey {
540 account_name,
541 account_key,
542 },
543 None,
544 ) => {
545 info!("Using Shared Key authentication.");
546
547 let policy = SharedKeyAuthorizationPolicy::new(
548 account_name,
549 account_key,
550 String::from("2025-11-05"),
552 )
553 .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
554 options
555 .client_options
556 .per_call_policies
557 .push(Arc::new(policy));
558 }
559 (Auth::None, Some(AzureAuthentication::Specific(..))) => {
560 info!("Using Azure Authentication method.");
561 let credential_result: Arc<dyn TokenCredential> =
562 auth.unwrap().credential().await.map_err(|e| {
563 Error::with_message(
564 ErrorKind::Credential,
565 format!("Failed to configure Azure Authentication: {e}"),
566 )
567 })?;
568 credential = Some(credential_result);
569 }
570 (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
571 return Err(Box::new(Error::with_message(
572 ErrorKind::Credential,
573 "Cannot use both SAS token and another Azure Authentication method at the same time",
574 )));
575 }
576 (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
577 return Err(Box::new(Error::with_message(
578 ErrorKind::Credential,
579 "Cannot use both Shared Key and another Azure Authentication method at the same time",
580 )));
581 }
582 #[cfg(test)]
583 (Auth::None, Some(AzureAuthentication::MockCredential)) => {
584 warn!("Using mock token credential authentication.");
585 credential = Some(auth.unwrap().credential().await.unwrap());
586 }
587 #[cfg(test)]
588 (_, Some(AzureAuthentication::MockCredential)) => {
589 return Err(Box::new(Error::with_message(
590 ErrorKind::Credential,
591 "Cannot use both connection string auth and mock credential at the same time",
592 )));
593 }
594 }
595
596 let mut reqwest_builder = reqwest_13::ClientBuilder::new();
598 let bypass_proxy = {
599 let host = url.host_str().unwrap_or("");
600 let port = url.port();
601 proxy.no_proxy.matches(host)
602 || port
603 .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
604 .unwrap_or(false)
605 };
606 if bypass_proxy || !proxy.enabled {
607 reqwest_builder = reqwest_builder.no_proxy();
609 } else {
610 if let Some(http) = &proxy.http {
611 let p = reqwest_13::Proxy::http(http)
612 .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
613 reqwest_builder = reqwest_builder.proxy(p);
615 }
616 if let Some(https) = &proxy.https {
617 let p = reqwest_13::Proxy::https(https)
618 .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
619 reqwest_builder = reqwest_builder.proxy(p);
621 }
622 }
623
624 if let Some(AzureBlobTlsConfig { ca_file }) = &tls
625 && let Some(ca_file) = ca_file
626 {
627 let mut buf = Vec::new();
628 File::open(ca_file)?.read_to_end(&mut buf)?;
629 let cert = reqwest_13::Certificate::from_pem(&buf)?;
630
631 warn!("Adding TLS root certificate from {}", ca_file.display());
632 reqwest_builder = reqwest_builder.add_root_certificate(cert);
633 }
634
635 options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
636 reqwest_builder
637 .build()
638 .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
639 )));
640 let client = BlobContainerClient::from_url(url, credential, Some(options))
641 .map_err(|e| format!("{e}"))?;
642 Ok(Arc::new(client))
643}
644
/// Test-only token credential that fabricates an unsigned JWT-shaped token for the
/// first requested scope.
#[cfg(test)]
#[derive(Debug)]
struct MockTokenCredential;
648
#[cfg(test)]
#[async_trait::async_trait]
impl TokenCredential for MockTokenCredential {
    /// Returns a fake access token whose audience is derived from the first scope.
    ///
    /// Fails with a credential error when `scopes` is empty. The token expires one hour
    /// after the call.
    async fn get_token(
        &self,
        scopes: &[&str],
        _options: Option<azure_core::credentials::TokenRequestOptions<'_>>,
    ) -> azure_core::Result<azure_core::credentials::AccessToken> {
        let scope = match scopes.first() {
            Some(first) => *first,
            None => {
                return Err(Error::with_message(
                    ErrorKind::Credential,
                    "no scopes were provided",
                ));
            }
        };

        // The audience is the scope without its trailing `/.default`, when present.
        let audience = scope.strip_suffix("/.default").unwrap_or(scope);
        let claims = serde_json::json!({
            "aud": audience,
            "exp": 2147483647,
            "iat": 0,
            "iss": "https://sts.windows.net/",
            "nbf": 0,
        });
        let claims_json = serde_json::to_string(&claims).unwrap();

        // `e30` is the base64 encoding of `{}` (an empty header); the signature part is
        // left empty and base64 padding is stripped from the payload.
        let encoded_claims = BASE64_STANDARD.encode(&claims_json);
        let token = format!("e30.{}.", encoded_claims.trim_end_matches('='));

        warn!(
            "Using mock token credential, JWT: {}, base64: {}",
            claims_json, token
        );

        let expires_on =
            azure_core::time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600);
        Ok(azure_core::credentials::AccessToken::new(token, expires_on))
    }
}
696
/// The mock credential must yield the exact token expected for a `/.default` scope.
#[cfg(test)]
#[tokio::test]
async fn azure_mock_token_credential_test() {
    let expected_token = "e30.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiZXhwIjoyMTQ3NDgzNjQ3LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0LyIsIm5iZiI6MH0.";

    let issued = MockTokenCredential
        .get_token(&["https://example.com/.default"], None)
        .await
        .expect("valid credential should return a token");

    assert_eq!(issued.token.secret(), expected_token);
}