1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 sync::{Arc, LazyLock, Mutex},
6 time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
12use tokio::{
13 select,
14 sync::{mpsc::UnboundedSender, oneshot},
15 time::timeout,
16};
17use tracing::Instrument;
18use vector_lib::{
19 EstimatedJsonEncodedSizeOf,
20 buffers::{
21 BufferType, WhenFull,
22 topology::{
23 builder::TopologyBuilder,
24 channel::{
25 BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
26 LimitedReceiver,
27 },
28 },
29 },
30 internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
31 latency::LatencyRecorder,
32 schema::Definition,
33 source_sender::{CHUNK_SIZE, SourceSenderItem},
34 transform::update_runtime_schema_definition,
35};
36use vector_lib::{gauge, internal_event::GaugeName};
37use vector_vrl_metrics::MetricsStorage;
38
39use super::{
40 BuiltBuffer, ConfigDiff,
41 fanout::{self, Fanout},
42 schema,
43 task::{Task, TaskOutput, TaskResult},
44};
45use crate::{
46 SourceSender,
47 config::{
48 ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
49 ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
50 },
51 event::{EventArray, EventContainer},
52 extra_context::ExtraContext,
53 internal_events::EventsReceived,
54 shutdown::SourceShutdownCoordinator,
55 spawn_named,
56 topology::task::TaskError,
57 transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
58 utilization::{
59 OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
60 UtilizationRegistry,
61 },
62};
63
/// Process-wide enrichment table registry. It is populated by `Builder::load_enrichment_tables`
/// and `reload_enrichment_tables`, and a clone of it is handed to every component context.
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
/// Process-wide metrics storage, cloned into every component context and into
/// `TopologyPieces::metrics_storage`.
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

/// Buffer size given to every `SourceSender`: one `CHUNK_SIZE` per unit of transform
/// concurrency.
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

/// Capacity passed to `ReadyArrays::with_capacity` for concurrently-running sync transforms.
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
/// Size of the in-memory input buffer created for each transform in `build_transforms`.
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

/// Maximum number of in-flight batches for a concurrently-running sync transform.
///
/// Uses the configured worker thread count when one is set, otherwise `crate::num_threads()`.
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

/// Component names of the `internal_logs` and `internal_metrics` sources.
///
/// Membership is passed as the second argument of
/// `SourceShutdownCoordinator::register_source` (presumably flagging them for special shutdown
/// handling — confirm against the coordinator).
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
81
/// Accumulates the tasks, channels and errors produced while building the components that a
/// [`ConfigDiff`] marks as new.
struct Builder<'a> {
    /// The full configuration being built.
    config: &'a super::Config,
    /// Selects which components are new; only those are built.
    diff: &'a ConfigDiff,
    /// Every newly built source is registered here.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Build errors; if non-empty when building ends, `build` returns them instead of pieces.
    errors: Vec<String>,
    /// Fanout control channel for each output of every newly built source and transform.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Source pump supervisors, transform tasks and sink tasks.
    tasks: HashMap<ComponentKey, Task>,
    /// Pre-built sink buffers; `build_sinks` removes and reuses an entry when one exists.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input buffer sender plus the declared inputs of every new transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Sink healthcheck tasks.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Per-sink trigger tied to the tripwire that ends the sink's input stream.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Cloned into every component's build context.
    extra_context: ExtraContext,
    /// `Some` only when this builder created its own utilization registry.
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry that every sink and transform registers a utilization sender with.
    utilization_registry: UtilizationRegistry,
}
97
98impl<'a> Builder<'a> {
99 fn new(
100 config: &'a super::Config,
101 diff: &'a ConfigDiff,
102 buffers: HashMap<ComponentKey, BuiltBuffer>,
103 extra_context: ExtraContext,
104 utilization_registry: Option<UtilizationRegistry>,
105 ) -> Self {
106 let (emitter, registry) = if let Some(registry) = utilization_registry {
109 (None, registry)
110 } else {
111 let (emitter, registry) = UtilizationEmitter::new();
112 (Some(emitter), registry)
113 };
114 Self {
115 config,
116 diff,
117 buffers,
118 shutdown_coordinator: SourceShutdownCoordinator::default(),
119 errors: vec![],
120 outputs: HashMap::new(),
121 tasks: HashMap::new(),
122 inputs: HashMap::new(),
123 healthchecks: HashMap::new(),
124 detach_triggers: HashMap::new(),
125 extra_context,
126 utilization_emitter: emitter,
127 utilization_registry: registry,
128 }
129 }
130
131 async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
133 let enrichment_tables = self.load_enrichment_tables().await;
134 let source_tasks = self.build_sources(enrichment_tables).await;
135 self.build_transforms(enrichment_tables).await;
136 self.build_sinks(enrichment_tables).await;
137
138 enrichment_tables.finish_load();
141
142 if self.errors.is_empty() {
143 Ok(TopologyPieces {
144 inputs: self.inputs,
145 outputs: Self::finalize_outputs(self.outputs),
146 tasks: self.tasks,
147 source_tasks,
148 healthchecks: self.healthchecks,
149 shutdown_coordinator: self.shutdown_coordinator,
150 detach_triggers: self.detach_triggers,
151 metrics_storage: METRICS_STORAGE.clone(),
152 utilization: self
153 .utilization_emitter
154 .map(|e| (e, self.utilization_registry)),
155 })
156 } else {
157 Err(self.errors)
158 }
159 }
160
161 fn finalize_outputs(
162 outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
163 ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
164 {
165 let mut finalized_outputs = HashMap::new();
166 for (id, output) in outputs {
167 let entry = finalized_outputs
168 .entry(id.component)
169 .or_insert_with(HashMap::new);
170 entry.insert(id.port, output);
171 }
172
173 finalized_outputs
174 }
175
176 async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
179 let mut enrichment_tables = HashMap::new();
180
181 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
183 let table_name = name.to_string();
184 if ENRICHMENT_TABLES.needs_reload(&table_name) {
185 let indexes = if !self.diff.enrichment_tables.is_added(name) {
186 Some(ENRICHMENT_TABLES.index_fields(&table_name))
189 } else {
190 None
191 };
192
193 let mut table = match table_outer.inner.build(&self.config.global).await {
194 Ok(table) => table,
195 Err(error) => {
196 self.errors
197 .push(format!("Enrichment Table \"{name}\": {error}"));
198 continue;
199 }
200 };
201
202 if let Some(indexes) = indexes {
203 for (case, index) in indexes {
204 match table
205 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
206 {
207 Ok(_) => (),
208 Err(error) => {
209 error!(message = "Unable to add index to reloaded enrichment table.",
213 table = ?name.to_string(),
214 %error);
215 continue 'tables;
216 }
217 }
218 }
219 }
220
221 enrichment_tables.insert(table_name, table);
222 }
223 }
224
225 ENRICHMENT_TABLES.load(enrichment_tables);
226
227 &ENRICHMENT_TABLES
228 }
229
230 async fn build_sources(
231 &mut self,
232 enrichment_tables: &vector_lib::enrichment::TableRegistry,
233 ) -> HashMap<ComponentKey, Task> {
234 let mut source_tasks = HashMap::new();
235
236 let table_sources = self
237 .config
238 .enrichment_tables
239 .iter()
240 .filter_map(|(key, table)| table.as_source(key))
241 .collect::<Vec<_>>();
242 for (key, source) in self
243 .config
244 .sources()
245 .filter(|(key, _)| self.diff.sources.contains_new(key))
246 .chain(
247 table_sources
248 .iter()
249 .map(|(key, source)| (key, source))
250 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
251 )
252 {
253 debug!(component_id = %key, "Building new source.");
254
255 let typetag = source.inner.get_component_name();
256 let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
257
258 let span = error_span!(
259 "source",
260 component_kind = "source",
261 component_id = %key.id(),
262 component_type = %source.inner.get_component_name(),
263 );
264 let _entered_span = span.enter();
265
266 let task_name = format!(
267 ">> {} ({}, pump) >>",
268 source.inner.get_component_name(),
269 key.id()
270 );
271
272 let mut builder = SourceSender::builder()
273 .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
274 .with_timeout(source.inner.send_timeout())
275 .with_ewma_half_life_seconds(
276 self.config.global.buffer_utilization_ewma_half_life_seconds,
277 );
278 let mut pumps = Vec::new();
279 let mut controls = HashMap::new();
280 let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
281
282 for output in source_outputs.into_iter() {
283 let rx = builder.add_source_output(output.clone(), key.clone());
284
285 let (fanout, control) = Fanout::new();
286 let source_type = source.inner.get_component_name();
287 let source = Arc::new(key.clone());
288
289 let pump = run_source_output_pump(rx, fanout, source, source_type);
290
291 pumps.push(pump.instrument(span.clone()));
292 controls.insert(
293 OutputId {
294 component: key.clone(),
295 port: output.port.clone(),
296 },
297 control,
298 );
299
300 let port = output.port.clone();
301 if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
302 schema_definitions.insert(port, definition);
303 }
304 }
305
306 let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
307 let pump = async move {
308 debug!("Source pump supervisor starting.");
309
310 let mut handles = FuturesUnordered::new();
315 for pump in pumps {
316 handles.push(spawn_named(pump, task_name.as_ref()));
317 }
318
319 let mut had_pump_error = false;
320 while let Some(output) = handles.try_next().await? {
321 if let Err(e) = output {
322 _ = pump_error_tx.send(e);
325 had_pump_error = true;
326 break;
327 }
328 }
329
330 if had_pump_error {
331 debug!("Source pump supervisor task finished with an error.");
332 } else {
333 debug!("Source pump supervisor task finished normally.");
334 }
335 Ok(TaskOutput::Source)
336 };
337 let pump = Task::new(key.clone(), typetag, pump);
338
339 let (shutdown_signal, force_shutdown_tripwire) = self
340 .shutdown_coordinator
341 .register_source(key, INTERNAL_SOURCES.contains(&typetag));
342
343 let context = SourceContext {
344 key: key.clone(),
345 globals: self.config.global.clone(),
346 enrichment_tables: enrichment_tables.clone(),
347 metrics_storage: METRICS_STORAGE.clone(),
348 shutdown: shutdown_signal,
349 out: builder.build(),
350 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
351 acknowledgements: source.sink_acknowledgements,
352 schema_definitions,
353 schema: self.config.schema,
354 extra_context: self.extra_context.clone(),
355 };
356 let server = match source.inner.build(context).await {
357 Err(error) => {
358 self.errors.push(format!("Source \"{key}\": {error}"));
359 continue;
360 }
361 Ok(server) => server,
362 };
363
364 let server = async move {
372 debug!("Source starting.");
373
374 let mut result = select! {
375 biased;
376
377 _ = force_shutdown_tripwire => Ok(()),
379
380 Ok(e) = &mut pump_error_rx => Err(e),
386
387 result = server => result.map_err(|_| TaskError::Opaque),
389 };
390
391 if let Ok(e) = pump_error_rx.try_recv() {
400 result = Err(e);
401 }
402
403 match result {
404 Ok(()) => {
405 debug!("Source finished normally.");
406 Ok(TaskOutput::Source)
407 }
408 Err(e) => {
409 debug!("Source finished with an error.");
410 Err(e)
411 }
412 }
413 };
414 let server = Task::new(key.clone(), typetag, server);
415
416 self.outputs.extend(controls);
417 self.tasks.insert(key.clone(), pump);
418 source_tasks.insert(key.clone(), server);
419 }
420
421 source_tasks
422 }
423
424 async fn build_transforms(
425 &mut self,
426 enrichment_tables: &vector_lib::enrichment::TableRegistry,
427 ) {
428 let mut definition_cache = HashMap::default();
429
430 for (key, transform) in self
431 .config
432 .transforms()
433 .filter(|(key, _)| self.diff.transforms.contains_new(key))
434 {
435 debug!(component_id = %key, "Building new transform.");
436
437 let input_definitions = match schema::input_definitions(
438 &transform.inputs,
439 self.config,
440 enrichment_tables.clone(),
441 &mut definition_cache,
442 ) {
443 Ok(definitions) => definitions,
444 Err(_) => {
445 return;
449 }
450 };
451
452 let merged_definition: Definition = input_definitions
453 .iter()
454 .map(|(_output_id, definition)| definition.clone())
455 .reduce(Definition::merge)
456 .unwrap_or_else(Definition::any);
458
459 let span = error_span!(
460 "transform",
461 component_kind = "transform",
462 component_id = %key.id(),
463 component_type = %transform.inner.get_component_name(),
464 );
465 let _span = span.enter();
466
467 let schema_definitions = transform
469 .inner
470 .outputs(
471 &TransformContext {
472 enrichment_tables: enrichment_tables.clone(),
473 metrics_storage: METRICS_STORAGE.clone(),
474 schema: self.config.schema,
475 ..Default::default()
476 },
477 &input_definitions,
478 )
479 .into_iter()
480 .map(|output| {
481 let definitions = output.schema_definitions(self.config.schema.enabled);
482 (output.port, definitions)
483 })
484 .collect::<HashMap<_, _>>();
485
486 let context = TransformContext {
487 key: Some(key.clone()),
488 globals: self.config.global.clone(),
489 enrichment_tables: enrichment_tables.clone(),
490 metrics_storage: METRICS_STORAGE.clone(),
491 schema_definitions,
492 merged_schema_definition: merged_definition.clone(),
493 schema: self.config.schema,
494 extra_context: self.extra_context.clone(),
495 };
496
497 let node =
498 TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
499
500 let transform = match transform
501 .inner
502 .build(&context)
503 .instrument(span.clone())
504 .await
505 {
506 Err(error) => {
507 self.errors.push(format!("Transform \"{key}\": {error}"));
508 continue;
509 }
510 Ok(transform) => transform,
511 };
512
513 let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
514 let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
515 TOPOLOGY_BUFFER_SIZE,
516 WhenFull::Block,
517 &span,
518 Some(metrics),
519 self.config.global.buffer_utilization_ewma_half_life_seconds,
520 );
521
522 self.inputs
523 .insert(key.clone(), (input_tx, node.inputs.clone()));
524
525 let (transform_task, transform_outputs) =
526 self.build_transform(transform, node, input_rx);
527
528 self.outputs.extend(transform_outputs);
529 self.tasks.insert(key.clone(), transform_task);
530 }
531 }
532
533 async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
534 let table_sinks = self
535 .config
536 .enrichment_tables
537 .iter()
538 .filter_map(|(key, table)| table.as_sink(key))
539 .collect::<Vec<_>>();
540 for (key, sink) in self
541 .config
542 .sinks()
543 .filter(|(key, _)| self.diff.sinks.contains_new(key))
544 .chain(
545 table_sinks
546 .iter()
547 .map(|(key, sink)| (key, sink))
548 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
549 )
550 {
551 debug!(component_id = %key, "Building new sink.");
552
553 let sink_inputs = &sink.inputs;
554 let healthcheck = sink.healthcheck();
555 let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
556 let healthcheck_timeout = healthcheck.timeout;
557
558 let typetag = sink.inner.get_component_name();
559 let input_type = sink.inner.input().data_type();
560
561 let span = error_span!(
562 "sink",
563 component_kind = "sink",
564 component_id = %key.id(),
565 component_type = %sink.inner.get_component_name(),
566 );
567 let _entered_span = span.enter();
568
569 if let Err(mut err) = schema::validate_sink_expectations(
573 key,
574 sink,
575 self.config,
576 enrichment_tables.clone(),
577 ) {
578 self.errors.append(&mut err);
579 };
580
581 let (tx, rx) = match self.buffers.remove(key) {
582 Some(buffer) => buffer,
583 _ => {
584 let buffer_type =
585 match sink.buffer.stages().first().expect("cant ever be empty") {
586 BufferType::Memory { .. } => "memory",
587 BufferType::DiskV2 { .. } => "disk",
588 };
589 let buffer_span = error_span!("sink", buffer_type);
590 let buffer = sink
591 .buffer
592 .build(
593 self.config.global.data_dir.clone(),
594 key.to_string(),
595 buffer_span,
596 )
597 .await;
598 match buffer {
599 Err(error) => {
600 self.errors.push(format!("Sink \"{key}\": {error}"));
601 continue;
602 }
603 Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
604 }
605 }
606 };
607
608 let cx = SinkContext {
609 healthcheck,
610 globals: self.config.global.clone(),
611 enrichment_tables: enrichment_tables.clone(),
612 metrics_storage: METRICS_STORAGE.clone(),
613 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
614 schema: self.config.schema,
615 app_name: crate::get_app_name().to_string(),
616 app_name_slug: crate::get_slugified_app_name(),
617 extra_context: self.extra_context.clone(),
618 };
619
620 let (sink, healthcheck) = match sink.inner.build(cx).await {
621 Err(error) => {
622 self.errors.push(format!("Sink \"{key}\": {error}"));
623 continue;
624 }
625 Ok(built) => built,
626 };
627
628 let (trigger, tripwire) = Tripwire::new();
629
630 let utilization_sender = self
631 .utilization_registry
632 .add_component(key.clone(), gauge!(GaugeName::Utilization));
633 let component_key = key.clone();
634 let sink = async move {
635 debug!("Sink starting.");
636
637 let rx = rx
644 .lock()
645 .unwrap()
646 .take()
647 .expect("Task started but input has been taken.");
648
649 let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
650
651 let events_received = register!(EventsReceived);
652 sink.run(
653 rx.by_ref()
654 .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
655 .inspect(|events| {
656 events_received.emit(CountByteSize(
657 events.len(),
658 events.estimated_json_encoded_size_of(),
659 ))
660 })
661 .take_until_if(tripwire),
662 )
663 .await
664 .map(|_| {
665 debug!("Sink finished normally.");
666 TaskOutput::Sink(rx)
667 })
668 .map_err(|_| {
669 debug!("Sink finished with an error.");
670 TaskError::Opaque
671 })
672 };
673
674 let task = Task::new(key.clone(), typetag, sink);
675
676 let component_key = key.clone();
677 let healthcheck_task = async move {
678 if enable_healthcheck {
679 timeout(healthcheck_timeout, healthcheck)
680 .map(|result| match result {
681 Ok(Ok(_)) => {
682 info!("Healthcheck passed.");
683 Ok(TaskOutput::Healthcheck)
684 }
685 Ok(Err(error)) => {
686 error!(
687 msg = "Healthcheck failed.",
688 %error,
689 component_kind = "sink",
690 component_type = typetag,
691 component_id = %component_key.id(),
692 );
693 Err(TaskError::wrapped(error))
694 }
695 Err(e) => {
696 error!(
697 msg = "Healthcheck timed out.",
698 component_kind = "sink",
699 component_type = typetag,
700 component_id = %component_key.id(),
701 );
702 Err(TaskError::wrapped(Box::new(e)))
703 }
704 })
705 .await
706 } else {
707 info!("Healthcheck disabled.");
708 Ok(TaskOutput::Healthcheck)
709 }
710 };
711
712 let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
713
714 self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
715 self.healthchecks.insert(key.clone(), healthcheck_task);
716 self.tasks.insert(key.clone(), task);
717 self.detach_triggers.insert(key.clone(), trigger);
718 }
719 }
720
721 fn build_transform(
722 &self,
723 transform: Transform,
724 node: TransformNode,
725 input_rx: BufferReceiver<EventArray>,
726 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
727 match transform {
728 Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
730 Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
731 Transform::Task(t) => self.build_task_transform(
732 t,
733 input_rx,
734 node.input_details.data_type(),
735 node.typetag,
736 &node.key,
737 &node.outputs,
738 ),
739 }
740 }
741
742 fn build_sync_transform(
743 &self,
744 t: Box<dyn SyncTransform>,
745 node: TransformNode,
746 input_rx: BufferReceiver<EventArray>,
747 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
748 let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
749
750 let sender = self
751 .utilization_registry
752 .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
753 let runner = Runner::new(
754 t,
755 input_rx,
756 sender,
757 node.input_details.data_type(),
758 outputs,
759 LatencyRecorder::new(self.config.global.latency_ewma_alpha),
760 );
761 let transform = if node.enable_concurrency {
762 runner.run_concurrently().boxed()
763 } else {
764 runner.run_inline().boxed()
765 };
766
767 let transform = async move {
768 debug!("Synchronous transform starting.");
769
770 match transform.await {
771 Ok(v) => {
772 debug!("Synchronous transform finished normally.");
773 Ok(v)
774 }
775 Err(e) => {
776 debug!("Synchronous transform finished with an error.");
777 Err(e)
778 }
779 }
780 };
781
782 let mut output_controls = HashMap::new();
783 for (name, control) in controls {
784 let id = name
785 .map(|name| OutputId::from((&node.key, name)))
786 .unwrap_or_else(|| OutputId::from(&node.key));
787 output_controls.insert(id, control);
788 }
789
790 let task = Task::new(node.key.clone(), node.typetag, transform);
791
792 (task, output_controls)
793 }
794
795 fn build_task_transform(
796 &self,
797 t: Box<dyn TaskTransform<EventArray>>,
798 input_rx: BufferReceiver<EventArray>,
799 input_type: DataType,
800 typetag: &str,
801 key: &ComponentKey,
802 outputs: &[TransformOutput],
803 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
804 let (mut fanout, control) = Fanout::new();
805
806 let sender = self
807 .utilization_registry
808 .add_component(key.clone(), gauge!(GaugeName::Utilization));
809 let output_sender = sender.clone();
810 let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());
811
812 let events_received = register!(EventsReceived);
813 let filtered = input_rx
814 .filter(move |events| ready(filter_events_type(events, input_type)))
815 .inspect(move |events| {
816 events_received.emit(CountByteSize(
817 events.len(),
818 events.estimated_json_encoded_size_of(),
819 ))
820 });
821 let events_sent = register!(EventsSent::from(internal_event::Output(None)));
822 let output_id = Arc::new(OutputId {
823 component: key.clone(),
824 port: None,
825 });
826 let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);
827
828 let schema_definition_map = outputs
830 .iter()
831 .find(|x| x.port.is_none())
832 .expect("output for default port required for task transforms")
833 .log_schema_definitions
834 .clone()
835 .into_iter()
836 .map(|(key, value)| (key, Arc::new(value)))
837 .collect();
838
839 let stream = t
840 .transform(Box::pin(filtered))
841 .map(move |mut events| {
842 for event in events.iter_events_mut() {
843 update_runtime_schema_definition(event, &output_id, &schema_definition_map);
844 }
845 let now = Instant::now();
846 latency_recorder.on_send(&mut events, now);
847 (events, now)
848 })
849 .inspect(move |(events, _): &(EventArray, Instant)| {
850 events_sent.emit(CountByteSize(
851 events.len(),
852 events.estimated_json_encoded_size_of(),
853 ));
854 });
855 let stream = OutputUtilization::new(output_sender, stream);
856 let transform = async move {
857 debug!("Task transform starting.");
858
859 match fanout.send_stream(stream).await {
860 Ok(()) => {
861 debug!("Task transform finished normally.");
862 Ok(TaskOutput::Transform)
863 }
864 Err(e) => {
865 debug!("Task transform finished with an error.");
866 Err(TaskError::wrapped(e))
867 }
868 }
869 }
870 .boxed();
871
872 let mut outputs = HashMap::new();
873 outputs.insert(OutputId::from(key), control);
874
875 let task = Task::new(key.clone(), typetag, transform);
876
877 (task, outputs)
878 }
879}
880
/// Forwards everything a source sends on one output into that output's `Fanout`.
///
/// Each received array has its event metadata stamped with the source id, the source type and
/// a "last transform" timestamp before it is sent. The pump ends normally when the source
/// channel closes.
///
/// # Errors
///
/// Returns the wrapped fanout error if sending downstream fails.
async fn run_source_output_pump(
    mut rx: LimitedReceiver<SourceSenderItem>,
    mut fanout: Fanout,
    source: Arc<ComponentKey>,
    source_type: &'static str,
) -> TaskResult {
    debug!("Source pump starting.");

    let mut control_channel_open = true;
    loop {
        tokio::select! {
            // `biased` handles pending fanout control messages before the next batch is
            // forwarded.
            biased;
            // `recv_control_message` reports whether the control channel is still open; once it
            // returns `false`, this branch is disabled by the guard.
            alive = fanout.recv_control_message(), if control_channel_open => {
                control_channel_open = alive;
            }
            item = rx.next() => {
                match item {
                    Some(SourceSenderItem { events: mut array, send_reference }) => {
                        // One timestamp for the whole array, so every event in it gets the same
                        // value.
                        let now = Instant::now();
                        array.for_each_metadata_mut(|metadata| {
                            metadata.set_source_id(Arc::clone(&source));
                            metadata.set_source_type(source_type);
                            metadata.set_last_transform_timestamp(now);
                        });
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }
                    // The source's sender side is closed: nothing more to forward.
                    None => break,
                }
            }
        }
    }

    debug!("Source pump finished normally.");
    Ok(TaskOutput::Source)
}
932
933pub async fn reload_enrichment_tables(config: &Config) {
934 let mut enrichment_tables = HashMap::new();
935 'tables: for (name, table_outer) in config.enrichment_tables.iter() {
937 let table_name = name.to_string();
938 if ENRICHMENT_TABLES.needs_reload(&table_name) {
939 let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
940
941 let mut table = match table_outer.inner.build(&config.global).await {
942 Ok(table) => table,
943 Err(error) => {
944 error!("Enrichment table \"{name}\" reload failed: {error}");
945 continue;
946 }
947 };
948
949 if let Some(indexes) = indexes {
950 for (case, index) in indexes {
951 match table
952 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
953 {
954 Ok(_) => (),
955 Err(error) => {
956 error!(
960 message = "Unable to add index to reloaded enrichment table.",
961 table = ?name.to_string(),
962 %error
963 );
964 continue 'tables;
965 }
966 }
967 }
968 }
969
970 enrichment_tables.insert(table_name, table);
971 }
972 }
973
974 ENRICHMENT_TABLES.load(enrichment_tables);
975 ENRICHMENT_TABLES.finish_load();
976}
977
/// Everything produced by building the new components of a configuration diff.
pub struct TopologyPieces {
    /// Input buffer sender plus declared inputs for every new transform and sink.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels, grouped by component and then by output port (`None` is the
    /// default port).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Source pump supervisors, transform tasks and sink tasks.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// The server task of every new source.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Sink healthcheck tasks.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Coordinator with every new source registered.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-sink trigger tied to the tripwire that ends the sink's input stream.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Clone of the process-wide metrics storage.
    pub(crate) metrics_storage: MetricsStorage,
    /// Emitter and registry, present only when the build created its own registry.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
989
/// Builder for [`TopologyPieces`].
///
/// Only `config` and `diff` are required. Pre-built sink buffers, extra context and an existing
/// utilization registry can be supplied through the `with_*` methods.
pub struct TopologyPiecesBuilder<'a> {
    /// Configuration to build components from.
    config: &'a Config,
    /// Diff selecting which components are new and therefore built.
    diff: &'a ConfigDiff,
    /// Sink buffers to reuse instead of building new ones.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Extra context cloned into every component's build context.
    extra_context: ExtraContext,
    /// Existing utilization registry; `None` makes the build create a new one.
    utilization_registry: Option<UtilizationRegistry>,
}
1008
1009impl<'a> TopologyPiecesBuilder<'a> {
1010 pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1012 Self {
1013 config,
1014 diff,
1015 buffers: HashMap::new(),
1016 extra_context: ExtraContext::default(),
1017 utilization_registry: None,
1018 }
1019 }
1020
1021 pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1023 self.buffers = buffers;
1024 self
1025 }
1026
1027 pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1029 self.extra_context = extra_context;
1030 self
1031 }
1032
1033 pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1035 self.utilization_registry = registry;
1036 self
1037 }
1038
1039 pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1044 Builder::new(
1045 self.config,
1046 self.diff,
1047 self.buffers,
1048 self.extra_context,
1049 self.utilization_registry,
1050 )
1051 .build()
1052 .await
1053 }
1054
1055 pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1060 match self.build().await {
1061 Err(errors) => {
1062 for error in errors {
1063 error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1064 }
1065 None
1066 }
1067 Ok(new_pieces) => Some(new_pieces),
1068 }
1069 }
1070}
1071
1072impl TopologyPieces {
1073 pub async fn build_or_log_errors(
1074 config: &Config,
1075 diff: &ConfigDiff,
1076 buffers: HashMap<ComponentKey, BuiltBuffer>,
1077 extra_context: ExtraContext,
1078 utilization_registry: Option<UtilizationRegistry>,
1079 ) -> Option<Self> {
1080 TopologyPiecesBuilder::new(config, diff)
1081 .with_buffers(buffers)
1082 .with_extra_context(extra_context)
1083 .with_utilization_registry(utilization_registry)
1084 .build_or_log_errors()
1085 .await
1086 }
1087
1088 pub async fn build(
1090 config: &super::Config,
1091 diff: &ConfigDiff,
1092 buffers: HashMap<ComponentKey, BuiltBuffer>,
1093 extra_context: ExtraContext,
1094 utilization_registry: Option<UtilizationRegistry>,
1095 ) -> Result<Self, Vec<String>> {
1096 TopologyPiecesBuilder::new(config, diff)
1097 .with_buffers(buffers)
1098 .with_extra_context(extra_context)
1099 .with_utilization_registry(utilization_registry)
1100 .build()
1101 .await
1102 }
1103}
1104
1105const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1106 match events {
1107 EventArray::Logs(_) => data_type.contains(DataType::Log),
1108 EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1109 EventArray::Traces(_) => data_type.contains(DataType::Trace),
1110 }
1111}
1112
/// Parts of a transform's configuration captured before the transform is built, and used to
/// wire the built transform into the topology.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component type name, used to label the resulting task.
    typetag: &'static str,
    /// Upstream outputs feeding this transform.
    inputs: Inputs<OutputId>,
    /// Declared input; its `data_type()` filters incoming event arrays.
    input_details: Input,
    /// Declared outputs of the transform.
    outputs: Vec<TransformOutput>,
    /// Whether a synchronous transform should run via `Runner::run_concurrently`.
    enable_concurrency: bool,
}
1122
1123impl TransformNode {
1124 pub fn from_parts(
1125 key: ComponentKey,
1126 context: &TransformContext,
1127 transform: &TransformOuter<OutputId>,
1128 schema_definition: &[(OutputId, Definition)],
1129 ) -> Self {
1130 Self {
1131 key,
1132 typetag: transform.inner.get_component_name(),
1133 inputs: transform.inputs.clone(),
1134 input_details: transform.inner.input(),
1135 outputs: transform.inner.outputs(context, schema_definition),
1136 enable_concurrency: transform.inner.enable_concurrency(),
1137 }
1138 }
1139}
1140
/// Drives a [`SyncTransform`] over its input buffer, either inline or with several batches in
/// flight at once.
struct Runner {
    /// The transform; cloned per batch when running concurrently.
    transform: Box<dyn SyncTransform>,
    /// Input receiver; taken exactly once by a `run_*` method.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Arrays whose type is not contained in this are dropped.
    input_type: DataType,
    /// Destination for transformed events.
    outputs: TransformOutputs,
    /// Utilization sender, signalled when waiting starts and stops.
    timer_tx: UtilizationComponentSender,
    /// Records latency on every outgoing array.
    latency_recorder: LatencyRecorder,
    /// Handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
1150
impl Runner {
    /// Creates a runner.
    ///
    /// `input_rx` is stored in an `Option` so that exactly one `run_*` call can take it.
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
        latency_recorder: LatencyRecorder,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            latency_recorder,
            events_received: register!(EventsReceived),
        }
    }

    /// Bookkeeping for one received array: ends the utilization "wait" period and emits the
    /// received-events count and estimated byte size.
    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    /// Starts a utilization "wait" period, stamps every output array with one shared send
    /// instant for latency tracking, then sends the buffer to the transform's outputs.
    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        let now = Instant::now();
        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
        self.outputs.send(outputs_buf).await
    }

    /// Processes input arrays one at a time on the current task, reusing a single output
    /// buffer.
    ///
    /// # Errors
    ///
    /// Returns the wrapped error if sending to the outputs fails.
    ///
    /// # Panics
    ///
    /// Panics if the input receiver was already taken.
    async fn run_inline(mut self) -> TaskResult {
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Until the first array arrives, the runner is waiting on input.
        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    /// Processes batches of input arrays on spawned tokio tasks, keeping at most
    /// `TRANSFORM_CONCURRENCY_LIMIT` batches in flight.
    ///
    /// Each spawned task works on its own clone of the transform. Results are sent in input
    /// order (`FuturesOrdered`). Once the input ends, no more batches are read, and the loop
    /// exits after the remaining in-flight batches are sent.
    ///
    /// # Errors
    ///
    /// Returns the wrapped error if sending to the outputs fails.
    ///
    /// # Panics
    ///
    /// Panics if the input receiver was already taken.
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Presumably yields batches of already-available arrays bounded by
        // READY_ARRAY_CAPACITY — confirm against `ready_arrays::ReadyArrays`.
        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                // Prefer draining finished batches before reading more input.
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        // NOTE(review): `None` cannot occur because of the `!is_empty()` guard,
                        // but a `JoinError` from the spawned task (e.g. a panic inside
                        // `transform_all`) also lands here and panics with this message.
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            // Input is exhausted: stop reading and only drain in-flight work.
                            shutting_down = true;
                            continue
                        }
                    }
                }

                // Reached when both branches are disabled; after shutdown that means nothing
                // remains in flight.
                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}