vector/sinks/console/
config.rs

1use futures::{FutureExt, future};
2use tokio::io;
3use vector_lib::{
4    codecs::{
5        JsonSerializerConfig,
6        encoding::{Framer, FramingConfig},
7    },
8    configurable::configurable_component,
9};
10
11use crate::{
12    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14    sinks::{Healthcheck, VectorSink, console::sink::WriterSink},
15};
16
/// The [standard stream][standard_streams] to write to.
///
/// [standard_streams]: https://en.wikipedia.org/wiki/Standard_streams
#[configurable_component]
#[derive(Clone, Debug, Default)]
// Variant names are (de)serialized in lowercase, i.e. `stdout` and `stderr`.
#[serde(rename_all = "lowercase")]
pub enum Target {
    /// Write output to [STDOUT][stdout].
    ///
    /// [stdout]: https://en.wikipedia.org/wiki/Standard_streams#Standard_output_(stdout)
    // `#[default]` selects this variant for the derived `Default` implementation.
    #[default]
    Stdout,

    /// Write output to [STDERR][stderr].
    ///
    /// [stderr]: https://en.wikipedia.org/wiki/Standard_streams#Standard_error_(stderr)
    Stderr,
}
35
/// Configuration for the `console` sink.
#[configurable_component(sink(
    "console",
    "Display observability events in the console, which can be useful for debugging purposes."
))]
#[derive(Clone, Debug)]
// NOTE(review): serde documents `flatten` (used on `encoding` below) as unsupported in
// combination with `deny_unknown_fields` -- confirm unknown keys are still rejected as intended.
#[serde(deny_unknown_fields)]
pub struct ConsoleSinkConfig {
    // When `target` is omitted, serde calls `default_target()` to fill it in.
    #[configurable(derived)]
    #[serde(default = "default_target")]
    pub target: Target,

    // Flattened: the encoding/framing options sit at the same level as `target` instead of
    // being nested under an `encoding` key of this struct.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    // Optional field: falls back to `Default` when absent. Deserialization goes through
    // `crate::serde::bool_or_struct` (presumably accepting either a bare boolean or a full
    // struct -- verify against that helper), and the field is left out of serialized output
    // whenever `crate::serde::is_default` returns true for it.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
59
/// Serde default for `ConsoleSinkConfig::target`: returns `Target::Stdout`.
const fn default_target() -> Target {
    Target::Stdout
}
63
64impl GenerateConfig for ConsoleSinkConfig {
65    fn generate_config() -> toml::Value {
66        toml::Value::try_from(Self {
67            target: Target::Stdout,
68            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
69            acknowledgements: Default::default(),
70        })
71        .unwrap()
72    }
73}
74
75#[async_trait::async_trait]
76#[typetag::serde(name = "console")]
77impl SinkConfig for ConsoleSinkConfig {
78    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
79        let transformer = self.encoding.transformer();
80        let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
81        let encoder = Encoder::<Framer>::new(framer, serializer);
82
83        let sink: VectorSink = match self.target {
84            Target::Stdout => VectorSink::from_event_streamsink(WriterSink {
85                output: io::stdout(),
86                transformer,
87                encoder,
88            }),
89            Target::Stderr => VectorSink::from_event_streamsink(WriterSink {
90                output: io::stderr(),
91                transformer,
92                encoder,
93            }),
94        };
95
96        Ok((sink, future::ok(()).boxed()))
97    }
98
99    fn input(&self) -> Input {
100        Input::new(self.encoding.config().1.input_type())
101    }
102
103    fn acknowledgements(&self) -> &AcknowledgementsConfig {
104        &self.acknowledgements
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::ConsoleSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against `ConsoleSinkConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ConsoleSinkConfig>();
    }
}