vector/sinks/
opendal_common.rs1use std::{fmt, task::Poll};
12
13use bytes::Bytes;
14use opendal::Operator;
15use snafu::Snafu;
16use tracing::Instrument;
17use vector_lib::codecs::encoding::Framer;
18
19use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
20
21pub struct OpenDalSink<Svc> {
28 service: Svc,
29 request_builder: OpenDalRequestBuilder,
30 partitioner: KeyPartitioner,
31 batcher_settings: BatcherSettings,
32}
33
34impl<Svc> OpenDalSink<Svc> {
35 pub const fn new(
37 service: Svc,
38 request_builder: OpenDalRequestBuilder,
39 partitioner: KeyPartitioner,
40 batcher_settings: BatcherSettings,
41 ) -> Self {
42 Self {
43 service,
44 request_builder,
45 partitioner,
46 batcher_settings,
47 }
48 }
49}
50
51#[async_trait::async_trait]
52impl<Svc> StreamSink<Event> for OpenDalSink<Svc>
53where
54 Svc: Service<OpenDalRequest> + Send + 'static,
55 Svc::Future: Send + 'static,
56 Svc::Response: DriverResponse + Send + 'static,
57 Svc::Error: fmt::Debug + Into<crate::Error> + Send,
58{
59 async fn run(
60 self: Box<Self>,
61 input: futures_util::stream::BoxStream<'_, Event>,
62 ) -> Result<(), ()> {
63 self.run_inner(input).await
64 }
65}
66
67impl<Svc> OpenDalSink<Svc>
68where
69 Svc: Service<OpenDalRequest> + Send + 'static,
70 Svc::Future: Send + 'static,
71 Svc::Response: DriverResponse + Send + 'static,
72 Svc::Error: fmt::Debug + Into<crate::Error> + Send,
73{
74 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
75 let partitioner = self.partitioner;
76 let settings = self.batcher_settings;
77
78 let request_builder = self.request_builder;
79
80 input
81 .batched_partitioned(partitioner, settings.timeout, |_| {
82 settings.as_byte_size_config()
83 })
84 .filter_map(|(key, batch)| async move {
85 key.map(move |k| (k, batch))
89 })
90 .request_builder(default_request_builder_concurrency_limit(), request_builder)
91 .filter_map(|request| async move {
92 match request {
93 Err(error) => {
94 emit!(SinkRequestBuildError { error });
95 None
96 }
97 Ok(req) => Some(req),
98 }
99 })
100 .into_driver(self.service)
101 .protocol("file")
103 .run()
104 .await
105 }
106}
107
108#[derive(Debug, Clone)]
111pub struct OpenDalService {
112 op: Operator,
113}
114
115impl OpenDalService {
116 pub const fn new(op: Operator) -> OpenDalService {
117 OpenDalService { op }
118 }
119}
120
121#[derive(Clone)]
126pub struct OpenDalRequest {
127 pub payload: Bytes,
128 pub metadata: OpenDalMetadata,
129 pub request_metadata: RequestMetadata,
130}
131
132impl MetaDescriptive for OpenDalRequest {
133 fn get_metadata(&self) -> &RequestMetadata {
134 &self.request_metadata
135 }
136
137 fn metadata_mut(&mut self) -> &mut RequestMetadata {
138 &mut self.request_metadata
139 }
140}
141
142impl Finalizable for OpenDalRequest {
143 fn take_finalizers(&mut self) -> EventFinalizers {
144 std::mem::take(&mut self.metadata.finalizers)
145 }
146}
147
148#[derive(Clone)]
150pub struct OpenDalMetadata {
151 pub partition_key: String,
152 pub count: usize,
153 pub byte_size: JsonSize,
154 pub finalizers: EventFinalizers,
155}
156
157pub struct OpenDalRequestBuilder {
160 pub encoder: (Transformer, Encoder<Framer>),
161 pub compression: Compression,
162}
163
164impl RequestBuilder<(String, Vec<Event>)> for OpenDalRequestBuilder {
165 type Metadata = OpenDalMetadata;
166 type Events = Vec<Event>;
167 type Encoder = (Transformer, Encoder<Framer>);
168 type Payload = Bytes;
169 type Request = OpenDalRequest;
170 type Error = std::io::Error;
171
172 fn compression(&self) -> Compression {
173 self.compression
174 }
175
176 fn encoder(&self) -> &Self::Encoder {
177 &self.encoder
178 }
179
180 fn split_input(
181 &self,
182 input: (String, Vec<Event>),
183 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
184 let (partition_key, mut events) = input;
185 let finalizers = events.take_finalizers();
186 let opendal_metadata = OpenDalMetadata {
187 partition_key,
188 count: events.len(),
189 byte_size: events.estimated_json_encoded_size_of(),
190 finalizers,
191 };
192
193 let builder = RequestMetadataBuilder::from_events(&events);
194
195 (opendal_metadata, builder, events)
196 }
197
198 fn build_request(
199 &self,
200 mut metadata: Self::Metadata,
201 request_metadata: RequestMetadata,
202 payload: EncodeResult<Self::Payload>,
203 ) -> Self::Request {
204 let name = uuid::Uuid::new_v4().to_string();
206 let extension = self.compression.extension();
207
208 metadata.partition_key = format!("{}{}.{}", metadata.partition_key, name, extension);
209
210 OpenDalRequest {
211 metadata,
212 payload: payload.into_payload(),
213 request_metadata,
214 }
215 }
216}
217
218#[derive(Debug)]
220pub struct OpenDalResponse {
221 pub events_byte_size: GroupedCountByteSize,
222 pub byte_size: usize,
223}
224
225impl DriverResponse for OpenDalResponse {
226 fn event_status(&self) -> EventStatus {
227 EventStatus::Delivered
228 }
229
230 fn events_sent(&self) -> &GroupedCountByteSize {
231 &self.events_byte_size
232 }
233
234 fn bytes_sent(&self) -> Option<usize> {
235 Some(self.byte_size)
236 }
237}
238
239impl Service<OpenDalRequest> for OpenDalService {
240 type Response = OpenDalResponse;
241 type Error = opendal::Error;
242 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
243
244 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
246 Poll::Ready(Ok(()))
247 }
248
249 fn call(&mut self, request: OpenDalRequest) -> Self::Future {
251 let byte_size = request.payload.len();
252 let op = self.op.clone();
253
254 Box::pin(async move {
255 let result = op
256 .write(&request.metadata.partition_key, request.payload)
257 .in_current_span()
258 .await;
259 result.map(|_| OpenDalResponse {
260 events_byte_size: request
261 .request_metadata
262 .into_events_estimated_json_encoded_byte_size(),
263 byte_size,
264 })
265 })
266 }
267}
268
269#[derive(Debug, Snafu)]
275pub enum OpenDalError {
276 #[snafu(display("Failed to call OpenDal: {}", source))]
277 OpenDal { source: opendal::Error },
278}
279
280impl From<opendal::Error> for OpenDalError {
281 fn from(source: opendal::Error) -> Self {
282 Self::OpenDal { source }
283 }
284}