vector/sinks/databricks_zerobus/
config.rs

1//! Configuration for the Zerobus sink.
2
3use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8    prelude::*,
9    util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use vector_lib::codecs::encoding::{
13    BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig,
14};
15
16use super::{
17    error::ZerobusSinkError,
18    service::{StreamMode, ZerobusService},
19    sink::ZerobusSink,
20};
21
22/// Authentication configuration for Databricks.
23#[configurable_component]
24#[derive(Clone, Debug)]
25#[serde(tag = "strategy", rename_all = "snake_case")]
26#[configurable(metadata(
27    docs::enum_tag_description = "The authentication strategy to use for Databricks."
28))]
29pub enum DatabricksAuthentication {
30    /// Authenticate using OAuth 2.0 client credentials.
31    #[serde(rename = "oauth")]
32    OAuth {
33        /// OAuth 2.0 client ID.
34        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
35        #[configurable(metadata(docs::examples = "abc123..."))]
36        client_id: SensitiveString,
37
38        /// OAuth 2.0 client secret.
39        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
40        #[configurable(metadata(docs::examples = "secret123..."))]
41        client_secret: SensitiveString,
42    },
43}
44
45impl DatabricksAuthentication {
46    /// Extract the client ID and client secret as string references.
47    pub fn credentials(&self) -> (&str, &str) {
48        match self {
49            DatabricksAuthentication::OAuth {
50                client_id,
51                client_secret,
52            } => (client_id.inner(), client_secret.inner()),
53        }
54    }
55}
56
57/// Zerobus stream configuration options.
58///
59/// This is a thin wrapper around the SDK's `StreamConfigurationOptions` with Vector-specific
60/// configuration attributes and custom defaults suitable for Vector's use case.
61#[configurable_component]
62#[derive(Clone, Debug)]
63#[serde(deny_unknown_fields)]
64pub struct ZerobusStreamOptions {
65    /// Timeout in milliseconds for flush operations.
66    #[serde(default = "default_flush_timeout_ms")]
67    #[configurable(metadata(docs::examples = 30000))]
68    pub flush_timeout_ms: u64,
69
70    /// Timeout in milliseconds for server acknowledgements.
71    #[serde(default = "default_server_ack_timeout_ms")]
72    #[configurable(metadata(docs::examples = 60000))]
73    pub server_lack_of_ack_timeout_ms: u64,
74}
75
76impl Default for ZerobusStreamOptions {
77    fn default() -> Self {
78        Self {
79            flush_timeout_ms: default_flush_timeout_ms(),
80            server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
81        }
82    }
83}
84
85/// Configuration for the Databricks Zerobus sink.
86#[configurable_component(sink(
87    "databricks_zerobus",
88    "Stream observability data to Databricks Unity Catalog via Zerobus."
89))]
90#[derive(Clone, Debug)]
91#[serde(deny_unknown_fields)]
92pub struct ZerobusSinkConfig {
93    /// The Zerobus ingestion endpoint URL.
94    ///
95    /// This should be the full URL to the Zerobus ingestion service.
96    #[configurable(metadata(docs::examples = "https://ingest.dev.databricks.com"))]
97    #[configurable(metadata(docs::examples = "https://ingest.prod.databricks.com"))]
98    pub ingestion_endpoint: String,
99
100    /// The Unity Catalog table name to write to.
101    ///
102    /// This should be in the format `catalog.schema.table`.
103    #[configurable(metadata(docs::examples = "logging_platform.my_team.logs"))]
104    #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
105    pub table_name: String,
106
107    /// The Unity Catalog endpoint URL.
108    ///
109    /// This is used for authentication and table metadata.
110    #[configurable(metadata(
111        docs::examples = "https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com"
112    ))]
113    #[configurable(metadata(docs::examples = "https://your-workspace.cloud.databricks.com"))]
114    pub unity_catalog_endpoint: String,
115
116    /// Databricks authentication configuration.
117    #[configurable(derived)]
118    pub auth: DatabricksAuthentication,
119
120    #[configurable(derived)]
121    #[serde(default)]
122    pub stream_options: ZerobusStreamOptions,
123
124    #[configurable(derived)]
125    #[serde(default)]
126    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
127
128    #[configurable(derived)]
129    #[serde(default)]
130    pub request: TowerRequestConfig,
131
132    #[configurable(derived)]
133    #[serde(
134        default,
135        deserialize_with = "crate::serde::bool_or_struct",
136        skip_serializing_if = "crate::serde::is_default"
137    )]
138    pub acknowledgements: AcknowledgementsConfig,
139}
140
141impl GenerateConfig for ZerobusSinkConfig {
142    fn generate_config() -> toml::Value {
143        toml::Value::try_from(Self {
144            ingestion_endpoint: "https://ingest.dev.databricks.com".to_string(),
145            table_name: "catalog.schema.table".to_string(),
146            unity_catalog_endpoint: "https://your-workspace.cloud.databricks.com".to_string(),
147            auth: DatabricksAuthentication::OAuth {
148                client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
149                client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
150            },
151            stream_options: ZerobusStreamOptions::default(),
152            batch: BatchConfig::default(),
153            request: TowerRequestConfig::default(),
154            acknowledgements: AcknowledgementsConfig::default(),
155        })
156        .unwrap()
157    }
158}
159
160#[async_trait::async_trait]
161#[typetag::serde(name = "databricks_zerobus")]
162impl SinkConfig for ZerobusSinkConfig {
163    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
164        self.validate()?;
165
166        let descriptor = ZerobusService::resolve_descriptor(self, cx.proxy()).await?;
167
168        // The zerobus sink always encodes in proto_batch form — the stream
169        // descriptor is the one we just resolved from Unity Catalog.
170        let descriptor_proto = std::sync::Arc::new(descriptor.descriptor_proto().clone());
171        let stream_mode = StreamMode::Proto { descriptor_proto };
172
173        let proto_config = ProtoBatchSerializerConfig {
174            descriptor: Some(descriptor),
175        };
176        let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config)
177            .build_batch_serializer()
178            .map_err(|e| format!("Failed to build batch serializer: {}", e))?;
179        let encoder = BatchEncoder::new(batch_serializer);
180
181        let service = ZerobusService::new(self.clone(), stream_mode, cx.proxy()).await?;
182        let healthcheck_service = service.clone();
183
184        let request_limits = self.request.into_settings();
185
186        let sink = ZerobusSink::new(service, request_limits, self.batch, encoder)?;
187
188        let healthcheck = async move {
189            healthcheck_service
190                .ensure_stream()
191                .await
192                .map_err(|e| e.into())
193        };
194
195        Ok((
196            VectorSink::from_event_streamsink(sink),
197            Box::pin(healthcheck),
198        ))
199    }
200
201    fn input(&self) -> Input {
202        Input::log()
203    }
204
205    fn acknowledgements(&self) -> &AcknowledgementsConfig {
206        &self.acknowledgements
207    }
208}
209
210impl ZerobusSinkConfig {
211    pub fn validate(&self) -> Result<(), ZerobusSinkError> {
212        if self.ingestion_endpoint.is_empty() {
213            return Err(ZerobusSinkError::ConfigError {
214                message: "ingestion_endpoint cannot be empty".to_string(),
215            });
216        }
217
218        if self.table_name.is_empty() {
219            return Err(ZerobusSinkError::ConfigError {
220                message: "table_name cannot be empty".to_string(),
221            });
222        }
223
224        let parts: Vec<&str> = self.table_name.split('.').collect();
225        if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
226            return Err(ZerobusSinkError::ConfigError {
227                message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
228                    .to_string(),
229            });
230        }
231
232        if self.unity_catalog_endpoint.is_empty() {
233            return Err(ZerobusSinkError::ConfigError {
234                message: "unity_catalog_endpoint cannot be empty".to_string(),
235            });
236        }
237
238        // Validate authentication credentials
239        match &self.auth {
240            DatabricksAuthentication::OAuth {
241                client_id,
242                client_secret,
243            } => {
244                if client_id.inner().is_empty() {
245                    return Err(ZerobusSinkError::ConfigError {
246                        message: "OAuth client_id cannot be empty".to_string(),
247                    });
248                }
249                if client_secret.inner().is_empty() {
250                    return Err(ZerobusSinkError::ConfigError {
251                        message: "OAuth client_secret cannot be empty".to_string(),
252                    });
253                }
254            }
255        }
256
257        if let Some(max_bytes) = self.batch.max_bytes {
258            // Zerobus SDK limits max bytes to 10MB. This cap is a conservative safety limit:
259            // it's measured against Vector's pre-serialization sizing, not the protobuf bytes
260            // the SDK actually sends. Vector's pre-serialization size is generally larger than
261            // the SDK's protobuf-encoded size, so enforcing the 10MB cap here ensures the SDK's
262            // 10MB limit cannot be exceeded at runtime.
263            if max_bytes > 10_000_000 {
264                return Err(ZerobusSinkError::ConfigError {
265                    message: "max_bytes must be less than or equal to 10MB".to_string(),
266                });
267            }
268        }
269
270        Ok(())
271    }
272}
273
// Default value functions

/// Default for `ZerobusStreamOptions::flush_timeout_ms`, in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30_000
}
278
/// Default for `ZerobusStreamOptions::server_lack_of_ack_timeout_ms`, in milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60_000
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// Builds a configuration that passes `validate()`. Each test changes only
    /// the field it is exercising.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `validate()` rejects `config` with a `ConfigError` whose
    /// message contains `expected_fragment`.
    ///
    /// The failure output includes the whole config so loop-driven tests show
    /// which input misbehaved.
    #[track_caller]
    fn assert_config_error(config: &ZerobusSinkConfig, expected_fragment: &str) {
        match config.validate() {
            Err(ZerobusSinkError::ConfigError { message }) => assert!(
                message.contains(expected_fragment),
                "expected message containing {expected_fragment:?}, got {message:?} for {config:#?}"
            ),
            other => panic!(
                "expected ConfigError containing {expected_fragment:?}, got {other:?} for {config:#?}"
            ),
        }
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();

        assert_config_error(&config, "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();

        assert_config_error(&config, "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string(); // Missing dots

        assert_config_error(&config, "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            assert_config_error(&config, "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();

        assert_config_error(&config, "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        assert_config_error(&config, "OAuth client_id cannot be empty");
    }

    /// The secret is validated separately from the ID, so it gets its own test.
    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };

        assert_config_error(&config, "OAuth client_secret cannot be empty");
    }

    /// An explicit `batch.max_bytes` above the 10MB cap must be rejected.
    #[test]
    fn test_config_validation_max_bytes_over_limit() {
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_001);

        assert_config_error(&config, "max_bytes must be less than or equal to 10MB");
    }

    /// The cap is inclusive: exactly 10MB is accepted.
    #[test]
    fn test_config_validation_max_bytes_at_limit() {
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_000);

        assert!(config.validate().is_ok());
    }

    /// When `batch.max_bytes` is `None` (user omitted the field or set it to `null`),
    /// `into_batcher_settings()` must merge it against
    /// `RealtimeSizeBasedDefaultBatchSettings::MAX_BYTES` (10MB) — never unbounded.
    /// This guarantees the Zerobus SDK's 10MB limit cannot be exceeded at runtime
    /// even without an explicit user cap.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}