vector/topology/builder.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    sync::{Arc, LazyLock, Mutex},
6    time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
12use tokio::{
13    select,
14    sync::{mpsc::UnboundedSender, oneshot},
15    time::timeout,
16};
17use tracing::Instrument;
18use vector_lib::{
19    EstimatedJsonEncodedSizeOf,
20    buffers::{
21        BufferType, WhenFull,
22        topology::{
23            builder::TopologyBuilder,
24            channel::{
25                BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
26                LimitedReceiver,
27            },
28        },
29    },
30    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
31    latency::LatencyRecorder,
32    schema::Definition,
33    source_sender::{CHUNK_SIZE, SourceSenderItem},
34    transform::update_runtime_schema_definition,
35};
36use vector_lib::{gauge, internal_event::GaugeName};
37use vector_vrl_metrics::MetricsStorage;
38
39use super::{
40    BuiltBuffer, ConfigDiff,
41    fanout::{self, Fanout},
42    schema,
43    task::{Task, TaskOutput, TaskResult},
44};
45use crate::{
46    SourceSender,
47    config::{
48        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
49        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
50    },
51    event::{EventArray, EventContainer},
52    extra_context::ExtraContext,
53    internal_events::EventsReceived,
54    shutdown::SourceShutdownCoordinator,
55    spawn_named,
56    topology::task::TaskError,
57    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
58    utilization::{
59        OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
60        UtilizationRegistry,
61    },
62};
63
64static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
65    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
66static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);
67
68pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
69    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
70
71const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
72pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
73
74static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
75    crate::app::worker_threads()
76        .map(std::num::NonZeroUsize::get)
77        .unwrap_or_else(crate::num_threads)
78});
79
80const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
81
/// Accumulates the pieces of a new topology while building every component that `diff`
/// reports as new.
///
/// Build failures are collected in `errors` instead of aborting, so one build can report
/// every failing component.
struct Builder<'a> {
    /// The full configuration being built.
    config: &'a super::Config,
    /// Used to select which components need building (`contains_new` / `is_added`).
    diff: &'a ConfigDiff,
    /// Registers each built source and hands back its shutdown signal and force-shutdown
    /// tripwire.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Human-readable build errors; `build` returns these instead of the pieces when non-empty.
    errors: Vec<String>,
    /// Fanout control sender for every output port built so far.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Long-running tasks (source pump supervisors, transforms, sinks) keyed by component.
    tasks: HashMap<ComponentKey, Task>,
    /// Already-built sink buffers to reuse; a sink removes its entry when it takes it.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input sender plus declared upstream inputs for every transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Sink healthcheck tasks.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Per-sink triggers; the paired `Tripwire` bounds the sink's input via `take_until_if`.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Extra context cloned into every source, transform, and sink context.
    extra_context: ExtraContext,
    /// Set only when this builder created its own emitter (no registry was passed to `new`).
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry from which sinks and sync transforms obtain utilization senders.
    utilization_registry: UtilizationRegistry,
}
97
98impl<'a> Builder<'a> {
99    fn new(
100        config: &'a super::Config,
101        diff: &'a ConfigDiff,
102        buffers: HashMap<ComponentKey, BuiltBuffer>,
103        extra_context: ExtraContext,
104        utilization_registry: Option<UtilizationRegistry>,
105    ) -> Self {
106        // If registry is not passed, we need to build a whole new utilization emitter + registry
107        // Otherwise, we just store the registry and reuse it for this build
108        let (emitter, registry) = if let Some(registry) = utilization_registry {
109            (None, registry)
110        } else {
111            let (emitter, registry) = UtilizationEmitter::new();
112            (Some(emitter), registry)
113        };
114        Self {
115            config,
116            diff,
117            buffers,
118            shutdown_coordinator: SourceShutdownCoordinator::default(),
119            errors: vec![],
120            outputs: HashMap::new(),
121            tasks: HashMap::new(),
122            inputs: HashMap::new(),
123            healthchecks: HashMap::new(),
124            detach_triggers: HashMap::new(),
125            extra_context,
126            utilization_emitter: emitter,
127            utilization_registry: registry,
128        }
129    }
130
131    /// Builds the new pieces of the topology found in `self.diff`.
132    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
133        let enrichment_tables = self.load_enrichment_tables().await;
134        let source_tasks = self.build_sources(enrichment_tables).await;
135        self.build_transforms(enrichment_tables).await;
136        self.build_sinks(enrichment_tables).await;
137
138        // We should have all the data for the enrichment tables loaded now, so switch them over to
139        // readonly.
140        enrichment_tables.finish_load();
141
142        if self.errors.is_empty() {
143            Ok(TopologyPieces {
144                inputs: self.inputs,
145                outputs: Self::finalize_outputs(self.outputs),
146                tasks: self.tasks,
147                source_tasks,
148                healthchecks: self.healthchecks,
149                shutdown_coordinator: self.shutdown_coordinator,
150                detach_triggers: self.detach_triggers,
151                metrics_storage: METRICS_STORAGE.clone(),
152                utilization: self
153                    .utilization_emitter
154                    .map(|e| (e, self.utilization_registry)),
155            })
156        } else {
157            Err(self.errors)
158        }
159    }
160
161    fn finalize_outputs(
162        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
163    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
164    {
165        let mut finalized_outputs = HashMap::new();
166        for (id, output) in outputs {
167            let entry = finalized_outputs
168                .entry(id.component)
169                .or_insert_with(HashMap::new);
170            entry.insert(id.port, output);
171        }
172
173        finalized_outputs
174    }
175
176    /// Loads, or reloads the enrichment tables.
177    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
178    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
179        let mut enrichment_tables = HashMap::new();
180
181        // Build enrichment tables
182        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
183            let table_name = name.to_string();
184            if ENRICHMENT_TABLES.needs_reload(&table_name) {
185                let indexes = if !self.diff.enrichment_tables.is_added(name) {
186                    // If this is an existing enrichment table, we need to store the indexes to reapply
187                    // them again post load.
188                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
189                } else {
190                    None
191                };
192
193                let mut table = match table_outer.inner.build(&self.config.global).await {
194                    Ok(table) => table,
195                    Err(error) => {
196                        self.errors
197                            .push(format!("Enrichment Table \"{name}\": {error}"));
198                        continue;
199                    }
200                };
201
202                if let Some(indexes) = indexes {
203                    for (case, index) in indexes {
204                        match table
205                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
206                        {
207                            Ok(_) => (),
208                            Err(error) => {
209                                // If there is an error adding an index we do not want to use the reloaded
210                                // data, the previously loaded data will still need to be used.
211                                // Just report the error and continue.
212                                error!(message = "Unable to add index to reloaded enrichment table.",
213                                    table = ?name.to_string(),
214                                    %error);
215                                continue 'tables;
216                            }
217                        }
218                    }
219                }
220
221                enrichment_tables.insert(table_name, table);
222            }
223        }
224
225        ENRICHMENT_TABLES.load(enrichment_tables);
226
227        &ENRICHMENT_TABLES
228    }
229
230    async fn build_sources(
231        &mut self,
232        enrichment_tables: &vector_lib::enrichment::TableRegistry,
233    ) -> HashMap<ComponentKey, Task> {
234        let mut source_tasks = HashMap::new();
235
236        let table_sources = self
237            .config
238            .enrichment_tables
239            .iter()
240            .filter_map(|(key, table)| table.as_source(key))
241            .collect::<Vec<_>>();
242        for (key, source) in self
243            .config
244            .sources()
245            .filter(|(key, _)| self.diff.sources.contains_new(key))
246            .chain(
247                table_sources
248                    .iter()
249                    .map(|(key, source)| (key, source))
250                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
251            )
252        {
253            debug!(component_id = %key, "Building new source.");
254
255            let typetag = source.inner.get_component_name();
256            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
257
258            let span = error_span!(
259                "source",
260                component_kind = "source",
261                component_id = %key.id(),
262                component_type = %source.inner.get_component_name(),
263            );
264            let _entered_span = span.enter();
265
266            let task_name = format!(
267                ">> {} ({}, pump) >>",
268                source.inner.get_component_name(),
269                key.id()
270            );
271
272            let mut builder = SourceSender::builder()
273                .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
274                .with_timeout(source.inner.send_timeout())
275                .with_ewma_half_life_seconds(
276                    self.config.global.buffer_utilization_ewma_half_life_seconds,
277                );
278            let mut pumps = Vec::new();
279            let mut controls = HashMap::new();
280            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
281
282            for output in source_outputs.into_iter() {
283                let rx = builder.add_source_output(output.clone(), key.clone());
284
285                let (fanout, control) = Fanout::new();
286                let source_type = source.inner.get_component_name();
287                let source = Arc::new(key.clone());
288
289                let pump = run_source_output_pump(rx, fanout, source, source_type);
290
291                pumps.push(pump.instrument(span.clone()));
292                controls.insert(
293                    OutputId {
294                        component: key.clone(),
295                        port: output.port.clone(),
296                    },
297                    control,
298                );
299
300                let port = output.port.clone();
301                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
302                    schema_definitions.insert(port, definition);
303                }
304            }
305
306            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
307            let pump = async move {
308                debug!("Source pump supervisor starting.");
309
310                // Spawn all of the per-output pumps and then await their completion.
311                //
312                // If any of the pumps complete with an error, or panic/are cancelled, we return
313                // immediately.
314                let mut handles = FuturesUnordered::new();
315                for pump in pumps {
316                    handles.push(spawn_named(pump, task_name.as_ref()));
317                }
318
319                let mut had_pump_error = false;
320                while let Some(output) = handles.try_next().await? {
321                    if let Err(e) = output {
322                        // Immediately send the error to the source's wrapper future, but ignore any
323                        // errors during the send, since nested errors wouldn't make any sense here.
324                        _ = pump_error_tx.send(e);
325                        had_pump_error = true;
326                        break;
327                    }
328                }
329
330                if had_pump_error {
331                    debug!("Source pump supervisor task finished with an error.");
332                } else {
333                    debug!("Source pump supervisor task finished normally.");
334                }
335                Ok(TaskOutput::Source)
336            };
337            let pump = Task::new(key.clone(), typetag, pump);
338
339            let (shutdown_signal, force_shutdown_tripwire) = self
340                .shutdown_coordinator
341                .register_source(key, INTERNAL_SOURCES.contains(&typetag));
342
343            let context = SourceContext {
344                key: key.clone(),
345                globals: self.config.global.clone(),
346                enrichment_tables: enrichment_tables.clone(),
347                metrics_storage: METRICS_STORAGE.clone(),
348                shutdown: shutdown_signal,
349                out: builder.build(),
350                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
351                acknowledgements: source.sink_acknowledgements,
352                schema_definitions,
353                schema: self.config.schema,
354                extra_context: self.extra_context.clone(),
355            };
356            let server = match source.inner.build(context).await {
357                Err(error) => {
358                    self.errors.push(format!("Source \"{key}\": {error}"));
359                    continue;
360                }
361                Ok(server) => server,
362            };
363
364            // Build a wrapper future that drives the actual source future, but returns early if we've
365            // been signalled to forcefully shutdown, or if the source pump encounters an error.
366            //
367            // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully
368            // within the allotted time window. This can occur normally for certain sources, like stdin,
369            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
370            // to shutdown unless some input is given.
371            let server = async move {
372                debug!("Source starting.");
373
374                let mut result = select! {
375                    biased;
376
377                    // We've been told that we must forcefully shut down.
378                    _ = force_shutdown_tripwire => Ok(()),
379
380                    // The source pump encountered an error, which we're now bubbling up here to stop
381                    // the source as well, since the source running makes no sense without the pump.
382                    //
383                    // We only match receiving a message, not the error of the sender being dropped,
384                    // just to keep things simpler.
385                    Ok(e) = &mut pump_error_rx => Err(e),
386
387                    // The source finished normally.
388                    result = server => result.map_err(|_| TaskError::Opaque),
389                };
390
391                // Even though we already tried to receive any pump task error above, we may have exited
392                // on the source itself returning an error due to task scheduling, where the pump task
393                // encountered an error, sent it over the oneshot, but we were polling the source
394                // already and hit an error trying to send to the now-shutdown pump task.
395                //
396                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
397                // time to see if the pump task encountered an error, using _that_ instead if so, to
398                // propagate the true error that caused the source to have to stop.
399                if let Ok(e) = pump_error_rx.try_recv() {
400                    result = Err(e);
401                }
402
403                match result {
404                    Ok(()) => {
405                        debug!("Source finished normally.");
406                        Ok(TaskOutput::Source)
407                    }
408                    Err(e) => {
409                        debug!("Source finished with an error.");
410                        Err(e)
411                    }
412                }
413            };
414            let server = Task::new(key.clone(), typetag, server);
415
416            self.outputs.extend(controls);
417            self.tasks.insert(key.clone(), pump);
418            source_tasks.insert(key.clone(), server);
419        }
420
421        source_tasks
422    }
423
424    async fn build_transforms(
425        &mut self,
426        enrichment_tables: &vector_lib::enrichment::TableRegistry,
427    ) {
428        let mut definition_cache = HashMap::default();
429
430        for (key, transform) in self
431            .config
432            .transforms()
433            .filter(|(key, _)| self.diff.transforms.contains_new(key))
434        {
435            debug!(component_id = %key, "Building new transform.");
436
437            let input_definitions = match schema::input_definitions(
438                &transform.inputs,
439                self.config,
440                enrichment_tables.clone(),
441                &mut definition_cache,
442            ) {
443                Ok(definitions) => definitions,
444                Err(_) => {
445                    // We have received an error whilst retrieving the definitions,
446                    // there is no point in continuing.
447
448                    return;
449                }
450            };
451
452            let merged_definition: Definition = input_definitions
453                .iter()
454                .map(|(_output_id, definition)| definition.clone())
455                .reduce(Definition::merge)
456                // We may not have any definitions if all the inputs are from metrics sources.
457                .unwrap_or_else(Definition::any);
458
459            let span = error_span!(
460                "transform",
461                component_kind = "transform",
462                component_id = %key.id(),
463                component_type = %transform.inner.get_component_name(),
464            );
465            let _span = span.enter();
466
467            // Create a map of the outputs to the list of possible definitions from those outputs.
468            let schema_definitions = transform
469                .inner
470                .outputs(
471                    &TransformContext {
472                        enrichment_tables: enrichment_tables.clone(),
473                        metrics_storage: METRICS_STORAGE.clone(),
474                        schema: self.config.schema,
475                        ..Default::default()
476                    },
477                    &input_definitions,
478                )
479                .into_iter()
480                .map(|output| {
481                    let definitions = output.schema_definitions(self.config.schema.enabled);
482                    (output.port, definitions)
483                })
484                .collect::<HashMap<_, _>>();
485
486            let context = TransformContext {
487                key: Some(key.clone()),
488                globals: self.config.global.clone(),
489                enrichment_tables: enrichment_tables.clone(),
490                metrics_storage: METRICS_STORAGE.clone(),
491                schema_definitions,
492                merged_schema_definition: merged_definition.clone(),
493                schema: self.config.schema,
494                extra_context: self.extra_context.clone(),
495            };
496
497            let node =
498                TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
499
500            let transform = match transform
501                .inner
502                .build(&context)
503                .instrument(span.clone())
504                .await
505            {
506                Err(error) => {
507                    self.errors.push(format!("Transform \"{key}\": {error}"));
508                    continue;
509                }
510                Ok(transform) => transform,
511            };
512
513            let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
514            let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
515                TOPOLOGY_BUFFER_SIZE,
516                WhenFull::Block,
517                &span,
518                Some(metrics),
519                self.config.global.buffer_utilization_ewma_half_life_seconds,
520            );
521
522            self.inputs
523                .insert(key.clone(), (input_tx, node.inputs.clone()));
524
525            let (transform_task, transform_outputs) =
526                self.build_transform(transform, node, input_rx);
527
528            self.outputs.extend(transform_outputs);
529            self.tasks.insert(key.clone(), transform_task);
530        }
531    }
532
533    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
534        let table_sinks = self
535            .config
536            .enrichment_tables
537            .iter()
538            .filter_map(|(key, table)| table.as_sink(key))
539            .collect::<Vec<_>>();
540        for (key, sink) in self
541            .config
542            .sinks()
543            .filter(|(key, _)| self.diff.sinks.contains_new(key))
544            .chain(
545                table_sinks
546                    .iter()
547                    .map(|(key, sink)| (key, sink))
548                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
549            )
550        {
551            debug!(component_id = %key, "Building new sink.");
552
553            let sink_inputs = &sink.inputs;
554            let healthcheck = sink.healthcheck();
555            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
556            let healthcheck_timeout = healthcheck.timeout;
557
558            let typetag = sink.inner.get_component_name();
559            let input_type = sink.inner.input().data_type();
560
561            let span = error_span!(
562                "sink",
563                component_kind = "sink",
564                component_id = %key.id(),
565                component_type = %sink.inner.get_component_name(),
566            );
567            let _entered_span = span.enter();
568
569            // At this point, we've validated that all transforms are valid, including any
570            // transform that mutates the schema provided by their sources. We can now validate the
571            // schema expectations of each individual sink.
572            if let Err(mut err) = schema::validate_sink_expectations(
573                key,
574                sink,
575                self.config,
576                enrichment_tables.clone(),
577            ) {
578                self.errors.append(&mut err);
579            };
580
581            let (tx, rx) = match self.buffers.remove(key) {
582                Some(buffer) => buffer,
583                _ => {
584                    let buffer_type =
585                        match sink.buffer.stages().first().expect("cant ever be empty") {
586                            BufferType::Memory { .. } => "memory",
587                            BufferType::DiskV2 { .. } => "disk",
588                        };
589                    let buffer_span = error_span!("sink", buffer_type);
590                    let buffer = sink
591                        .buffer
592                        .build(
593                            self.config.global.data_dir.clone(),
594                            key.to_string(),
595                            buffer_span,
596                        )
597                        .await;
598                    match buffer {
599                        Err(error) => {
600                            self.errors.push(format!("Sink \"{key}\": {error}"));
601                            continue;
602                        }
603                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
604                    }
605                }
606            };
607
608            let cx = SinkContext {
609                healthcheck,
610                globals: self.config.global.clone(),
611                enrichment_tables: enrichment_tables.clone(),
612                metrics_storage: METRICS_STORAGE.clone(),
613                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
614                schema: self.config.schema,
615                app_name: crate::get_app_name().to_string(),
616                app_name_slug: crate::get_slugified_app_name(),
617                extra_context: self.extra_context.clone(),
618            };
619
620            let (sink, healthcheck) = match sink.inner.build(cx).await {
621                Err(error) => {
622                    self.errors.push(format!("Sink \"{key}\": {error}"));
623                    continue;
624                }
625                Ok(built) => built,
626            };
627
628            let (trigger, tripwire) = Tripwire::new();
629
630            let utilization_sender = self
631                .utilization_registry
632                .add_component(key.clone(), gauge!(GaugeName::Utilization));
633            let component_key = key.clone();
634            let sink = async move {
635                debug!("Sink starting.");
636
637                // Why is this Arc<Mutex<Option<_>>> needed you ask.
638                // In case when this function build_pieces errors
639                // this future won't be run so this rx won't be taken
640                // which will enable us to reuse rx to rebuild
641                // old configuration by passing this Arc<Mutex<Option<_>>>
642                // yet again.
643                let rx = rx
644                    .lock()
645                    .unwrap()
646                    .take()
647                    .expect("Task started but input has been taken.");
648
649                let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
650
651                let events_received = register!(EventsReceived);
652                sink.run(
653                    rx.by_ref()
654                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
655                        .inspect(|events| {
656                            events_received.emit(CountByteSize(
657                                events.len(),
658                                events.estimated_json_encoded_size_of(),
659                            ))
660                        })
661                        .take_until_if(tripwire),
662                )
663                .await
664                .map(|_| {
665                    debug!("Sink finished normally.");
666                    TaskOutput::Sink(rx)
667                })
668                .map_err(|_| {
669                    debug!("Sink finished with an error.");
670                    TaskError::Opaque
671                })
672            };
673
674            let task = Task::new(key.clone(), typetag, sink);
675
676            let component_key = key.clone();
677            let healthcheck_task = async move {
678                if enable_healthcheck {
679                    timeout(healthcheck_timeout, healthcheck)
680                        .map(|result| match result {
681                            Ok(Ok(_)) => {
682                                info!("Healthcheck passed.");
683                                Ok(TaskOutput::Healthcheck)
684                            }
685                            Ok(Err(error)) => {
686                                error!(
687                                    msg = "Healthcheck failed.",
688                                    %error,
689                                    component_kind = "sink",
690                                    component_type = typetag,
691                                    component_id = %component_key.id(),
692                                );
693                                Err(TaskError::wrapped(error))
694                            }
695                            Err(e) => {
696                                error!(
697                                    msg = "Healthcheck timed out.",
698                                    component_kind = "sink",
699                                    component_type = typetag,
700                                    component_id = %component_key.id(),
701                                );
702                                Err(TaskError::wrapped(Box::new(e)))
703                            }
704                        })
705                        .await
706                } else {
707                    info!("Healthcheck disabled.");
708                    Ok(TaskOutput::Healthcheck)
709                }
710            };
711
712            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
713
714            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
715            self.healthchecks.insert(key.clone(), healthcheck_task);
716            self.tasks.insert(key.clone(), task);
717            self.detach_triggers.insert(key.clone(), trigger);
718        }
719    }
720
721    fn build_transform(
722        &self,
723        transform: Transform,
724        node: TransformNode,
725        input_rx: BufferReceiver<EventArray>,
726    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
727        match transform {
728            // TODO: avoid the double boxing for function transforms here
729            Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
730            Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
731            Transform::Task(t) => self.build_task_transform(
732                t,
733                input_rx,
734                node.input_details.data_type(),
735                node.typetag,
736                &node.key,
737                &node.outputs,
738            ),
739        }
740    }
741
742    fn build_sync_transform(
743        &self,
744        t: Box<dyn SyncTransform>,
745        node: TransformNode,
746        input_rx: BufferReceiver<EventArray>,
747    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
748        let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
749
750        let sender = self
751            .utilization_registry
752            .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
753        let runner = Runner::new(
754            t,
755            input_rx,
756            sender,
757            node.input_details.data_type(),
758            outputs,
759            LatencyRecorder::new(self.config.global.latency_ewma_alpha),
760        );
761        let transform = if node.enable_concurrency {
762            runner.run_concurrently().boxed()
763        } else {
764            runner.run_inline().boxed()
765        };
766
767        let transform = async move {
768            debug!("Synchronous transform starting.");
769
770            match transform.await {
771                Ok(v) => {
772                    debug!("Synchronous transform finished normally.");
773                    Ok(v)
774                }
775                Err(e) => {
776                    debug!("Synchronous transform finished with an error.");
777                    Err(e)
778                }
779            }
780        };
781
782        let mut output_controls = HashMap::new();
783        for (name, control) in controls {
784            let id = name
785                .map(|name| OutputId::from((&node.key, name)))
786                .unwrap_or_else(|| OutputId::from(&node.key));
787            output_controls.insert(id, control);
788        }
789
790        let task = Task::new(node.key.clone(), node.typetag, transform);
791
792        (task, output_controls)
793    }
794
795    fn build_task_transform(
796        &self,
797        t: Box<dyn TaskTransform<EventArray>>,
798        input_rx: BufferReceiver<EventArray>,
799        input_type: DataType,
800        typetag: &str,
801        key: &ComponentKey,
802        outputs: &[TransformOutput],
803    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
804        let (mut fanout, control) = Fanout::new();
805
806        let sender = self
807            .utilization_registry
808            .add_component(key.clone(), gauge!(GaugeName::Utilization));
809        let output_sender = sender.clone();
810        let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());
811
812        let events_received = register!(EventsReceived);
813        let filtered = input_rx
814            .filter(move |events| ready(filter_events_type(events, input_type)))
815            .inspect(move |events| {
816                events_received.emit(CountByteSize(
817                    events.len(),
818                    events.estimated_json_encoded_size_of(),
819                ))
820            });
821        let events_sent = register!(EventsSent::from(internal_event::Output(None)));
822        let output_id = Arc::new(OutputId {
823            component: key.clone(),
824            port: None,
825        });
826        let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);
827
828        // Task transforms can only write to the default output, so only a single schema def map is needed
829        let schema_definition_map = outputs
830            .iter()
831            .find(|x| x.port.is_none())
832            .expect("output for default port required for task transforms")
833            .log_schema_definitions
834            .clone()
835            .into_iter()
836            .map(|(key, value)| (key, Arc::new(value)))
837            .collect();
838
839        let stream = t
840            .transform(Box::pin(filtered))
841            .map(move |mut events| {
842                for event in events.iter_events_mut() {
843                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
844                }
845                let now = Instant::now();
846                latency_recorder.on_send(&mut events, now);
847                (events, now)
848            })
849            .inspect(move |(events, _): &(EventArray, Instant)| {
850                events_sent.emit(CountByteSize(
851                    events.len(),
852                    events.estimated_json_encoded_size_of(),
853                ));
854            });
855        let stream = OutputUtilization::new(output_sender, stream);
856        let transform = async move {
857            debug!("Task transform starting.");
858
859            match fanout.send_stream(stream).await {
860                Ok(()) => {
861                    debug!("Task transform finished normally.");
862                    Ok(TaskOutput::Transform)
863                }
864                Err(e) => {
865                    debug!("Task transform finished with an error.");
866                    Err(TaskError::wrapped(e))
867                }
868            }
869        }
870        .boxed();
871
872        let mut outputs = HashMap::new();
873        outputs.insert(OutputId::from(key), control);
874
875        let task = Task::new(key.clone(), typetag, transform);
876
877        (task, outputs)
878    }
879}
880
881async fn run_source_output_pump(
882    mut rx: LimitedReceiver<SourceSenderItem>,
883    mut fanout: Fanout,
884    source: Arc<ComponentKey>,
885    source_type: &'static str,
886) -> TaskResult {
887    debug!("Source pump starting.");
888
889    let mut control_channel_open = true;
890    loop {
891        tokio::select! {
892            biased;
893            // Process control messages (e.g. Remove/Pause) even when the source
894            // is idle, so that config reloads can proceed without waiting for the
895            // next event.
896            alive = fanout.recv_control_message(), if control_channel_open => {
897                control_channel_open = alive;
898            }
899            item = rx.next() => {
900                match item {
901                    Some(SourceSenderItem { events: mut array, send_reference }) => {
902                        // Even though we have a `send_reference` timestamp above, that reference
903                        // time is when the events were enqueued in the `SourceSender`, not when
904                        // they were pulled out of the `rx` stream on this end. Since those times
905                        // can be quite different (due to blocking inherent to the fanout send
906                        // operation), we set the `last_transform_timestamp` to the current time
907                        // instead to get an accurate reference for when the events started
908                        // waiting for the first transform.
909                        let now = Instant::now();
910                        array.for_each_metadata_mut(|metadata| {
911                            metadata.set_source_id(Arc::clone(&source));
912                            metadata.set_source_type(source_type);
913                            metadata.set_last_transform_timestamp(now);
914                        });
915                        fanout
916                            .send(array, Some(send_reference))
917                            .await
918                            .map_err(|e| {
919                                debug!("Source pump finished with an error.");
920                                TaskError::wrapped(e)
921                            })?;
922                    }
923                    None => break,
924                }
925            }
926        }
927    }
928
929    debug!("Source pump finished normally.");
930    Ok(TaskOutput::Source)
931}
932
933pub async fn reload_enrichment_tables(config: &Config) {
934    let mut enrichment_tables = HashMap::new();
935    // Build enrichment tables
936    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
937        let table_name = name.to_string();
938        if ENRICHMENT_TABLES.needs_reload(&table_name) {
939            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
940
941            let mut table = match table_outer.inner.build(&config.global).await {
942                Ok(table) => table,
943                Err(error) => {
944                    error!("Enrichment table \"{name}\" reload failed: {error}");
945                    continue;
946                }
947            };
948
949            if let Some(indexes) = indexes {
950                for (case, index) in indexes {
951                    match table
952                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
953                    {
954                        Ok(_) => (),
955                        Err(error) => {
956                            // If there is an error adding an index we do not want to use the reloaded
957                            // data, the previously loaded data will still need to be used.
958                            // Just report the error and continue.
959                            error!(
960                                message = "Unable to add index to reloaded enrichment table.",
961                                table = ?name.to_string(),
962                                %error
963                            );
964                            continue 'tables;
965                        }
966                    }
967                }
968            }
969
970            enrichment_tables.insert(table_name, table);
971        }
972    }
973
974    ENRICHMENT_TABLES.load(enrichment_tables);
975    ENRICHMENT_TABLES.finish_load();
976}
977
/// The components produced by building a configuration diff (see
/// `TopologyPiecesBuilder`), each collection keyed by `ComponentKey`.
pub struct TopologyPieces {
    /// Per-component input side: the `BufferSender` feeding the component's input
    /// buffer, paired with the component's configured `Inputs`.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Per-component fanout control channels, keyed by output port name
    /// (`None` is the component's default output).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Built component tasks (sink tasks are inserted here; NOTE(review): presumably
    /// transform tasks too — confirm against the transform-building code).
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Built source tasks, kept apart from `tasks`.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Healthcheck tasks (e.g. one per sink; the task may resolve immediately when the
    /// healthcheck is disabled).
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Coordinator used to shut down the built sources.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// `stream_cancel` triggers, one per component that registered one (sinks insert
    /// theirs here).
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) metrics_storage: MetricsStorage,
    /// Utilization emitter and registry, when utilization tracking is set up.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
989
/// Builder for constructing [`TopologyPieces`] with a fluent API.
///
/// Only `config` and `diff` are required. Buffers default to empty, the extra context
/// to `ExtraContext::default()`, and the utilization registry to `None`.
///
/// # Examples
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(buffers)
///     .with_extra_context(extra_context)
///     .build()
///     .await?;
/// ```
pub struct TopologyPiecesBuilder<'a> {
    /// Configuration the pieces are built from.
    config: &'a Config,
    /// Diff selecting which components to build (NOTE(review): presumably only the
    /// added/changed ones — confirm in `Builder`).
    diff: &'a ConfigDiff,
    /// Previously built buffers, keyed by component, handed to the builder.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Extra context passed through to the builder.
    extra_context: ExtraContext,
    /// Optional utilization registry passed through to the builder.
    utilization_registry: Option<UtilizationRegistry>,
}
1008
1009impl<'a> TopologyPiecesBuilder<'a> {
1010    /// Creates a new builder with required parameters.
1011    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1012        Self {
1013            config,
1014            diff,
1015            buffers: HashMap::new(),
1016            extra_context: ExtraContext::default(),
1017            utilization_registry: None,
1018        }
1019    }
1020
1021    /// Sets the buffers for the topology.
1022    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1023        self.buffers = buffers;
1024        self
1025    }
1026
1027    /// Sets the extra context for the topology.
1028    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1029        self.extra_context = extra_context;
1030        self
1031    }
1032
1033    /// Sets the utilization registry for the topology.
1034    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1035        self.utilization_registry = registry;
1036        self
1037    }
1038
1039    /// Builds the topology pieces, returning errors if any occur.
1040    ///
1041    /// Use this method when you need to handle errors explicitly,
1042    /// such as in tests or validation code.
1043    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1044        Builder::new(
1045            self.config,
1046            self.diff,
1047            self.buffers,
1048            self.extra_context,
1049            self.utilization_registry,
1050        )
1051        .build()
1052        .await
1053    }
1054
1055    /// Builds the topology pieces, logging any errors that occur.
1056    ///
1057    /// Use this method for runtime configuration loading where
1058    /// errors should be logged and execution should continue.
1059    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1060        match self.build().await {
1061            Err(errors) => {
1062                for error in errors {
1063                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1064                }
1065                None
1066            }
1067            Ok(new_pieces) => Some(new_pieces),
1068        }
1069    }
1070}
1071
1072impl TopologyPieces {
1073    pub async fn build_or_log_errors(
1074        config: &Config,
1075        diff: &ConfigDiff,
1076        buffers: HashMap<ComponentKey, BuiltBuffer>,
1077        extra_context: ExtraContext,
1078        utilization_registry: Option<UtilizationRegistry>,
1079    ) -> Option<Self> {
1080        TopologyPiecesBuilder::new(config, diff)
1081            .with_buffers(buffers)
1082            .with_extra_context(extra_context)
1083            .with_utilization_registry(utilization_registry)
1084            .build_or_log_errors()
1085            .await
1086    }
1087
1088    /// Builds only the new pieces, and doesn't check their topology.
1089    pub async fn build(
1090        config: &super::Config,
1091        diff: &ConfigDiff,
1092        buffers: HashMap<ComponentKey, BuiltBuffer>,
1093        extra_context: ExtraContext,
1094        utilization_registry: Option<UtilizationRegistry>,
1095    ) -> Result<Self, Vec<String>> {
1096        TopologyPiecesBuilder::new(config, diff)
1097            .with_buffers(buffers)
1098            .with_extra_context(extra_context)
1099            .with_utilization_registry(utilization_registry)
1100            .build()
1101            .await
1102    }
1103}
1104
1105const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1106    match events {
1107        EventArray::Logs(_) => data_type.contains(DataType::Log),
1108        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1109        EventArray::Traces(_) => data_type.contains(DataType::Trace),
1110    }
1111}
1112
/// The per-transform details the builder needs, extracted from a transform's
/// configuration by [`TransformNode::from_parts`].
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component name reported by the transform config (`get_component_name`).
    typetag: &'static str,
    /// The transform's configured inputs.
    inputs: Inputs<OutputId>,
    /// Input description; its `data_type()` is used to filter incoming arrays.
    input_details: Input,
    /// Outputs declared by the transform config for the given context and schema.
    outputs: Vec<TransformOutput>,
    /// For synchronous transforms, selects `Runner::run_concurrently` over
    /// `Runner::run_inline`.
    enable_concurrency: bool,
}
1122
1123impl TransformNode {
1124    pub fn from_parts(
1125        key: ComponentKey,
1126        context: &TransformContext,
1127        transform: &TransformOuter<OutputId>,
1128        schema_definition: &[(OutputId, Definition)],
1129    ) -> Self {
1130        Self {
1131            key,
1132            typetag: transform.inner.get_component_name(),
1133            inputs: transform.inputs.clone(),
1134            input_details: transform.inner.input(),
1135            outputs: transform.inner.outputs(context, schema_definition),
1136            enable_concurrency: transform.inner.enable_concurrency(),
1137        }
1138    }
1139}
1140
/// Drives a synchronous transform: reads input arrays, applies the transform, and
/// sends the results to the transform's outputs, either inline or concurrently.
struct Runner {
    /// The transform itself; cloned once per spawned batch in concurrent mode.
    transform: Box<dyn SyncTransform>,
    /// Input receiver, taken (leaving `None`) by whichever run method is used.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Accepted event types; input arrays of other types are filtered out.
    input_type: DataType,
    /// Destination for transformed events.
    outputs: TransformOutputs,
    /// Sends utilization start-wait/stop-wait signals for this component.
    timer_tx: UtilizationComponentSender,
    /// Stamps latency on output arrays just before they are sent.
    latency_recorder: LatencyRecorder,
    /// Registered `EventsReceived` internal event handle.
    events_received: Registered<EventsReceived>,
}
1150
1151impl Runner {
1152    fn new(
1153        transform: Box<dyn SyncTransform>,
1154        input_rx: BufferReceiver<EventArray>,
1155        timer_tx: UtilizationComponentSender,
1156        input_type: DataType,
1157        outputs: TransformOutputs,
1158        latency_recorder: LatencyRecorder,
1159    ) -> Self {
1160        Self {
1161            transform,
1162            input_rx: Some(input_rx),
1163            input_type,
1164            outputs,
1165            timer_tx,
1166            latency_recorder,
1167            events_received: register!(EventsReceived),
1168        }
1169    }
1170
1171    fn on_events_received(&mut self, events: &EventArray) {
1172        self.timer_tx.try_send_stop_wait();
1173
1174        self.events_received.emit(CountByteSize(
1175            events.len(),
1176            events.estimated_json_encoded_size_of(),
1177        ));
1178    }
1179
1180    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
1181        self.timer_tx.try_send_start_wait();
1182        let now = Instant::now();
1183        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
1184        self.outputs.send(outputs_buf).await
1185    }
1186
1187    async fn run_inline(mut self) -> TaskResult {
1188        // 128 is an arbitrary, smallish constant
1189        const INLINE_BATCH_SIZE: usize = 128;
1190
1191        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);
1192
1193        let mut input_rx = self
1194            .input_rx
1195            .take()
1196            .expect("can't run runner twice")
1197            .into_stream()
1198            .filter(move |events| ready(filter_events_type(events, self.input_type)));
1199
1200        self.timer_tx.try_send_start_wait();
1201        while let Some(events) = input_rx.next().await {
1202            self.on_events_received(&events);
1203            self.transform.transform_all(events, &mut outputs_buf);
1204            self.send_outputs(&mut outputs_buf)
1205                .await
1206                .map_err(TaskError::wrapped)?;
1207        }
1208
1209        Ok(TaskOutput::Transform)
1210    }
1211
1212    async fn run_concurrently(mut self) -> TaskResult {
1213        let input_rx = self
1214            .input_rx
1215            .take()
1216            .expect("can't run runner twice")
1217            .into_stream()
1218            .filter(move |events| ready(filter_events_type(events, self.input_type)));
1219
1220        let mut input_rx =
1221            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
1222
1223        let mut in_flight = FuturesOrdered::new();
1224        let mut shutting_down = false;
1225
1226        self.timer_tx.try_send_start_wait();
1227        loop {
1228            tokio::select! {
1229                biased;
1230
1231                result = in_flight.next(), if !in_flight.is_empty() => {
1232                    match result {
1233                        Some(Ok(mut outputs_buf)) => {
1234                            self.send_outputs(&mut outputs_buf).await
1235                                .map_err(TaskError::wrapped)?;
1236                        }
1237                        _ => unreachable!("join error or bad poll"),
1238                    }
1239                }
1240
1241                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1242                    match input_arrays {
1243                        Some(input_arrays) => {
1244                            let mut len = 0;
1245                            for events in &input_arrays {
1246                                self.on_events_received(events);
1247                                len += events.len();
1248                            }
1249
1250                            let mut t = self.transform.clone();
1251                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1252                            let task = tokio::spawn(async move {
1253                                for events in input_arrays {
1254                                    t.transform_all(events, &mut outputs_buf);
1255                                }
1256                                outputs_buf
1257                            }.in_current_span());
1258                            in_flight.push_back(task);
1259                        }
1260                        None => {
1261                            shutting_down = true;
1262                            continue
1263                        }
1264                    }
1265                }
1266
1267                else => {
1268                    if shutting_down {
1269                        break
1270                    }
1271                }
1272            }
1273        }
1274
1275        Ok(TaskOutput::Transform)
1276    }
1277}