vector/sinks/azure_common/
service.rs1use std::{
2 result::Result as StdResult,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use azure_core::http::RequestContent;
8use azure_storage_blob::{BlobContainerClient, models::BlockBlobClientUploadOptions};
9use futures::future::BoxFuture;
10use tower::Service;
11use tracing::Instrument;
12
13use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse};
14
15#[derive(Clone)]
16pub struct AzureBlobService {
17 client: Arc<BlobContainerClient>,
19}
20
21impl AzureBlobService {
22 pub const fn new(client: Arc<BlobContainerClient>) -> AzureBlobService {
23 AzureBlobService { client }
24 }
25}
26
27impl Service<AzureBlobRequest> for AzureBlobService {
28 type Response = AzureBlobResponse;
29 type Error = Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>;
30 type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
31
32 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
34 Poll::Ready(Ok(()))
35 }
36
37 fn call(&mut self, request: AzureBlobRequest) -> Self::Future {
39 let this = self.clone();
40
41 Box::pin(async move {
42 let blob_client = this
43 .client
44 .blob_client(request.metadata.partition_key.as_str());
45 let byte_size = request.blob_data.len();
46 let upload_options = BlockBlobClientUploadOptions {
47 blob_content_type: Some(request.content_type.to_string()),
48 blob_content_encoding: request.content_encoding.map(|e| e.to_string()),
49 ..Default::default()
50 }
51 .with_if_not_exists();
52
53 let result = blob_client
54 .upload(
55 RequestContent::from(request.blob_data.to_vec()),
56 Some(upload_options),
57 )
58 .instrument(info_span!("request").or_current())
59 .await
60 .map_err(|err| err.into());
61
62 result.map(|_resp| AzureBlobResponse {
63 events_byte_size: request
64 .request_metadata
65 .into_events_estimated_json_encoded_byte_size(),
66 byte_size,
67 })
68 })
69 }
70}