vector/sinks/clickhouse/config.rs

//! Configuration for the `clickhouse` sink.
2
3use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;
8use vector_lib::codecs::encoding::format::SchemaProvider;
9
10use super::{
11    request_builder::ClickhouseRequestBuilder,
12    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
13    sink::{ClickhouseSink, PartitionKey},
14};
15use crate::{
16    http::{Auth, HttpClient, MaybeAuth},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
20    },
21};
22
23/// Data format.
24///
25/// The format used to parse input/output data.
26///
27/// [formats]: https://clickhouse.com/docs/en/interfaces/formats
28#[configurable_component]
29#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)]
30#[serde(rename_all = "snake_case")]
31#[allow(clippy::enum_variant_names)]
32pub enum Format {
33    #[default]
34    /// JSONEachRow.
35    JsonEachRow,
36
37    /// JSONAsObject.
38    JsonAsObject,
39
40    /// JSONAsString.
41    JsonAsString,
42
43    /// ArrowStream (beta).
44    #[configurable(metadata(status = "beta"))]
45    ArrowStream,
46}
47
48/// Batch encoding configuration for the `clickhouse` sink.
49#[configurable_component]
50#[derive(Clone, Debug)]
51#[serde(tag = "codec", rename_all = "snake_case")]
52#[configurable(metadata(
53    docs::enum_tag_description = "The codec to use for batch encoding events."
54))]
55pub enum ClickhouseBatchEncoding {
56    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
57    ///
58    /// This is the streaming variant of the Arrow IPC format, which writes
59    /// a continuous stream of record batches.
60    ///
61    /// [apache_arrow]: https://arrow.apache.org/
62    ArrowStream(ArrowStreamSerializerConfig),
63}
64
65impl fmt::Display for Format {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        match self {
68            Format::JsonEachRow => write!(f, "JSONEachRow"),
69            Format::JsonAsObject => write!(f, "JSONAsObject"),
70            Format::JsonAsString => write!(f, "JSONAsString"),
71            Format::ArrowStream => write!(f, "ArrowStream"),
72        }
73    }
74}
75
76/// Configuration for the `clickhouse` sink.
77#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
78#[derive(Clone, Debug, Default)]
79#[serde(deny_unknown_fields)]
80pub struct ClickhouseConfig {
81    /// The endpoint of the ClickHouse server.
82    #[serde(alias = "host")]
83    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
84    pub endpoint: UriSerde,
85
86    /// The table that data is inserted into.
87    #[configurable(metadata(docs::examples = "mytable"))]
88    pub table: Template,
89
90    /// The database that contains the table that data is inserted into.
91    #[configurable(metadata(docs::examples = "mydatabase"))]
92    pub database: Option<Template>,
93
94    /// The format to parse input data.
95    #[serde(default)]
96    pub format: Format,
97
98    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
99    ///
100    /// If left unspecified, use the default provided by the `ClickHouse` server.
101    #[serde(default)]
102    pub skip_unknown_fields: Option<bool>,
103
104    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
105    #[serde(default)]
106    pub date_time_best_effort: bool,
107
108    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
109    #[serde(default)]
110    pub insert_random_shard: bool,
111
112    #[configurable(derived)]
113    #[serde(default = "Compression::gzip_default")]
114    pub compression: Compression,
115
116    #[configurable(derived)]
117    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
118    pub encoding: Transformer,
119
120    /// The batch encoding configuration for encoding events in batches.
121    ///
122    /// When specified, events are encoded together as a single batch.
123    /// This is mutually exclusive with per-event encoding based on the `format` field.
124    #[configurable(derived)]
125    #[serde(default)]
126    pub batch_encoding: Option<ClickhouseBatchEncoding>,
127
128    #[configurable(derived)]
129    #[serde(default)]
130    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
131
132    #[configurable(derived)]
133    pub auth: Option<Auth>,
134
135    #[configurable(derived)]
136    #[serde(default)]
137    pub request: TowerRequestConfig,
138
139    #[configurable(derived)]
140    pub tls: Option<TlsConfig>,
141
142    #[configurable(derived)]
143    #[serde(
144        default,
145        deserialize_with = "crate::serde::bool_or_struct",
146        skip_serializing_if = "crate::serde::is_default"
147    )]
148    pub acknowledgements: AcknowledgementsConfig,
149
150    #[configurable(derived)]
151    #[serde(default)]
152    pub query_settings: QuerySettingsConfig,
153}
154
155/// Query settings for the `clickhouse` sink.
156#[configurable_component]
157#[derive(Clone, Copy, Debug, Default)]
158#[serde(deny_unknown_fields)]
159pub struct QuerySettingsConfig {
160    /// Async insert-related settings.
161    #[serde(default)]
162    pub async_insert_settings: AsyncInsertSettingsConfig,
163}
164
165/// Async insert related settings for the `clickhouse` sink.
166#[configurable_component]
167#[derive(Clone, Copy, Debug, Default)]
168#[serde(deny_unknown_fields)]
169pub struct AsyncInsertSettingsConfig {
170    /// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
171    ///
172    /// If left unspecified, use the default provided by the `ClickHouse` server.
173    #[serde(default)]
174    pub enabled: Option<bool>,
175
176    /// Sets `wait_for`, allowing ClickHouse to wait for processing of asynchronous insertion.
177    ///
178    /// If left unspecified, use the default provided by the `ClickHouse` server.
179    #[serde(default)]
180    pub wait_for_processing: Option<bool>,
181
182    /// Sets 'wait_for_processing_timeout`, to control the timeout for waiting for processing asynchronous insertion.
183    ///
184    /// If left unspecified, use the default provided by the `ClickHouse` server.
185    #[serde(default)]
186    pub wait_for_processing_timeout: Option<u64>,
187
188    /// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
189    ///
190    /// If left unspecified, use the default provided by the `ClickHouse` server.
191    #[serde(default)]
192    pub deduplicate: Option<bool>,
193
194    /// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
195    ///
196    /// If left unspecified, use the default provided by the `ClickHouse` server.
197    #[serde(default)]
198    pub max_data_size: Option<u64>,
199
200    /// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted
201    ///
202    /// If left unspecified, use the default provided by the `ClickHouse` server.
203    #[serde(default)]
204    pub max_query_number: Option<u64>,
205}
206
207impl_generate_config_from_default!(ClickhouseConfig);
208
209#[async_trait::async_trait]
210#[typetag::serde(name = "clickhouse")]
211impl SinkConfig for ClickhouseConfig {
212    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
213        let endpoint = self.endpoint.with_default_parts().uri;
214
215        let auth = self.auth.choose_one(&self.endpoint.auth)?;
216
217        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
218
219        let client = HttpClient::new(tls_settings, &cx.proxy)?;
220
221        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
222            auth: auth.clone(),
223            endpoint: endpoint.clone(),
224            skip_unknown_fields: self.skip_unknown_fields,
225            date_time_best_effort: self.date_time_best_effort,
226            insert_random_shard: self.insert_random_shard,
227            compression: self.compression,
228            query_settings: self.query_settings,
229        };
230
231        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
232            HttpService::new(client.clone(), clickhouse_service_request_builder);
233
234        let request_limits = self.request.into_settings();
235
236        let service = ServiceBuilder::new()
237            .settings(request_limits, ClickhouseRetryLogic::default())
238            .service(service);
239
240        let batch_settings = self.batch.into_batcher_settings()?;
241
242        let database = self.database.clone().unwrap_or_else(|| {
243            "default"
244                .try_into()
245                .expect("'default' should be a valid template")
246        });
247
248        // Resolve the encoding strategy (format + encoder) based on configuration
249        let (format, encoder_kind) = self
250            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
251            .await?;
252
253        let request_builder = ClickhouseRequestBuilder {
254            compression: self.compression,
255            encoder: (self.encoding.clone(), encoder_kind),
256        };
257
258        let sink = ClickhouseSink::new(
259            batch_settings,
260            service,
261            database,
262            self.table.clone(),
263            format,
264            request_builder,
265        );
266
267        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
268
269        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
270    }
271
272    fn input(&self) -> Input {
273        Input::log()
274    }
275
276    fn acknowledgements(&self) -> &AcknowledgementsConfig {
277        &self.acknowledgements
278    }
279}
280
281impl ClickhouseConfig {
282    /// Resolves the encoding strategy (format + encoder) based on configuration.
283    ///
284    /// This method determines the appropriate ClickHouse format and Vector encoder
285    /// based on the user's configuration, ensuring they are consistent.
286    async fn resolve_strategy(
287        &self,
288        client: &HttpClient,
289        endpoint: &Uri,
290        database: &Template,
291        auth: Option<&Auth>,
292    ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
293        use vector_lib::codecs::EncoderKind;
294        use vector_lib::codecs::{
295            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
296        };
297
298        if let Some(batch_encoding) = &self.batch_encoding {
299            use vector_lib::codecs::BatchEncoder;
300            use vector_lib::codecs::encoding::BatchSerializerConfig;
301
302            // Validate that batch_encoding is only compatible with ArrowStream format
303            if self.format != Format::ArrowStream {
304                return Err(format!(
305                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
306                    self.format
307                )
308                .into());
309            }
310
311            let ClickhouseBatchEncoding::ArrowStream(arrow_config) = batch_encoding;
312            let mut arrow_config = arrow_config.clone();
313
314            self.resolve_arrow_schema(
315                client,
316                endpoint.to_string(),
317                database,
318                auth,
319                &mut arrow_config,
320            )
321            .await?;
322
323            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
324            let batch_serializer = resolved_batch_config.build_batch_serializer()?;
325            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));
326
327            return Ok((Format::ArrowStream, encoder));
328        }
329
330        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
331            NewlineDelimitedEncoderConfig.build().into(),
332            JsonSerializerConfig::default().build().into(),
333        )));
334
335        Ok((self.format, encoder))
336    }
337
338    async fn resolve_arrow_schema(
339        &self,
340        client: &HttpClient,
341        endpoint: String,
342        database: &Template,
343        auth: Option<&Auth>,
344        config: &mut ArrowStreamSerializerConfig,
345    ) -> crate::Result<()> {
346        use super::arrow;
347
348        if self.table.is_dynamic() || database.is_dynamic() {
349            return Err(
350                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
351                    .into(),
352            );
353        }
354
355        let table_str = self.table.get_ref();
356        let database_str = database.get_ref();
357
358        debug!(
359            "Fetching schema for table {}.{} at startup.",
360            database_str, table_str
361        );
362
363        let provider = arrow::ClickHouseSchemaProvider::new(
364            client.clone(),
365            endpoint,
366            database_str.to_string(),
367            table_str.to_string(),
368            auth.cloned(),
369        );
370
371        let schema = provider.get_schema().await.map_err(|e| {
372            format!(
373                "Failed to fetch schema for {}.{}: {}.",
374                database_str, table_str, e
375            )
376        })?;
377
378        config.schema = Some(schema);
379
380        debug!(
381            "Successfully fetched Arrow schema with {} fields.",
382            config
383                .schema
384                .as_ref()
385                .map(|s| s.fields().len())
386                .unwrap_or(0)
387        );
388
389        Ok(())
390    }
391}
392
393fn get_healthcheck_uri(endpoint: &Uri) -> String {
394    let mut uri = endpoint.to_string();
395    if !uri.ends_with('/') {
396        uri.push('/');
397    }
398    uri.push_str("?query=SELECT%201");
399    uri
400}
401
402async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
403    let uri = get_healthcheck_uri(&endpoint);
404    let mut request = Request::get(uri).body(Body::empty()).unwrap();
405
406    if let Some(auth) = auth {
407        auth.apply(&mut request);
408    }
409
410    let response = client.send(request).await?;
411
412    match response.status() {
413        StatusCode::OK => Ok(()),
414        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    /// Exercises config generation for `ClickhouseConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// A trailing slash is added only when missing; existing paths are preserved.
    #[test]
    fn test_get_healthcheck_uri() {
        let cases = [
            (
                "http://localhost:8123",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];

        for (raw_endpoint, expected) in cases {
            let endpoint: http::Uri = raw_endpoint.parse().unwrap();
            assert_eq!(
                get_healthcheck_uri(&endpoint),
                expected,
                "unexpected healthcheck URI for endpoint {raw_endpoint}"
            );
        }
    }

    /// Codecs other than `arrow_stream` must be rejected at parse time, since
    /// `ClickhouseBatchEncoding` only exposes the `arrow_stream` variant.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn batch_encoding_rejects_unsupported_codec() {
        let yaml = r#"
            endpoint: http://localhost:8123
            table: test_table
            batch_encoding:
              codec: parquet
            "#;

        let err = serde_yaml::from_str::<ClickhouseConfig>(yaml).unwrap_err();
        let message = err.to_string();

        assert!(
            message.contains("parquet"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    /// Builds a minimal config pointing at a local server, a static table and a
    /// static database, with the given format and batch encoding.
    fn create_test_config(
        format: Format,
        batch_encoding: Option<ClickhouseBatchEncoding>,
    ) -> ClickhouseConfig {
        ClickhouseConfig {
            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    /// `batch_encoding` combined with any non-Arrow format must be rejected.
    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        // `resolve_strategy` needs a client, but the format check fails before any
        // request would be made.
        let client = HttpClient::new(TlsSettings::default(), &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        for (format, format_name) in [
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ] {
            let arrow_batching = Some(ClickhouseBatchEncoding::ArrowStream(
                ArrowStreamSerializerConfig::default(),
            ));
            let config = create_test_config(format, arrow_batching);

            let outcome = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                outcome.is_err(),
                "Expected error for format {format_name} with batch_encoding, but got success"
            );
        }
    }

    /// Without `batch_encoding`, the config keeps whatever `format` was configured.
    #[test]
    fn test_format_selection_without_batch_encoding() {
        for format in [
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ] {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {format:?}"
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}