1use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;
8use vector_lib::codecs::encoding::format::SchemaProvider;
9
10use super::{
11 request_builder::ClickhouseRequestBuilder,
12 service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
13 sink::{ClickhouseSink, PartitionKey},
14};
15use crate::{
16 http::{Auth, HttpClient, MaybeAuth},
17 sinks::{
18 prelude::*,
19 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
20 },
21};
22
/// Data format selected through the sink's `format` option.
///
/// Variants are (de)serialized in `snake_case` (for example `json_each_row`), while the
/// `Display` implementation renders them as `JSONEachRow`, `JSONAsObject`, `JSONAsString`
/// and `ArrowStream` respectively.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
// NOTE(review): the lint is presumably silenced because of the shared `Json` prefix — confirm
// it is still needed now that `ArrowStream` exists.
#[allow(clippy::enum_variant_names)]
pub enum Format {
    /// Rendered as `JSONEachRow`. This is the default format.
    #[default]
    JsonEachRow,

    /// Rendered as `JSONAsObject`.
    JsonAsObject,

    /// Rendered as `JSONAsString`.
    JsonAsString,

    /// Rendered as `ArrowStream`.
    ///
    /// This is the only format accepted when `batch_encoding` is configured.
    #[configurable(metadata(status = "beta"))]
    ArrowStream,
}
47
/// Batch encoding configuration for the `clickhouse` sink.
///
/// The variant is selected by the `codec` tag (in `snake_case`); `arrow_stream` is the only
/// codec this enum accepts.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum ClickhouseBatchEncoding {
    /// Encodes batches with the Arrow stream serializer.
    ///
    /// The serializer's schema is filled in at sink build time from the target table.
    ArrowStream(ArrowStreamSerializerConfig),
}
64
65impl fmt::Display for Format {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 match self {
68 Format::JsonEachRow => write!(f, "JSONEachRow"),
69 Format::JsonAsObject => write!(f, "JSONAsObject"),
70 Format::JsonAsString => write!(f, "JSONAsString"),
71 Format::ArrowStream => write!(f, "ArrowStream"),
72 }
73 }
74}
75
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    ///
    /// Also accepted under the alias `host`.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    ///
    /// This is a template. It must not be dynamic when `batch_encoding` is set.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    ///
    /// This is a template. When unset, `default` is used. It must not be dynamic when
    /// `batch_encoding` is set.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format used for the inserted data.
    ///
    /// Defaults to `json_each_row`.
    #[serde(default)]
    pub format: Format,

    /// Optional flag handed to the service request builder as-is.
    // NOTE(review): presumably controls whether ClickHouse skips event fields missing from the
    // table, with `None` deferring to the server default — confirm in the service module.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Flag handed to the service request builder as-is.
    // NOTE(review): presumably enables ClickHouse's best-effort date/time parsing — confirm in
    // the service module.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Flag handed to the service request builder as-is.
    // NOTE(review): presumably makes inserts go to a random shard — confirm in the service
    // module.
    #[serde(default)]
    pub insert_random_shard: bool,

    /// Compression applied to requests. Defaults to gzip.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    /// Event transformation settings, paired with the resolved encoder in the request builder.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// Optional batch encoding.
    ///
    /// When set, `format` must be `arrow_stream`, `table` and `database` must be static, and
    /// the table schema is fetched while the sink is being built. When unset, events are
    /// encoded as newline-delimited JSON.
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<ClickhouseBatchEncoding>,

    /// Event batching settings.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Authentication settings.
    ///
    /// Reconciled with any credentials embedded in `endpoint` when the sink is built.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// Request settings, also used to configure retries for the sink's service.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// TLS settings for the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Acknowledgement settings.
    ///
    /// Accepts either a bare boolean or the full structure.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Extra query settings handed to the service request builder.
    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
154
/// Query settings for the `clickhouse` sink.
///
/// Unknown keys are rejected during deserialization.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Settings for asynchronous inserts. Defaults to all options unset.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
164
/// Async insert settings for the `clickhouse` sink.
///
/// Every option is optional and defaults to unset; unknown keys are rejected during
/// deserialization.
// NOTE(review): how each option is translated into a request is not visible in this file. The
// per-field notes below are inferred from the field names — confirm against the service module
// and the ClickHouse settings reference.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Whether asynchronous inserts are enabled.
    // NOTE(review): presumably maps to ClickHouse's `async_insert` — confirm.
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Whether to wait for the asynchronous insert to be processed.
    // NOTE(review): presumably maps to `wait_for_async_insert` — confirm.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Timeout for waiting on asynchronous insert processing.
    // NOTE(review): unit (presumably seconds) and the matching server setting need confirming.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Whether asynchronous inserts are deduplicated.
    // NOTE(review): presumably maps to `async_insert_deduplicate` — confirm.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Maximum data size for an asynchronous insert.
    // NOTE(review): unit (presumably bytes) and the matching server setting need confirming.
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Maximum number of queries for an asynchronous insert.
    // NOTE(review): presumably maps to `async_insert_max_query_number` — confirm.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
206
// NOTE(review): presumably generates the example configuration for this sink from
// `ClickhouseConfig::default()` (exercised by the `generate_config` test) — confirm against the
// macro definition.
impl_generate_config_from_default!(ClickhouseConfig);
208
209#[async_trait::async_trait]
210#[typetag::serde(name = "clickhouse")]
211impl SinkConfig for ClickhouseConfig {
212 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
213 let endpoint = self.endpoint.with_default_parts().uri;
214
215 let auth = self.auth.choose_one(&self.endpoint.auth)?;
216
217 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
218
219 let client = HttpClient::new(tls_settings, &cx.proxy)?;
220
221 let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
222 auth: auth.clone(),
223 endpoint: endpoint.clone(),
224 skip_unknown_fields: self.skip_unknown_fields,
225 date_time_best_effort: self.date_time_best_effort,
226 insert_random_shard: self.insert_random_shard,
227 compression: self.compression,
228 query_settings: self.query_settings,
229 };
230
231 let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
232 HttpService::new(client.clone(), clickhouse_service_request_builder);
233
234 let request_limits = self.request.into_settings();
235
236 let service = ServiceBuilder::new()
237 .settings(request_limits, ClickhouseRetryLogic::default())
238 .service(service);
239
240 let batch_settings = self.batch.into_batcher_settings()?;
241
242 let database = self.database.clone().unwrap_or_else(|| {
243 "default"
244 .try_into()
245 .expect("'default' should be a valid template")
246 });
247
248 let (format, encoder_kind) = self
250 .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
251 .await?;
252
253 let request_builder = ClickhouseRequestBuilder {
254 compression: self.compression,
255 encoder: (self.encoding.clone(), encoder_kind),
256 };
257
258 let sink = ClickhouseSink::new(
259 batch_settings,
260 service,
261 database,
262 self.table.clone(),
263 format,
264 request_builder,
265 );
266
267 let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
268
269 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
270 }
271
272 fn input(&self) -> Input {
273 Input::log()
274 }
275
276 fn acknowledgements(&self) -> &AcknowledgementsConfig {
277 &self.acknowledgements
278 }
279}
280
281impl ClickhouseConfig {
282 async fn resolve_strategy(
287 &self,
288 client: &HttpClient,
289 endpoint: &Uri,
290 database: &Template,
291 auth: Option<&Auth>,
292 ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
293 use vector_lib::codecs::EncoderKind;
294 use vector_lib::codecs::{
295 JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
296 };
297
298 if let Some(batch_encoding) = &self.batch_encoding {
299 use vector_lib::codecs::BatchEncoder;
300 use vector_lib::codecs::encoding::BatchSerializerConfig;
301
302 if self.format != Format::ArrowStream {
304 return Err(format!(
305 "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
306 self.format
307 )
308 .into());
309 }
310
311 let ClickhouseBatchEncoding::ArrowStream(arrow_config) = batch_encoding;
312 let mut arrow_config = arrow_config.clone();
313
314 self.resolve_arrow_schema(
315 client,
316 endpoint.to_string(),
317 database,
318 auth,
319 &mut arrow_config,
320 )
321 .await?;
322
323 let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
324 let batch_serializer = resolved_batch_config.build_batch_serializer()?;
325 let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));
326
327 return Ok((Format::ArrowStream, encoder));
328 }
329
330 let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
331 NewlineDelimitedEncoderConfig.build().into(),
332 JsonSerializerConfig::default().build().into(),
333 )));
334
335 Ok((self.format, encoder))
336 }
337
338 async fn resolve_arrow_schema(
339 &self,
340 client: &HttpClient,
341 endpoint: String,
342 database: &Template,
343 auth: Option<&Auth>,
344 config: &mut ArrowStreamSerializerConfig,
345 ) -> crate::Result<()> {
346 use super::arrow;
347
348 if self.table.is_dynamic() || database.is_dynamic() {
349 return Err(
350 "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
351 .into(),
352 );
353 }
354
355 let table_str = self.table.get_ref();
356 let database_str = database.get_ref();
357
358 debug!(
359 "Fetching schema for table {}.{} at startup.",
360 database_str, table_str
361 );
362
363 let provider = arrow::ClickHouseSchemaProvider::new(
364 client.clone(),
365 endpoint,
366 database_str.to_string(),
367 table_str.to_string(),
368 auth.cloned(),
369 );
370
371 let schema = provider.get_schema().await.map_err(|e| {
372 format!(
373 "Failed to fetch schema for {}.{}: {}.",
374 database_str, table_str, e
375 )
376 })?;
377
378 config.schema = Some(schema);
379
380 debug!(
381 "Successfully fetched Arrow schema with {} fields.",
382 config
383 .schema
384 .as_ref()
385 .map(|s| s.fields().len())
386 .unwrap_or(0)
387 );
388
389 Ok(())
390 }
391}
392
393fn get_healthcheck_uri(endpoint: &Uri) -> String {
394 let mut uri = endpoint.to_string();
395 if !uri.ends_with('/') {
396 uri.push('/');
397 }
398 uri.push_str("?query=SELECT%201");
399 uri
400}
401
402async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
403 let uri = get_healthcheck_uri(&endpoint);
404 let mut request = Request::get(uri).body(Body::empty()).unwrap();
405
406 if let Some(auth) = auth {
407 auth.apply(&mut request);
408 }
409
410 let response = client.send(request).await?;
411
412 match response.status() {
413 StatusCode::OK => Ok(()),
414 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// The query string is appended after exactly one `/`, whether or not the endpoint
    /// already ends with one.
    #[test]
    fn test_get_healthcheck_uri() {
        let cases = [
            (
                "http://localhost:8123",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];

        for (endpoint, expected) in cases {
            assert_eq!(get_healthcheck_uri(&endpoint.parse().unwrap()), expected);
        }
    }

    /// A codec other than `arrow_stream` must be rejected at deserialization time.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn batch_encoding_rejects_unsupported_codec() {
        let err = serde_yaml::from_str::<ClickhouseConfig>(
            r#"
            endpoint: http://localhost:8123
            table: test_table
            batch_encoding:
              codec: parquet
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("parquet"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    /// Builds a config with a fixed endpoint, table and database plus the given format and
    /// batch encoding.
    fn create_test_config(
        format: Format,
        batch_encoding: Option<ClickhouseBatchEncoding>,
    ) -> ClickhouseConfig {
        ClickhouseConfig {
            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    /// Every non-Arrow format must be rejected when `batch_encoding` is configured.
    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        let tls = TlsSettings::default();
        let client = HttpClient::new(tls, &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        let rejected = [
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ];

        for (format, format_name) in rejected {
            let batch_encoding =
                ClickhouseBatchEncoding::ArrowStream(ArrowStreamSerializerConfig::default());
            let config = create_test_config(format, Some(batch_encoding));

            let outcome = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                outcome.is_err(),
                "Expected error for format {} with batch_encoding, but got success",
                format_name
            );
        }
    }

    /// Without `batch_encoding`, the configured format is kept as-is.
    #[test]
    fn test_format_selection_without_batch_encoding() {
        let formats = [
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ];

        for format in formats {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {:?}",
                format
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}