vector/sinks/azure_common/
config.rs

1use std::fs::File;
2use std::io::Read;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6#[cfg(test)]
7use base64::prelude::*;
8
9use azure_core::error::Error as AzureCoreError;
10
11use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
12use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
13use azure_core::http::{ClientMethodOptions, StatusCode, Url};
14
15use azure_core::credentials::{TokenCredential, TokenRequestOptions};
16use azure_core::{Error, error::ErrorKind};
17
18use azure_identity::{
19    AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientCertificateCredential,
20    ClientCertificateCredentialOptions, ClientSecretCredential, ManagedIdentityCredential,
21    ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential,
22    WorkloadIdentityCredentialOptions,
23};
24
25use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
26
27use bytes::Bytes;
28use futures::FutureExt;
29use snafu::Snafu;
30use vector_lib::{
31    configurable::configurable_component,
32    json_size::JsonSize,
33    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
34    sensitive_string::SensitiveString,
35    stream::DriverResponse,
36};
37
38use crate::{
39    event::{EventFinalizers, EventStatus, Finalizable},
40    sinks::{Healthcheck, util::retries::RetryLogic},
41};
42
// Optional TLS settings for the blob client. `build_client` reads `ca_file`, parses it as PEM
// and registers it as an additional root certificate on the HTTP client it constructs.
// NOTE(review): the `///` comments below are presumably picked up by `configurable_component`
// for generated configuration docs — confirm before rewording them.
43/// TLS configuration.
44#[configurable_component]
45#[configurable(metadata(docs::advanced))]
46#[derive(Clone, Debug, Default)]
47#[serde(deny_unknown_fields)]
48pub struct AzureBlobTlsConfig {
49    /// Absolute path to an additional CA certificate file.
50    ///
51    /// The certificate must be in PEM (X.509) format.
    // `ca_path` is accepted as an alternative key name for this field when deserializing.
52    #[serde(alias = "ca_path")]
53    #[configurable(metadata(docs::examples = "/path/to/certificate_authority.crt"))]
54    #[configurable(metadata(docs::human_name = "CA File Path"))]
55    pub ca_file: Option<PathBuf>,
56}
57
// Wrapper around the concrete credential kinds. The enum is `untagged`, so it adds no key of
// its own: the `azure_credential_kind` tag that selects a credential belongs to
// `SpecificAzureCredential`.
58/// Azure service principal authentication.
59#[configurable_component]
60#[derive(Clone, Debug, Eq, PartialEq)]
61#[serde(deny_unknown_fields, untagged)]
62pub enum AzureAuthentication {
63    #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
64    Specific(SpecificAzureCredential),
65
    // Compiled only for tests and skipped by serde, so it cannot come from a configuration
    // file; `credential()` maps it to `MockTokenCredential`.
66    /// Mock credential for testing — returns a static fake token
67    #[cfg(test)]
68    #[serde(skip)]
69    MockCredential,
70}
71
72impl Default for AzureAuthentication {
73    // This should never be actually used.
74    // This is only needed when using Default::default() (such as unit tests),
75    // as serde requires `azure_credential_kind` to be specified.
76    fn default() -> Self {
77        Self::Specific(SpecificAzureCredential::ManagedIdentity {
78            user_assigned_managed_identity_id: None,
79            user_assigned_managed_identity_id_type: None,
80        })
81    }
82}
83
// Says how a configured user-assigned managed identity ID should be interpreted.
// `SpecificAzureCredential::credential` maps each variant onto the `UserAssignedId` variant of
// the same name; the `Default` (`ClientId`) is what is used when no type is configured.
// Serialized names are snake_case (`rename_all`).
84#[configurable_component]
85#[derive(Clone, Debug, Eq, PartialEq)]
86#[serde(deny_unknown_fields, rename_all = "snake_case")]
87#[derive(Default)]
88/// User Assigned Managed Identity Types.
89pub enum UserAssignedManagedIdentityIdType {
90    #[default]
91    /// Client ID
92    ClientId,
93    /// Object ID
94    ObjectId,
95    /// Resource ID
96    ResourceId,
97}
98
// Internally tagged enum: the `azure_credential_kind` key selects the variant, and variant names
// are written in snake_case. `SpecificAzureCredential::credential` turns each variant into the
// corresponding `azure_identity` credential.
// NOTE(review): the `///` comments and `docs::examples` metadata below are presumably consumed
// by `configurable_component` for generated docs — confirm before rewording them.
99/// Specific Azure credential types.
100#[configurable_component]
101#[derive(Clone, Debug, Eq, PartialEq)]
102#[serde(
103    tag = "azure_credential_kind",
104    rename_all = "snake_case",
105    deny_unknown_fields
106)]
107pub enum SpecificAzureCredential {
    // This variant does not exist when compiling for `wasm32`.
108    /// Use Azure CLI credentials
109    #[cfg(not(target_arch = "wasm32"))]
110    AzureCli {},
111
    // `credential()` reads the whole file at `certificate_path` and hands the bytes, plus the
    // optional password, to `ClientCertificateCredential`.
112    /// Use certificate credentials
113    ClientCertificateCredential {
114        /// The [Azure Tenant ID][azure_tenant_id].
115        ///
116        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
117        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
118        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
119        azure_tenant_id: String,
120
121        /// The [Azure Client ID][azure_client_id].
122        ///
123        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
124        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
125        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
126        azure_client_id: String,
127
128        /// PKCS12 certificate with RSA private key.
129        #[configurable(metadata(docs::examples = "path/to/certificate.pfx"))]
130        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PATH:?err}"))]
131        certificate_path: PathBuf,
132
133        /// The password for the client certificate, if applicable.
134        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PASSWORD}"))]
135        certificate_password: Option<SensitiveString>,
136    },
137
    // `credential()` rejects this variant with a credential error if any of the three values
    // is an empty string.
138    /// Use client ID/secret credentials
139    ClientSecretCredential {
140        /// The [Azure Tenant ID][azure_tenant_id].
141        ///
142        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
143        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
144        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
145        azure_tenant_id: String,
146
147        /// The [Azure Client ID][azure_client_id].
148        ///
149        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
150        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
151        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
152        azure_client_id: String,
153
154        /// The [Azure Client Secret][azure_client_secret].
155        ///
156        /// [azure_client_secret]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
157        #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
158        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET:?err}"))]
159        azure_client_secret: SensitiveString,
160    },
161
162    /// Use Managed Identity credentials
163    ManagedIdentity {
164        /// The User Assigned Managed Identity to use.
165        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
166        #[serde(default, skip_serializing_if = "Option::is_none")]
167        user_assigned_managed_identity_id: Option<String>,
168
        // Only consulted by `credential()` when `user_assigned_managed_identity_id` is set.
169        /// The type of the User Assigned Managed Identity ID provided (Client ID, Object ID,
170        /// or Resource ID). Defaults to Client ID.
171        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
172    },
173
    // `credential()` builds a managed identity credential from the first two fields and uses its
    // token (requested for scope `api://AzureADTokenExchange/.default`) as the client assertion
    // of a `ClientAssertionCredential` for the tenant/client given below.
174    /// Use Managed Identity with Client Assertion credentials
175    ManagedIdentityClientAssertion {
176        /// The User Assigned Managed Identity to use for the managed identity.
177        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
178        #[configurable(metadata(
179            docs::examples = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/rg-vector/providers/Microsoft.ManagedIdentity/userAssignedIdentities/id-vector-uami"
180        ))]
181        #[serde(default, skip_serializing_if = "Option::is_none")]
182        user_assigned_managed_identity_id: Option<String>,
183
        // Only consulted by `credential()` when `user_assigned_managed_identity_id` is set.
184        /// The type of the User Assigned Managed Identity ID provided (Client ID, Object ID, or Resource ID). Defaults to Client ID.
185        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
186
187        /// The target Tenant ID to use.
188        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
189        client_assertion_tenant_id: String,
190
191        /// The target Client ID to use.
192        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
193        client_assertion_client_id: String,
194    },
195
    // All three fields are optional and are copied as-is into
    // `WorkloadIdentityCredentialOptions` by `credential()`.
196    /// Use Workload Identity credentials
197    WorkloadIdentity {
198        /// The [Azure Tenant ID][azure_tenant_id]. Defaults to the value of the environment variable `AZURE_TENANT_ID`.
199        ///
200        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
201        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
202        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))]
203        tenant_id: Option<String>,
204
205        /// The [Azure Client ID][azure_client_id]. Defaults to the value of the environment variable `AZURE_CLIENT_ID`.
206        ///
207        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
208        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
209        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))]
210        client_id: Option<String>,
211
212        /// Path of a file containing a Kubernetes service account token. Defaults to the value of the environment variable `AZURE_FEDERATED_TOKEN_FILE`.
213        #[configurable(metadata(
214            docs::examples = "/var/run/secrets/azure/tokens/azure-identity-token"
215        ))]
216        #[configurable(metadata(docs::examples = "${AZURE_FEDERATED_TOKEN_FILE}"))]
217        token_file_path: Option<PathBuf>,
218    },
219}
220
221#[derive(Debug)]
222struct ManagedIdentityClientAssertion {
223    credential: Arc<dyn TokenCredential>,
224    scope: String,
225}
226
227#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
228#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
229impl ClientAssertion for ManagedIdentityClientAssertion {
230    async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
231        Ok(self
232            .credential
233            .get_token(
234                &[&self.scope],
235                Some(TokenRequestOptions {
236                    method_options: options.unwrap_or_default(),
237                }),
238            )
239            .await?
240            .token
241            .secret()
242            .to_string())
243    }
244}
245
246impl AzureAuthentication {
247    /// Returns the provider for the credentials based on the authentication mechanism chosen.
248    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
249        match self {
250            Self::Specific(specific) => specific.credential().await,
251
252            #[cfg(test)]
253            Self::MockCredential => Ok(Arc::new(MockTokenCredential) as Arc<dyn TokenCredential>),
254        }
255    }
256}
257
258impl SpecificAzureCredential {
259    /// Returns the provider for the credentials based on the specific credential type.
260    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
261        let credential: Arc<dyn TokenCredential> = match self {
262            #[cfg(not(target_arch = "wasm32"))]
263            Self::AzureCli {} => AzureCliCredential::new(None)?,
264
265            // requires azure_identity feature 'client_certificate'
266            Self::ClientCertificateCredential {
267                azure_tenant_id,
268                azure_client_id,
269                certificate_path,
270                certificate_password,
271            } => {
272                let certificate_bytes: Vec<u8> = std::fs::read(certificate_path).map_err(|e| {
273                    Error::with_message(
274                        ErrorKind::Credential,
275                        format!(
276                            "Failed to read certificate file {}: {e}",
277                            certificate_path.display()
278                        ),
279                    )
280                })?;
281
282                let mut options: ClientCertificateCredentialOptions =
283                    ClientCertificateCredentialOptions::default();
284                if let Some(password) = certificate_password {
285                    options.password = Some(password.inner().to_string().into());
286                }
287
288                ClientCertificateCredential::new(
289                    azure_tenant_id.clone(),
290                    azure_client_id.clone(),
291                    certificate_bytes.into(),
292                    Some(options),
293                )?
294            }
295
296            Self::ClientSecretCredential {
297                azure_tenant_id,
298                azure_client_id,
299                azure_client_secret,
300            } => {
301                if azure_tenant_id.is_empty() {
302                    return Err(Error::with_message(ErrorKind::Credential,
303                        "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
304                    ));
305                }
306                if azure_client_id.is_empty() {
307                    return Err(Error::with_message(ErrorKind::Credential,
308                        "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
309                    ));
310                }
311                if azure_client_secret.inner().is_empty() {
312                    return Err(Error::with_message(ErrorKind::Credential,
313                        "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
314                    ));
315                }
316
317                let secret: String = azure_client_secret.inner().into();
318                ClientSecretCredential::new(
319                    &azure_tenant_id.clone(),
320                    azure_client_id.clone(),
321                    secret.into(),
322                    None,
323                )?
324            }
325
326            Self::ManagedIdentity {
327                user_assigned_managed_identity_id,
328                user_assigned_managed_identity_id_type,
329            } => {
330                let mut options = ManagedIdentityCredentialOptions::default();
331                if let Some(id) = user_assigned_managed_identity_id {
332                    options.user_assigned_id = match user_assigned_managed_identity_id_type
333                        .as_ref()
334                        .unwrap_or(&Default::default())
335                    {
336                        UserAssignedManagedIdentityIdType::ClientId => {
337                            Some(UserAssignedId::ClientId(id.clone()))
338                        }
339                        UserAssignedManagedIdentityIdType::ObjectId => {
340                            Some(UserAssignedId::ObjectId(id.clone()))
341                        }
342                        UserAssignedManagedIdentityIdType::ResourceId => {
343                            Some(UserAssignedId::ResourceId(id.clone()))
344                        }
345                    };
346                }
347                ManagedIdentityCredential::new(Some(options))?
348            }
349
350            Self::ManagedIdentityClientAssertion {
351                user_assigned_managed_identity_id,
352                user_assigned_managed_identity_id_type,
353                client_assertion_tenant_id,
354                client_assertion_client_id,
355            } => {
356                let mut options = ManagedIdentityCredentialOptions::default();
357                if let Some(id) = user_assigned_managed_identity_id {
358                    options.user_assigned_id = match user_assigned_managed_identity_id_type
359                        .as_ref()
360                        .unwrap_or(&Default::default())
361                    {
362                        UserAssignedManagedIdentityIdType::ClientId => {
363                            Some(UserAssignedId::ClientId(id.clone()))
364                        }
365                        UserAssignedManagedIdentityIdType::ObjectId => {
366                            Some(UserAssignedId::ObjectId(id.clone()))
367                        }
368                        UserAssignedManagedIdentityIdType::ResourceId => {
369                            Some(UserAssignedId::ResourceId(id.clone()))
370                        }
371                    };
372                }
373                let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
374                let assertion = ManagedIdentityClientAssertion {
375                    credential: msi,
376                    // Future: make this configurable for sovereign clouds? (no way to test...)
377                    scope: "api://AzureADTokenExchange/.default".to_string(),
378                };
379
380                ClientAssertionCredential::new(
381                    client_assertion_tenant_id.clone(),
382                    client_assertion_client_id.clone(),
383                    assertion,
384                    None,
385                )?
386            }
387
388            Self::WorkloadIdentity {
389                tenant_id,
390                client_id,
391                token_file_path,
392            } => {
393                let options = WorkloadIdentityCredentialOptions {
394                    tenant_id: tenant_id.clone(),
395                    client_id: client_id.clone(),
396                    token_file_path: token_file_path.clone(),
397                    ..Default::default()
398                };
399
400                WorkloadIdentityCredential::new(Some(options))?
401            }
402        };
403        Ok(credential)
404    }
405}
406
407#[derive(Debug, Clone)]
408pub struct AzureBlobRequest {
409    pub blob_data: Bytes,
410    pub content_encoding: Option<&'static str>,
411    pub content_type: &'static str,
412    pub metadata: AzureBlobMetadata,
413    pub request_metadata: RequestMetadata,
414}
415
416impl Finalizable for AzureBlobRequest {
417    fn take_finalizers(&mut self) -> EventFinalizers {
418        std::mem::take(&mut self.metadata.finalizers)
419    }
420}
421
422impl MetaDescriptive for AzureBlobRequest {
423    fn get_metadata(&self) -> &RequestMetadata {
424        &self.request_metadata
425    }
426
427    fn metadata_mut(&mut self) -> &mut RequestMetadata {
428        &mut self.request_metadata
429    }
430}
431
432#[derive(Clone, Debug)]
433pub struct AzureBlobMetadata {
434    pub partition_key: String,
435    pub count: usize,
436    pub byte_size: JsonSize,
437    pub finalizers: EventFinalizers,
438}
439
440#[derive(Debug, Clone)]
441pub struct AzureBlobRetryLogic;
442
443impl RetryLogic for AzureBlobRetryLogic {
444    type Error = AzureCoreError;
445    type Request = AzureBlobRequest;
446    type Response = AzureBlobResponse;
447
448    fn is_retriable_error(&self, error: &Self::Error) -> bool {
449        match error.http_status() {
450            Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
451            None => false,
452        }
453    }
454}
455
456#[derive(Debug)]
457pub struct AzureBlobResponse {
458    pub events_byte_size: GroupedCountByteSize,
459    pub byte_size: usize,
460}
461
462impl DriverResponse for AzureBlobResponse {
463    fn event_status(&self) -> EventStatus {
464        EventStatus::Delivered
465    }
466
467    fn events_sent(&self) -> &GroupedCountByteSize {
468        &self.events_byte_size
469    }
470
471    fn bytes_sent(&self) -> Option<usize> {
472        Some(self.byte_size)
473    }
474}
475
476#[derive(Debug, Snafu)]
477pub enum HealthcheckError {
478    #[snafu(display("Invalid connection string specified"))]
479    InvalidCredentials,
480    #[snafu(display("Container: {:?} not found", container))]
481    UnknownContainer { container: String },
482    #[snafu(display("Unknown status code: {}", status))]
483    Unknown { status: StatusCode },
484}
485
486pub fn build_healthcheck(
487    container_name: String,
488    client: Arc<BlobContainerClient>,
489) -> crate::Result<Healthcheck> {
490    let healthcheck = async move {
491        let resp: crate::Result<()> = match client.get_properties(None).await {
492            Ok(_) => Ok(()),
493            Err(error) => {
494                let code = error.http_status();
495                Err(match code {
496                    Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
497                    Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
498                        container: container_name,
499                    }),
500                    Some(status) => Box::new(HealthcheckError::Unknown { status }),
501                    None => "unknown status code".into(),
502                })
503            }
504        };
505        resp
506    };
507
508    Ok(healthcheck.boxed())
509}
510
511pub async fn build_client(
512    auth: Option<AzureAuthentication>,
513    connection_string: String,
514    container_name: String,
515    proxy: &crate::config::ProxyConfig,
516    tls: Option<AzureBlobTlsConfig>,
517) -> crate::Result<Arc<BlobContainerClient>> {
518    // Parse connection string without legacy SDK
519    let parsed = ParsedConnectionString::parse(&connection_string)
520        .map_err(|e| format!("Invalid connection string: {e}"))?;
521    // Compose container URL (SAS appended if present)
522    let container_url = parsed
523        .container_url(&container_name)
524        .map_err(|e| format!("Failed to build container URL: {e}"))?;
525    let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
526
527    let mut credential: Option<Arc<dyn TokenCredential>> = None;
528
529    // Prepare options; attach Shared Key policy if needed
530    let mut options = BlobContainerClientOptions::default();
531    match (parsed.auth(), &auth) {
532        (Auth::None, None) => {
533            warn!("No authentication method provided, requests will be anonymous.");
534        }
535        (Auth::Sas { .. }, None) => {
536            info!("Using SAS token authentication.");
537        }
538        (
539            Auth::SharedKey {
540                account_name,
541                account_key,
542            },
543            None,
544        ) => {
545            info!("Using Shared Key authentication.");
546
547            let policy = SharedKeyAuthorizationPolicy::new(
548                account_name,
549                account_key,
550                // Use an Azurite-supported storage service version
551                String::from("2025-11-05"),
552            )
553            .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
554            options
555                .client_options
556                .per_call_policies
557                .push(Arc::new(policy));
558        }
559        (Auth::None, Some(AzureAuthentication::Specific(..))) => {
560            info!("Using Azure Authentication method.");
561            let credential_result: Arc<dyn TokenCredential> =
562                auth.unwrap().credential().await.map_err(|e| {
563                    Error::with_message(
564                        ErrorKind::Credential,
565                        format!("Failed to configure Azure Authentication: {e}"),
566                    )
567                })?;
568            credential = Some(credential_result);
569        }
570        (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
571            return Err(Box::new(Error::with_message(
572                ErrorKind::Credential,
573                "Cannot use both SAS token and another Azure Authentication method at the same time",
574            )));
575        }
576        (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
577            return Err(Box::new(Error::with_message(
578                ErrorKind::Credential,
579                "Cannot use both Shared Key and another Azure Authentication method at the same time",
580            )));
581        }
582        #[cfg(test)]
583        (Auth::None, Some(AzureAuthentication::MockCredential)) => {
584            warn!("Using mock token credential authentication.");
585            credential = Some(auth.unwrap().credential().await.unwrap());
586        }
587        #[cfg(test)]
588        (_, Some(AzureAuthentication::MockCredential)) => {
589            return Err(Box::new(Error::with_message(
590                ErrorKind::Credential,
591                "Cannot use both connection string auth and mock credential at the same time",
592            )));
593        }
594    }
595
596    // Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13
597    let mut reqwest_builder = reqwest_13::ClientBuilder::new();
598    let bypass_proxy = {
599        let host = url.host_str().unwrap_or("");
600        let port = url.port();
601        proxy.no_proxy.matches(host)
602            || port
603                .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
604                .unwrap_or(false)
605    };
606    if bypass_proxy || !proxy.enabled {
607        // Ensure no proxy (and disable any potential system proxy auto-detection)
608        reqwest_builder = reqwest_builder.no_proxy();
609    } else {
610        if let Some(http) = &proxy.http {
611            let p = reqwest_13::Proxy::http(http)
612                .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
613            // If credentials are embedded in the proxy URL, reqwest will handle them.
614            reqwest_builder = reqwest_builder.proxy(p);
615        }
616        if let Some(https) = &proxy.https {
617            let p = reqwest_13::Proxy::https(https)
618                .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
619            // If credentials are embedded in the proxy URL, reqwest will handle them.
620            reqwest_builder = reqwest_builder.proxy(p);
621        }
622    }
623
624    if let Some(AzureBlobTlsConfig { ca_file }) = &tls
625        && let Some(ca_file) = ca_file
626    {
627        let mut buf = Vec::new();
628        File::open(ca_file)?.read_to_end(&mut buf)?;
629        let cert = reqwest_13::Certificate::from_pem(&buf)?;
630
631        warn!("Adding TLS root certificate from {}", ca_file.display());
632        reqwest_builder = reqwest_builder.add_root_certificate(cert);
633    }
634
635    options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
636        reqwest_builder
637            .build()
638            .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
639    )));
640    let client = BlobContainerClient::from_url(url, credential, Some(options))
641        .map_err(|e| format!("{e}"))?;
642    Ok(Arc::new(client))
643}
644
/// Test-only credential that hands out a static, unsigned fake token.
#[cfg(test)]
#[derive(Debug)]
struct MockTokenCredential;

#[cfg(test)]
#[async_trait::async_trait]
impl TokenCredential for MockTokenCredential {
    /// Returns a fake JWT-shaped token for the first entry of `scopes`.
    ///
    /// The token has the form `e30.<claims>.`: `e30` is the base64 encoding of an empty JSON
    /// object (`{}`), `<claims>` is the unpadded base64url encoding of the claims, and the
    /// signature segment is empty. The `aud` claim is the scope with a trailing `/.default`
    /// removed. The returned token is marked as expiring one hour from now.
    ///
    /// # Errors
    ///
    /// Returns a credential error when `scopes` is empty.
    async fn get_token(
        &self,
        scopes: &[&str],
        _options: Option<azure_core::credentials::TokenRequestOptions<'_>>,
    ) -> azure_core::Result<azure_core::credentials::AccessToken> {
        let Some(scope) = scopes.first() else {
            return Err(Error::with_message(
                ErrorKind::Credential,
                "no scopes were provided",
            ));
        };

        // serde_json sometimes does and sometimes doesn't preserve order, be careful to sort
        // the claims in alphabetical order to ensure a consistent base64 encoding for testing
        let jwt = serde_json::json!({
            "aud": scope.strip_suffix("/.default").unwrap_or(*scope),
            "exp": 2147483647,
            "iat": 0,
            "iss": "https://sts.windows.net/",
            "nbf": 0,
        });
        // Serialize once; the same text is both encoded and logged.
        let claims = serde_json::to_string(&jwt).unwrap();

        // JWT segments are base64url without padding, which the already-imported base64
        // prelude provides directly — no manual trimming of `=` needed.
        let jwt_base64 = format!("e30.{}.", BASE64_URL_SAFE_NO_PAD.encode(&claims));

        warn!(
            "Using mock token credential, JWT: {}, base64: {}",
            claims, jwt_base64
        );

        Ok(azure_core::credentials::AccessToken::new(
            jwt_base64,
            azure_core::time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600),
        ))
    }
}
696
/// Pins the exact token the mock credential produces for a fixed scope.
#[cfg(test)]
#[tokio::test]
async fn azure_mock_token_credential_test() {
    const EXPECTED_TOKEN: &str = "e30.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiZXhwIjoyMTQ3NDgzNjQ3LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0LyIsIm5iZiI6MH0.";

    let mock = MockTokenCredential;
    let issued = mock
        .get_token(&["https://example.com/.default"], None)
        .await
        .expect("valid credential should return a token");

    assert_eq!(issued.token.secret(), EXPECTED_TOKEN);
}
709}