1use std::sync::Arc;
2
3use futures::{Sink, Stream, stream};
4use futures_util::{FutureExt, StreamExt, future, stream::BoxStream};
5use tokio::sync::{Mutex, oneshot};
6use vector_lib::{
7 config::{DataType, Input, LogNamespace},
8 configurable::configurable_component,
9 event::Event,
10 schema,
11 sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15 conditions::Condition,
16 config::{
17 AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18 },
19 sinks::Healthcheck,
20 sources,
21};
22
23#[configurable_component(source("unit_test", "Unit test."))]
25#[derive(Clone, Debug, Default)]
26pub struct UnitTestSourceConfig {
27 #[serde(skip)]
29 pub events: Vec<Event>,
30}
31
32impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38 let events = self.events.clone().into_iter();
39
40 Ok(Box::pin(async move {
41 let mut out = cx.out;
42 let _shutdown = cx.shutdown;
43 out.send_batch(events).await.map_err(|_| ())?;
44 Ok(())
45 }))
46 }
47
48 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49 vec![SourceOutput::new_maybe_logs(
50 DataType::all_bits(),
51 schema::Definition::default_legacy_namespace(),
52 )]
53 }
54
55 fn can_acknowledge(&self) -> bool {
56 false
57 }
58}
59
60#[configurable_component(source("unit_test_stream", "Unit test stream."))]
62#[derive(Clone)]
63pub struct UnitTestStreamSourceConfig {
64 #[serde(skip)]
65 stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
66}
67
68impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71 pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72 Self {
73 stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74 }
75 }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79 fn default() -> Self {
80 Self::new(stream::empty().boxed())
81 }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 formatter
87 .debug_struct("UnitTestStreamSourceConfig")
88 .finish()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96 let stream = self.stream.lock().await.take().unwrap();
97 Ok(Box::pin(async move {
98 let mut out = cx.out;
99 let _shutdown = cx.shutdown;
100 out.send_event_stream(stream).await.map_err(|_| ())?;
101 Ok(())
102 }))
103 }
104
105 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106 vec![SourceOutput::new_maybe_logs(
107 DataType::all_bits(),
108 schema::Definition::default_legacy_namespace(),
109 )]
110 }
111
112 fn can_acknowledge(&self) -> bool {
113 false
114 }
115}
116
117#[derive(Clone, Default)]
118pub enum UnitTestSinkCheck {
119 Checks {
121 conditions: Vec<Vec<Condition>>,
122 expected_event_count: Option<usize>,
123 },
124
125 NoOutputs,
127
128 #[default]
130 NoOp,
131}
132
/// Outcome reported by a `unit_test` sink once its input is exhausted.
#[derive(Debug)]
pub struct UnitTestSinkResult {
    /// Name of the test the sink was configured with.
    pub test_name: String,
    /// Human-readable failure messages; empty when no check failed.
    pub test_errors: Vec<String>,
}
138
139#[configurable_component(sink("unit_test", "Unit test."))]
141#[derive(Clone, Default, Derivative)]
142#[derivative(Debug)]
143pub struct UnitTestSinkConfig {
144 pub test_name: String,
146
147 pub transform_ids: Vec<String>,
149
150 #[serde(skip)]
152 pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
153
154 #[serde(skip)]
156 #[derivative(Debug = "ignore")]
157 pub check: UnitTestSinkCheck,
158}
159
160impl_generate_config_from_default!(UnitTestSinkConfig);
161
162#[async_trait::async_trait]
163#[typetag::serde(name = "unit_test")]
164impl SinkConfig for UnitTestSinkConfig {
165 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
166 let tx = self.result_tx.lock().await.take();
167 let sink = UnitTestSink {
168 test_name: self.test_name.clone(),
169 transform_ids: self.transform_ids.clone(),
170 result_tx: tx,
171 check: self.check.clone(),
172 };
173 let healthcheck = future::ok(()).boxed();
174
175 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
176 }
177
178 fn input(&self) -> Input {
179 Input::all()
180 }
181
182 fn acknowledgements(&self) -> &AcknowledgementsConfig {
183 &AcknowledgementsConfig::DEFAULT
184 }
185}
186
187pub struct UnitTestSink {
188 pub test_name: String,
189 pub transform_ids: Vec<String>,
190 pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
192 pub check: UnitTestSinkCheck,
193}
194
195#[async_trait::async_trait]
196impl StreamSink<Event> for UnitTestSink {
197 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
198 let mut output_events = Vec::new();
199 let mut result = UnitTestSinkResult {
200 test_name: self.test_name,
201 test_errors: Vec::new(),
202 };
203
204 while let Some(event) = input.next().await {
205 output_events.push(event);
206 }
207
208 match self.check {
209 UnitTestSinkCheck::Checks {
210 conditions: checks,
211 expected_event_count,
212 } => {
213 if let Some(expected) = expected_event_count {
214 let actual = output_events.len();
215 if actual != expected {
216 result.test_errors.push(format!(
217 "expected {} events from transforms {:?}, but received {}",
218 expected, self.transform_ids, actual
219 ));
220 }
221 }
222
223 if output_events.is_empty() && expected_event_count != Some(0) {
224 result
225 .test_errors
226 .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
227 } else {
228 for (i, check) in checks.iter().enumerate() {
229 let mut check_errors = Vec::new();
230 for (j, condition) in check.iter().enumerate() {
231 let mut condition_errors = Vec::new();
232 for event in output_events.iter() {
233 match condition.check_with_context(event.clone()).0 {
234 Ok(_) => {
235 condition_errors.clear();
236 break;
237 }
238 Err(error) => {
239 condition_errors.push(format!(" condition[{j}]: {error}"));
240 }
241 }
242 }
243 check_errors.extend(condition_errors);
244 }
245 if !check_errors.is_empty() {
247 check_errors.insert(
248 0,
249 format!(
250 "check[{}] for transforms {:?} failed conditions:",
251 i, self.transform_ids
252 ),
253 );
254 }
255
256 result.test_errors.extend(check_errors);
257 }
258
259 if !result.test_errors.is_empty() {
261 result.test_errors.push(format!(
262 "output payloads from {:?} (events encoded as JSON):\n {}",
263 self.transform_ids,
264 events_to_string(&output_events)
265 ));
266 }
267 }
268 }
269 UnitTestSinkCheck::NoOutputs => {
270 if !output_events.is_empty() {
271 result.test_errors.push(format!(
272 "check for transforms {:?} failed: expected no outputs",
273 self.transform_ids
274 ));
275 }
276 }
277 UnitTestSinkCheck::NoOp => {}
278 }
279
280 if let Some(tx) = self.result_tx
281 && tx.send(result).is_err()
282 {
283 error!(message = "Sending unit test results failed in unit test sink.");
284 }
285 Ok(())
286 }
287}
288
289#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
291#[derive(Clone, Default)]
292pub struct UnitTestStreamSinkConfig {
293 #[serde(skip)]
295 sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
296}
297
298impl_generate_config_from_default!(UnitTestStreamSinkConfig);
299
300impl UnitTestStreamSinkConfig {
301 pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
302 Self {
303 sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
304 }
305 }
306}
307
308impl std::fmt::Debug for UnitTestStreamSinkConfig {
309 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 formatter.debug_struct("UnitTestStreamSinkConfig").finish()
311 }
312}
313
314#[async_trait::async_trait]
315#[typetag::serde(name = "unit_test_stream")]
316impl SinkConfig for UnitTestStreamSinkConfig {
317 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
318 let sink = self.sink.lock().await.take().unwrap();
319 let healthcheck = future::ok(()).boxed();
320
321 #[allow(deprecated)]
322 Ok((VectorSink::from_event_sink(sink), healthcheck))
323 }
324
325 fn input(&self) -> Input {
326 Input::all()
327 }
328
329 fn acknowledgements(&self) -> &AcknowledgementsConfig {
330 &AcknowledgementsConfig::DEFAULT
331 }
332}
333
334fn events_to_string(events: &[Event]) -> String {
335 events
336 .iter()
337 .map(|event| match event {
338 Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
339 Event::Metric(metric) => {
340 serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
341 }
342 Event::Trace(trace) => {
343 serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
344 }
345 })
346 .collect::<Vec<_>>()
347 .join("\n ")
348}