1use std::{
2 collections::{HashMap, HashSet},
3 sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10 sync::{mpsc, watch},
11 time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15 buffers::topology::channel::BufferSender,
16 shutdown::ShutdownSignal,
17 tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18 trigger::DisabledTrigger,
19};
20
21use super::{
22 BuiltBuffer, TaskHandle,
23 builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24 fanout::{ControlChannel, ControlMessage},
25 handle_errors, retain, take_healthchecks,
26 task::{Task, TaskOutput},
27};
28use crate::{
29 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30 event::EventArray,
31 extra_context::ExtraContext,
32 shutdown::SourceShutdownCoordinator,
33 signal::ShutdownError,
34 spawn_named,
35 utilization::UtilizationRegistry,
36};
37
38pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
40#[derive(Debug, Snafu)]
41pub enum ReloadError {
42 #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
43 GlobalOptionsChanged { changed_fields: Vec<String> },
44 #[snafu(display("failed to compute global diff: {}", source))]
45 GlobalDiffFailed { source: serde_json::Error },
46 #[snafu(display("topology build failed"))]
47 TopologyBuildFailed,
48 #[snafu(display("failed to restore previous config"))]
49 FailedToRestore,
50}
51
/// Live state of a running topology: the spawned component tasks, the channels wiring them
/// together, and the configuration they were built from.
// NOTE(review): `dead_code` is allowed for the whole struct; which fields still need it is not
// visible from this chunk — confirm before narrowing or removing the allow.
// Field order is also drop order; keep it stable.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Input sender of every component that accepts events (transforms, sinks and
    /// enrichment-table sinks), registered by `setup_inputs`.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Upstream outputs of each component that has inputs; published to tap subscribers.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel for every output port of every component.
    outputs: HashMap<OutputId, ControlChannel>,
    /// Kind (`"source"` / `"transform"`) and type tag of each component that has outputs;
    /// used to describe tap outputs.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Type tag of each component, published to tap subscribers as `type_names`.
    component_type_names: HashMap<ComponentKey, String>,
    /// Handles of the source tasks spawned from `TopologyPieces::source_tasks`.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Handles of all other component tasks: transforms, sinks, and the additional task each
    /// source has in `TopologyPieces::tasks`.
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Used to shut sources down, per source on reload and all at once in `stop`.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-component triggers taken from the built pieces; cancelled before a changed sink's
    /// old task is awaited.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// The configuration the running components were built from.
    pub(crate) config: Config,
    /// Sender cloned into every component task's error handler to report a `ShutdownError`.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Channel on which `TapResource` updates are broadcast; `watch()` clones the receiver.
    watch: (WatchTx, WatchRx),
    /// Grace period used by `stop`; `None` means wait without a deadline.
    graceful_shutdown_duration: Option<Duration>,
    /// Passed to the topology builder; removed transforms and sinks are unregistered from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Utilization task handle, awaited by `stop`.
    utilization_task: Option<TaskHandle>,
    /// Cancelled by `stop`.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Metrics task handle, awaited by `stop`.
    metrics_task: Option<TaskHandle>,
    /// Cancelled by `stop`.
    metrics_task_shutdown_trigger: Option<Trigger>,
    /// Components accumulated by `extend_reload_set`; passed to `ConfigDiff::new` on reload.
    pending_reload: Option<HashSet<ComponentKey>>,
}
74
75impl RunningTopology {
76 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77 Self {
78 inputs: HashMap::new(),
79 inputs_tap_metadata: HashMap::new(),
80 outputs: HashMap::new(),
81 outputs_tap_metadata: HashMap::new(),
82 component_type_names: HashMap::new(),
83 shutdown_coordinator: SourceShutdownCoordinator::default(),
84 detach_triggers: HashMap::new(),
85 source_tasks: HashMap::new(),
86 tasks: HashMap::new(),
87 abort_tx,
88 watch: watch::channel(TapResource::default()),
89 graceful_shutdown_duration: config.graceful_shutdown_duration,
90 config,
91 utilization_registry: None,
92 utilization_task: None,
93 utilization_task_shutdown_trigger: None,
94 metrics_task: None,
95 metrics_task_shutdown_trigger: None,
96 pending_reload: None,
97 }
98 }
99
100 pub const fn config(&self) -> &Config {
102 &self.config
103 }
104
105 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108 match &mut self.pending_reload {
109 None => self.pending_reload = Some(new_set.clone()),
110 Some(existing) => existing.extend(new_set),
111 }
112 }
113
114 pub fn watch(&self) -> watch::Receiver<TapResource> {
118 self.watch.1.clone()
119 }
120
121 pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
129 self.shutdown_coordinator.shutdown_tripwire()
130 }
131
/// Shuts down every running component, consuming the topology.
///
/// When called, this invokes `SourceShutdownCoordinator::shutdown_all` with the grace-period
/// deadline and cancels the utilization and metrics shutdown triggers, if present. The
/// returned future resolves once the future from `shutdown_all` has completed *and* the first
/// of the following has happened:
///
/// - every task has finished (component tasks, source tasks, utilization and metrics tasks);
/// - the grace-period deadline elapsed (still-running components are logged as an error);
/// - the progress reporter, which runs every 5 seconds, saw that every component task finished
///   or that the deadline passed.
///
/// With `graceful_shutdown_duration == None` there is no deadline, so only completion of the
/// tasks (or the reporter seeing them done) ends the wait.
pub fn stop(self) -> impl Future<Output = ()> {
    // Handles are made `Shared` so the same completion can be awaited in `wait_handles` and
    // inspected with `peek()` through `check_handles`.
    let mut wait_handles = Vec::new();
    // A component can own more than one task: a source has an entry in both `tasks` and
    // `source_tasks`. Hence a `Vec` per key.
    let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

    // Only completion matters; discard each task's result.
    let map_closure = |_result| ();

    for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
        let task = task.map(map_closure).shared();

        wait_handles.push(task.clone());
        check_handles.entry(key).or_default().push(task);
    }

    // The utilization and metrics tasks are waited on, but they are not tracked per component.
    // The reporter below can therefore finish before they do.
    if let Some(utilization_task) = self.utilization_task {
        wait_handles.push(utilization_task.map(map_closure).shared());
    }

    if let Some(metrics_task) = self.metrics_task {
        wait_handles.push(metrics_task.map(map_closure).shared());
    }

    // `None` means no grace period: there is no deadline.
    let deadline = self
        .graceful_shutdown_duration
        .map(|grace_period| Instant::now() + grace_period);

    // Completes at the deadline after logging the components that are still running. Without
    // a deadline it never completes.
    let timeout = if let Some(deadline) = deadline {
        // Separate copy: the reporter below takes ownership of `check_handles`.
        let mut check_handles2 = check_handles.clone();
        Box::pin(async move {
            sleep_until(deadline).await;
            // Keep only components with at least one task whose `peek()` is still `None`.
            check_handles2.retain(|_key, handles| {
                retain(handles, |handle| handle.peek().is_none());
                !handles.is_empty()
            });
            let remaining_components = check_handles2
                .keys()
                .map(|item| item.to_string())
                .collect::<Vec<_>>()
                .join(", ");

            error!(
                components = ?remaining_components,
                message = "Failed to gracefully shut down in time. Killing components.",
                internal_log_rate_limit = false
            );
        }) as future::BoxFuture<'static, ()>
    } else {
        Box::pin(future::pending()) as future::BoxFuture<'static, ()>
    };

    // Progress reporter: every 5 seconds, log which components are still running. It exits
    // once none remain or the deadline has passed.
    let mut interval = interval(Duration::from_secs(5));
    let reporter = async move {
        loop {
            interval.tick().await;

            check_handles.retain(|_key, handles| {
                retain(handles, |handle| handle.peek().is_none());
                !handles.is_empty()
            });
            let remaining_components = check_handles
                .keys()
                .map(|item| item.to_string())
                .collect::<Vec<_>>()
                .join(", ");

            // `checked_duration_since` returns `None` once `now` is past the deadline.
            let (deadline_passed, time_remaining) = match deadline {
                Some(d) => match d.checked_duration_since(Instant::now()) {
                    Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                    None => (true, "overdue".to_string()),
                },
                None => (false, "no time limit".to_string()),
            };

            info!(
                remaining_components = ?remaining_components,
                time_remaining = ?time_remaining,
                "Shutting down... Waiting on running components."
            );

            let all_done = check_handles.is_empty();

            if all_done {
                info!("Shutdown reporter exiting: all components shut down.");
                break;
            } else if deadline_passed {
                error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                break;
            }
        }
    };

    // Completes when every task (including utilization/metrics) has finished.
    let success = futures::future::join_all(wait_handles).map(|_| ());

    // Whichever of timeout / reporter / success finishes first ends this part of the wait.
    let shutdown_complete_future = future::select_all(vec![
        Box::pin(timeout) as future::BoxFuture<'static, ()>,
        Box::pin(reporter) as future::BoxFuture<'static, ()>,
        Box::pin(success) as future::BoxFuture<'static, ()>,
    ]);

    // Start shutting everything down: sources via the coordinator, then the auxiliary tasks
    // via their triggers.
    let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
    if let Some(trigger) = self.utilization_task_shutdown_trigger {
        trigger.cancel();
    }
    if let Some(trigger) = self.metrics_task_shutdown_trigger {
        trigger.cancel();
    }

    futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
269
/// Applies `new_config` to the running topology, falling back to the current one on failure.
///
/// 1. If the `global` options differ, nothing is touched. Returns
///    [`ReloadError::GlobalOptionsChanged`], or [`ReloadError::GlobalDiffFailed`] if diffing
///    them fails.
/// 2. Computes a [`ConfigDiff`], including any components queued via `extend_reload_set`.
///    Stops the changed/removed components with `shutdown_diff`, which returns the buffers that
///    may be reused.
/// 3. Builds the changed/added components and runs their healthchecks. On success, connects
///    and spawns them, adopts `new_config`, and returns `Ok(())`.
/// 4. Otherwise, rebuilds the previous components from the flipped diff, reusing the same
///    buffers. Returns [`ReloadError::TopologyBuildFailed`] if that works, or
///    [`ReloadError::FailedToRestore`] if it does not.
///
/// # Errors
/// See the steps above.
pub async fn reload_config_and_respawn(
    &mut self,
    new_config: Config,
    extra_context: ExtraContext,
) -> Result<(), ReloadError> {
    info!("Reloading running topology with new configuration.");

    // Global options cannot be changed by a reload; bail out before shutting anything down.
    if self.config.global != new_config.global {
        return match self.config.global.diff(&new_config.global) {
            Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
            Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
        };
    }

    let diff = if let Some(components) = &self.pending_reload {
        ConfigDiff::new(&self.config, &new_config, components.clone())
    } else {
        ConfigDiff::new(&self.config, &new_config, HashSet::new())
    };
    // Old versions of changed components are stopped before the new ones are built.
    let buffers = self.shutdown_diff(&diff, &new_config).await;

    // NOTE(review): Windows-only pause after shutdown. It presumably gives the OS time to
    // release resources (e.g. ports) held by the stopped components; the 200 ms value is not
    // derived from anything visible here.
    if cfg!(windows) {
        tokio::time::sleep(Duration::from_millis(200)).await;
    }

    // Forward path: build → healthcheck → connect → spawn → adopt the new config.
    if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
        .with_buffers(buffers.clone())
        .with_extra_context(extra_context.clone())
        .with_utilization_registry(self.utilization_registry.clone())
        .build_or_log_errors()
        .await
    {
        if self
            .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
            .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);
            self.config = new_config;

            info!("New configuration loaded successfully.");

            return Ok(());
        }
    }

    // Build or healthchecks failed. `self.config` still holds the previous config here, so
    // the flipped diff rebuilds what `shutdown_diff` stopped.
    warn!("Failed to completely load new configuration. Restoring old configuration.");

    let diff = diff.flip();
    if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
        .with_buffers(buffers)
        .with_extra_context(extra_context.clone())
        .with_utilization_registry(self.utilization_registry.clone())
        .build_or_log_errors()
        .await
        && self
            .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
            .await
    {
        self.connect_diff(&diff, &mut new_pieces).await;
        self.spawn_diff(&diff, new_pieces);

        info!("Old configuration restored successfully.");

        return Err(ReloadError::TopologyBuildFailed);
    }

    error!(
        message = "Failed to restore old configuration.",
        internal_log_rate_limit = false
    );

    Err(ReloadError::FailedToRestore)
}
373
374 pub(crate) async fn reload_enrichment_tables(&self) {
376 reload_enrichment_tables(&self.config).await;
377 }
378
379 pub(crate) async fn run_healthchecks(
380 &mut self,
381 diff: &ConfigDiff,
382 pieces: &mut TopologyPieces,
383 options: HealthcheckOptions,
384 ) -> bool {
385 if options.enabled {
386 let healthchecks = take_healthchecks(diff, pieces)
387 .into_iter()
388 .map(|(_, task)| task);
389 let healthchecks = future::try_join_all(healthchecks);
390
391 info!("Running healthchecks.");
392 if options.require_healthy {
393 let success = healthchecks.await;
394
395 if success.is_ok() {
396 info!("All healthchecks passed.");
397 true
398 } else {
399 error!(
400 message = "Sinks unhealthy.",
401 internal_log_rate_limit = false
402 );
403 false
404 }
405 } else {
406 tokio::spawn(healthchecks);
407 true
408 }
409 } else {
410 true
411 }
412 }
413
414 async fn shutdown_diff(
418 &mut self,
419 diff: &ConfigDiff,
420 new_config: &Config,
421 ) -> HashMap<ComponentKey, BuiltBuffer> {
422 if diff.sources.any_changed_or_removed() {
425 let timeout = Duration::from_secs(30);
426 let mut source_shutdown_handles = Vec::new();
427
428 let deadline = Instant::now() + timeout;
429 for key in &diff.sources.to_remove {
430 debug!(component_id = %key, "Removing source.");
431
432 let previous = self.tasks.remove(key).unwrap();
433 drop(previous); self.remove_outputs(key);
436 source_shutdown_handles
437 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
438 }
439
440 for key in &diff.sources.to_change {
441 debug!(component_id = %key, "Changing source.");
442
443 self.remove_outputs(key);
444 source_shutdown_handles
445 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
446 }
447
448 debug!(
449 "Waiting for up to {} seconds for source(s) to finish shutting down.",
450 timeout.as_secs()
451 );
452 futures::future::join_all(source_shutdown_handles).await;
453
454 for key in diff.sources.removed_and_changed() {
456 if let Some(task) = self.source_tasks.remove(key) {
457 task.await.unwrap().unwrap();
458 }
459 }
460 }
461
462 for key in &diff.transforms.to_remove {
470 debug!(component_id = %key, "Removing transform.");
471
472 let previous = self.tasks.remove(key).unwrap();
473 drop(previous); self.remove_inputs(key, diff, new_config).await;
476 self.remove_outputs(key);
477
478 if let Some(registry) = self.utilization_registry.as_ref() {
479 registry.remove_component(key);
480 }
481 }
482
483 for key in &diff.transforms.to_change {
484 debug!(component_id = %key, "Changing transform.");
485
486 self.remove_inputs(key, diff, new_config).await;
487 self.remove_outputs(key);
488 }
489
490 let removed_table_sinks = diff
496 .enrichment_tables
497 .removed_and_changed()
498 .filter_map(|key| {
499 self.config
500 .enrichment_table(key)
501 .and_then(|t| t.as_sink(key))
502 .map(|(key, s)| (key.clone(), s.resources(&key)))
503 })
504 .collect::<Vec<_>>();
505 let remove_sink = diff
506 .sinks
507 .removed_and_changed()
508 .map(|key| {
509 (
510 key,
511 self.config
512 .sink(key)
513 .map(|s| s.resources(key))
514 .unwrap_or_default(),
515 )
516 })
517 .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
518 let add_source = diff
519 .sources
520 .changed_and_added()
521 .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
522 let added_table_sinks = diff
523 .enrichment_tables
524 .changed_and_added()
525 .filter_map(|key| {
526 self.config
527 .enrichment_table(key)
528 .and_then(|t| t.as_sink(key))
529 .map(|(key, s)| (key.clone(), s.resources(&key)))
530 })
531 .collect::<Vec<_>>();
532 let add_sink = diff
533 .sinks
534 .changed_and_added()
535 .map(|key| {
536 (
537 key,
538 new_config
539 .sink(key)
540 .map(|s| s.resources(key))
541 .unwrap_or_default(),
542 )
543 })
544 .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
545 let conflicts = Resource::conflicts(
546 remove_sink.map(|(key, value)| ((true, key), value)).chain(
547 add_sink
548 .chain(add_source)
549 .map(|(key, value)| ((false, key), value)),
550 ),
551 )
552 .into_iter()
553 .flat_map(|(_, components)| components)
554 .collect::<HashSet<_>>();
555 let conflicting_sinks = conflicts
557 .into_iter()
558 .filter(|&(existing_sink, _)| existing_sink)
559 .map(|(_, key)| key.clone());
560
561 let reuse_buffers = diff
563 .sinks
564 .to_change
565 .iter()
566 .filter(|&key| {
567 if diff.components_to_reload.contains(key) {
568 return false;
569 }
570 self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
571 self.config
572 .enrichment_table(key)
573 .and_then(|t| t.as_sink(key))
574 .map(|(_, s)| s.buffer)
575 }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
576 self.config
577 .enrichment_table(key)
578 .and_then(|t| t.as_sink(key))
579 .map(|(_, s)| s.buffer)
580 })
581 })
582 .cloned()
583 .collect::<HashSet<_>>();
584
585 let changed_disk_buffer_sinks = diff
590 .sinks
591 .to_change
592 .iter()
593 .filter(|key| {
594 !reuse_buffers.contains(*key)
595 && self
596 .config
597 .sink(key)
598 .is_some_and(|s| s.buffer.has_disk_stage())
599 })
600 .cloned()
601 .collect::<HashSet<_>>();
602
603 let wait_for_sinks = conflicting_sinks
604 .chain(reuse_buffers.iter().cloned())
605 .chain(changed_disk_buffer_sinks.iter().cloned())
606 .collect::<HashSet<_>>();
607
608 let removed_sinks = diff
610 .sinks
611 .to_remove
612 .iter()
613 .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
614 self.config
615 .enrichment_table(key)
616 .and_then(|t| t.as_sink(key))
617 .is_some()
618 }))
619 .collect::<Vec<_>>();
620 for key in &removed_sinks {
621 debug!(component_id = %key, "Removing sink.");
622 self.remove_inputs(key, diff, new_config).await;
623
624 if let Some(registry) = self.utilization_registry.as_ref() {
625 registry.remove_component(key);
626 }
627 }
628
629 let mut buffer_tx = HashMap::new();
632
633 let sinks_to_change = diff
634 .sinks
635 .to_change
636 .iter()
637 .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
638 self.config
639 .enrichment_table(key)
640 .and_then(|t| t.as_sink(key))
641 .is_some()
642 }))
643 .collect::<Vec<_>>();
644
645 for key in &sinks_to_change {
646 debug!(component_id = %key, "Changing sink.");
647 if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
648 self.detach_triggers
649 .remove(key)
650 .unwrap()
651 .into_inner()
652 .cancel();
653
654 if reuse_buffers.contains(key) {
655 buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
666 }
667 }
668 self.remove_inputs(key, diff, new_config).await;
669 }
670
671 for key in &removed_sinks {
678 let previous = self.tasks.remove(key).unwrap();
679 if wait_for_sinks.contains(key) {
680 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
681 previous.await.unwrap().unwrap();
682 } else {
683 drop(previous); }
685 }
686
687 let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
688 for key in &sinks_to_change {
689 if wait_for_sinks.contains(key) {
690 let previous = self.tasks.remove(key).unwrap();
691 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
692 let buffer = previous.await.unwrap().unwrap();
693
694 if reuse_buffers.contains(key) {
695 let tx = buffer_tx.remove(key).unwrap();
703 let rx = match buffer {
704 TaskOutput::Sink(rx) => rx.into_inner(),
705 _ => unreachable!(),
706 };
707
708 buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
709 }
710 }
711 }
712
713 buffers
714 }
715
/// Wires the components in `diff` (already built into `new_pieces`) into the running topology.
///
/// 1. Unless the tap watch channel is closed (`self.watch.0.is_closed()`), updates the tap
///    metadata:
///    - forgets removed components;
///    - records the kind and type tag of changed/added components;
///    - records the inputs of every component present in `new_pieces.inputs`.
/// 2. Registers the outputs of changed/added sources, enrichment-table sources and transforms.
/// 3. Connects the inputs of changed/added transforms, sinks and enrichment-table sinks.
///    Outputs are registered first because `setup_inputs` expects every input's output to
///    exist.
/// 4. Re-attaches unchanged components whose upstream outputs were rebuilt.
/// 5. Unless the tap watch channel is closed, broadcasts the resulting [`TapResource`].
///
/// # Panics
/// Panics if the broadcast fails ("Couldn't broadcast config changes."), and, via the helpers,
/// if `new_pieces` lacks expected outputs or inputs.
pub(crate) async fn connect_diff(
    &mut self,
    diff: &ConfigDiff,
    new_pieces: &mut TopologyPieces,
) {
    debug!("Connecting changed/added component(s).");

    if !self.watch.0.is_closed() {
        // Sources only have outputs.
        for key in &diff.sources.to_remove {
            self.outputs_tap_metadata.remove(key);
            self.component_type_names.remove(key);
        }

        // Transforms have both inputs and outputs.
        for key in &diff.transforms.to_remove {
            self.outputs_tap_metadata.remove(key);
            self.inputs_tap_metadata.remove(key);
            self.component_type_names.remove(key);
        }

        // Sinks only have inputs.
        for key in &diff.sinks.to_remove {
            self.inputs_tap_metadata.remove(key);
            self.component_type_names.remove(key);
        }

        let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
            self.config
                .enrichment_table(key)
                .and_then(|t| t.as_sink(key))
                .is_some()
        });
        for key in removed_sinks {
            self.inputs_tap_metadata.remove(key);
        }

        let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
            self.config
                .enrichment_table(key)
                .and_then(|t| t.as_source(key).map(|(key, _)| key))
        });
        for key in removed_sources {
            self.outputs_tap_metadata.remove(&key);
        }

        for key in diff.sources.changed_and_added() {
            if let Some(task) = new_pieces.tasks.get(key) {
                let typetag = task.typetag().to_string();
                self.outputs_tap_metadata
                    .insert(key.clone(), ("source", typetag.clone()));
                self.component_type_names.insert(key.clone(), typetag);
            }
        }

        // NOTE(review): during a forward reload `self.config` is still the previous config.
        // `reload_config_and_respawn` assigns it only after `spawn_diff`, so tables that exist
        // only in the new config are not found here. Confirm whether that is intended.
        for key in diff
            .enrichment_tables
            .changed_and_added()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            })
        {
            if let Some(task) = new_pieces.tasks.get(&key) {
                self.outputs_tap_metadata
                    .insert(key.clone(), ("source", task.typetag().to_string()));
            }
        }

        for key in diff.transforms.changed_and_added() {
            if let Some(task) = new_pieces.tasks.get(key) {
                let typetag = task.typetag().to_string();
                self.outputs_tap_metadata
                    .insert(key.clone(), ("transform", typetag.clone()));
                self.component_type_names.insert(key.clone(), typetag);
            }
        }

        for key in diff.sinks.changed_and_added() {
            if let Some(task) = new_pieces.tasks.get(key) {
                self.component_type_names
                    .insert(key.clone(), task.typetag().to_string());
            }
        }

        for (key, input) in &new_pieces.inputs {
            self.inputs_tap_metadata
                .insert(key.clone(), input.1.clone());
        }
    }

    // Outputs first: `setup_inputs` below looks up every input's output in `self.outputs`.
    for key in diff.sources.changed_and_added() {
        debug!(component_id = %key, "Configuring outputs for source.");
        self.setup_outputs(key, new_pieces).await;
    }

    // An enrichment table counts as a source when the built pieces hold a source task for it.
    let added_changed_table_sources: Vec<&ComponentKey> = diff
        .enrichment_tables
        .changed_and_added()
        .filter(|k| new_pieces.source_tasks.contains_key(k))
        .collect();
    for key in added_changed_table_sources.iter() {
        debug!(component_id = %key, "Connecting outputs for enrichment table source.");
        self.setup_outputs(key, new_pieces).await;
    }

    for key in diff.transforms.changed_and_added() {
        debug!(component_id = %key, "Configuring outputs for transform.");
        self.setup_outputs(key, new_pieces).await;
    }

    for key in diff.transforms.changed_and_added() {
        debug!(component_id = %key, "Connecting inputs for transform.");
        self.setup_inputs(key, diff, new_pieces).await;
    }

    for key in diff.sinks.changed_and_added() {
        debug!(component_id = %key, "Connecting inputs for sink.");
        self.setup_inputs(key, diff, new_pieces).await;
    }
    // An enrichment table counts as a sink when the built pieces hold inputs for it.
    let added_changed_tables: Vec<&ComponentKey> = diff
        .enrichment_tables
        .changed_and_added()
        .filter(|k| new_pieces.inputs.contains_key(k))
        .collect();
    for key in added_changed_tables.iter() {
        debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
        self.setup_inputs(key, diff, new_pieces).await;
    }

    self.reattach_severed_inputs(diff);

    // Publish the updated tap view: all outputs that have tap metadata, current inputs, the
    // keys of changed/added sources and sinks, removals, and type names.
    if !self.watch.0.is_closed() {
        let outputs = self
            .outputs
            .clone()
            .into_iter()
            .flat_map(|(output_id, control_tx)| {
                self.outputs_tap_metadata.get(&output_id.component).map(
                    |(component_kind, component_type)| {
                        (
                            TapOutput {
                                output_id,
                                component_kind,
                                component_type: component_type.clone(),
                            },
                            control_tx,
                        )
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let mut removals = diff.sources.to_remove.clone();
        removals.extend(diff.transforms.to_remove.iter().cloned());
        self.watch
            .0
            .send(TapResource {
                outputs,
                inputs: self.inputs_tap_metadata.clone(),
                source_keys: diff
                    .sources
                    .changed_and_added()
                    .map(|key| key.to_string())
                    .chain(
                        added_changed_table_sources
                            .iter()
                            .map(|key| key.to_string()),
                    )
                    .collect(),
                sink_keys: diff
                    .sinks
                    .changed_and_added()
                    .map(|key| key.to_string())
                    .chain(added_changed_tables.iter().map(|key| key.to_string()))
                    .collect(),
                removals,
                type_names: self
                    .component_type_names
                    .iter()
                    .map(|(k, v)| (k.to_string(), v.clone()))
                    .collect(),
            })
            .expect("Couldn't broadcast config changes.");
    }
}
927
928 async fn setup_outputs(
929 &mut self,
930 key: &ComponentKey,
931 new_pieces: &mut builder::TopologyPieces,
932 ) {
933 let outputs = new_pieces.outputs.remove(key).unwrap();
934 for (port, output) in outputs {
935 debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
936
937 let id = OutputId {
938 component: key.clone(),
939 port,
940 };
941
942 self.outputs.insert(id, output);
943 }
944 }
945
946 async fn setup_inputs(
947 &mut self,
948 key: &ComponentKey,
949 diff: &ConfigDiff,
950 new_pieces: &mut builder::TopologyPieces,
951 ) {
952 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
953
954 let old_inputs = self
955 .config
956 .inputs_for_node(key)
957 .into_iter()
958 .flatten()
959 .cloned()
960 .collect::<HashSet<_>>();
961
962 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
963 let inputs_to_add = &new_inputs - &old_inputs;
964
965 for input in inputs {
966 let output = self.outputs.get_mut(&input).expect("unknown output");
967
968 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
969 debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
973
974 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
975 } else {
976 debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
981
982 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
983 }
984 }
985
986 self.inputs.insert(key.clone(), tx);
987 new_pieces
988 .detach_triggers
989 .remove(key)
990 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
991 }
992
993 fn remove_outputs(&mut self, key: &ComponentKey) {
994 self.outputs.retain(|id, _output| &id.component != key);
995 }
996
997 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
998 self.inputs.remove(key);
999 self.detach_triggers.remove(key);
1000
1001 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
1002 let new_inputs = new_config
1003 .inputs_for_node(key)
1004 .unwrap_or_default()
1005 .iter()
1006 .collect::<HashSet<_>>();
1007
1008 for input in old_inputs {
1009 if let Some(output) = self.outputs.get_mut(input) {
1010 if diff.contains(&input.component)
1011 || diff.is_removed(key)
1012 || !new_inputs.contains(input)
1013 {
1014 debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1025
1026 _ = output.send(ControlMessage::Remove(key.clone()));
1027 } else {
1028 debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1032
1033 _ = output.send(ControlMessage::Pause(key.clone()));
1034 }
1035 }
1036 }
1037 }
1038
1039 fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
1040 let unchanged_transforms = self
1041 .config
1042 .transforms()
1043 .filter(|(key, _)| !diff.transforms.contains(key));
1044 for (transform_key, transform) in unchanged_transforms {
1045 let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1046 for output_id in changed_outputs {
1047 debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1048
1049 let input = self.inputs.get(transform_key).cloned().unwrap();
1050 let output = self.outputs.get_mut(&output_id).unwrap();
1051 _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1052 }
1053 }
1054
1055 let unchanged_sinks = self
1056 .config
1057 .sinks()
1058 .filter(|(key, _)| !diff.sinks.contains(key));
1059 for (sink_key, sink) in unchanged_sinks {
1060 let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1061 for output_id in changed_outputs {
1062 debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1063
1064 let input = self.inputs.get(sink_key).cloned().unwrap();
1065 let output = self.outputs.get_mut(&output_id).unwrap();
1066 _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1067 }
1068 }
1069 }
1070
1071 pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1073 for key in &diff.sources.to_change {
1074 debug!(message = "Spawning changed source.", component_id = %key);
1075 self.spawn_source(key, &mut new_pieces);
1076 }
1077
1078 for key in &diff.sources.to_add {
1079 debug!(message = "Spawning new source.", component_id = %key);
1080 self.spawn_source(key, &mut new_pieces);
1081 }
1082
1083 let changed_table_sources: Vec<&ComponentKey> = diff
1084 .enrichment_tables
1085 .to_change
1086 .iter()
1087 .filter(|k| new_pieces.source_tasks.contains_key(k))
1088 .collect();
1089
1090 let added_table_sources: Vec<&ComponentKey> = diff
1091 .enrichment_tables
1092 .to_add
1093 .iter()
1094 .filter(|k| new_pieces.source_tasks.contains_key(k))
1095 .collect();
1096
1097 for key in changed_table_sources {
1098 debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1099 self.spawn_source(key, &mut new_pieces);
1100 }
1101
1102 for key in added_table_sources {
1103 debug!(message = "Spawning new enrichment table source.", component_id = %key);
1104 self.spawn_source(key, &mut new_pieces);
1105 }
1106
1107 for key in &diff.transforms.to_change {
1108 debug!(message = "Spawning changed transform.", component_id = %key);
1109 self.spawn_transform(key, &mut new_pieces);
1110 }
1111
1112 for key in &diff.transforms.to_add {
1113 debug!(message = "Spawning new transform.", component_id = %key);
1114 self.spawn_transform(key, &mut new_pieces);
1115 }
1116
1117 for key in &diff.sinks.to_change {
1118 debug!(message = "Spawning changed sink.", component_id = %key);
1119 self.spawn_sink(key, &mut new_pieces);
1120 }
1121
1122 for key in &diff.sinks.to_add {
1123 trace!(message = "Spawning new sink.", component_id = %key);
1124 self.spawn_sink(key, &mut new_pieces);
1125 }
1126
1127 let changed_tables: Vec<&ComponentKey> = diff
1128 .enrichment_tables
1129 .to_change
1130 .iter()
1131 .filter(|k| {
1132 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1133 })
1134 .collect();
1135
1136 let added_tables: Vec<&ComponentKey> = diff
1137 .enrichment_tables
1138 .to_add
1139 .iter()
1140 .filter(|k| {
1141 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1142 })
1143 .collect();
1144
1145 for key in changed_tables {
1146 debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1147 self.spawn_sink(key, &mut new_pieces);
1148 }
1149
1150 for key in added_tables {
1151 debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1152 self.spawn_sink(key, &mut new_pieces);
1153 }
1154 }
1155
1156 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1157 let task = new_pieces.tasks.remove(key).unwrap();
1158 let span = error_span!(
1159 "sink",
1160 component_kind = "sink",
1161 component_id = %task.id(),
1162 component_type = %task.typetag(),
1163 );
1164
1165 let task_span = span.or_current();
1166 #[cfg(feature = "allocation-tracing")]
1167 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1168 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1169 task.id().to_string(),
1170 "sink".to_string(),
1171 task.typetag().to_string(),
1172 );
1173 debug!(
1174 component_kind = "sink",
1175 component_type = task.typetag(),
1176 component_id = task.id(),
1177 group_id = group_id.as_raw().to_string(),
1178 "Registered new allocation group."
1179 );
1180 group_id.attach_to_span(&task_span);
1181 }
1182
1183 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1184 let task = {
1185 let key = key.clone();
1186 handle_errors(task, self.abort_tx.clone(), |error| {
1187 ShutdownError::SinkAborted { key, error }
1188 })
1189 }
1190 .instrument(task_span);
1191 let spawned = spawn_named(task, task_name.as_ref());
1192 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1193 drop(previous); }
1195 }
1196
1197 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1198 let task = new_pieces.tasks.remove(key).unwrap();
1199 let span = error_span!(
1200 "transform",
1201 component_kind = "transform",
1202 component_id = %task.id(),
1203 component_type = %task.typetag(),
1204 );
1205
1206 let task_span = span.or_current();
1207 #[cfg(feature = "allocation-tracing")]
1208 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1209 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1210 task.id().to_string(),
1211 "transform".to_string(),
1212 task.typetag().to_string(),
1213 );
1214 debug!(
1215 component_kind = "transform",
1216 component_type = task.typetag(),
1217 component_id = task.id(),
1218 group_id = group_id.as_raw().to_string(),
1219 "Registered new allocation group."
1220 );
1221 group_id.attach_to_span(&task_span);
1222 }
1223
1224 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1225 let task = {
1226 let key = key.clone();
1227 handle_errors(task, self.abort_tx.clone(), |error| {
1228 ShutdownError::TransformAborted { key, error }
1229 })
1230 }
1231 .instrument(task_span);
1232 let spawned = spawn_named(task, task_name.as_ref());
1233 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1234 drop(previous); }
1236 }
1237
1238 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1239 let task = new_pieces.tasks.remove(key).unwrap();
1240 let span = error_span!(
1241 "source",
1242 component_kind = "source",
1243 component_id = %task.id(),
1244 component_type = %task.typetag(),
1245 );
1246
1247 let task_span = span.or_current();
1248 #[cfg(feature = "allocation-tracing")]
1249 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1250 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1251 task.id().to_string(),
1252 "source".to_string(),
1253 task.typetag().to_string(),
1254 );
1255
1256 debug!(
1257 component_kind = "source",
1258 component_type = task.typetag(),
1259 component_id = task.id(),
1260 group_id = group_id.as_raw().to_string(),
1261 "Registered new allocation group."
1262 );
1263 group_id.attach_to_span(&task_span);
1264 }
1265
1266 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1267 let task = {
1268 let key = key.clone();
1269 handle_errors(task, self.abort_tx.clone(), |error| {
1270 ShutdownError::SourceAborted { key, error }
1271 })
1272 }
1273 .instrument(task_span.clone());
1274 let spawned = spawn_named(task, task_name.as_ref());
1275 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1276 drop(previous); }
1278
1279 self.shutdown_coordinator
1280 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1281
1282 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1284 let source_task = {
1285 let key = key.clone();
1286 handle_errors(source_task, self.abort_tx.clone(), |error| {
1287 ShutdownError::SourceAborted { key, error }
1288 })
1289 }
1290 .instrument(task_span);
1291 self.source_tasks
1292 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1293 }
1294
1295 pub async fn start_init_validated(
1296 config: Config,
1297 extra_context: ExtraContext,
1298 ) -> Option<(Self, ShutdownErrorReceiver)> {
1299 let diff = ConfigDiff::initial(&config);
1300 let pieces = TopologyPiecesBuilder::new(&config, &diff)
1301 .with_extra_context(extra_context)
1302 .build_or_log_errors()
1303 .await?;
1304 Self::start_validated(config, diff, pieces).await
1305 }
1306
1307 pub async fn start_validated(
1308 config: Config,
1309 diff: ConfigDiff,
1310 mut pieces: TopologyPieces,
1311 ) -> Option<(Self, ShutdownErrorReceiver)> {
1312 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1313
1314 let expire_metrics = match (
1315 config.global.expire_metrics,
1316 config.global.expire_metrics_secs,
1317 ) {
1318 (Some(e), None) => {
1319 warn!(
1320 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1321 );
1322 if e < Duration::from_secs(0) {
1323 None
1324 } else {
1325 Some(e.as_secs_f64())
1326 }
1327 }
1328 (Some(_), Some(_)) => {
1329 error!(
1330 message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1331 internal_log_rate_limit = false
1332 );
1333 return None;
1334 }
1335 (None, Some(e)) => {
1336 if e < 0f64 {
1337 None
1338 } else {
1339 Some(e)
1340 }
1341 }
1342 (None, None) => Some(300f64),
1343 };
1344
1345 if let Err(error) = crate::metrics::Controller::get()
1346 .expect("Metrics must be initialized")
1347 .set_expiry(
1348 expire_metrics,
1349 config
1350 .global
1351 .expire_metrics_per_metric_set
1352 .clone()
1353 .unwrap_or_default(),
1354 )
1355 {
1356 error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1357 return None;
1358 }
1359
1360 let (utilization_emitter, utilization_registry) = pieces
1361 .utilization
1362 .take()
1363 .expect("Topology is missing the utilization metric emitter!");
1364 let metrics_storage = pieces.metrics_storage.clone();
1365 let metrics_refresh_period = config
1366 .global
1367 .metrics_storage_refresh_period
1368 .map(Duration::from_secs_f64);
1369 let mut running_topology = Self::new(config, abort_tx);
1370
1371 if !running_topology
1372 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1373 .await
1374 {
1375 return None;
1376 }
1377 running_topology.connect_diff(&diff, &mut pieces).await;
1378 running_topology.spawn_diff(&diff, pieces);
1379
1380 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1381 ShutdownSignal::new_wired();
1382 running_topology.utilization_registry = Some(utilization_registry.clone());
1383 running_topology.utilization_task_shutdown_trigger =
1384 Some(utilization_task_shutdown_trigger);
1385 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1386 "utilization_heartbeat".into(),
1387 "",
1388 async move {
1389 utilization_emitter
1390 .run_utilization(utilization_shutdown_signal)
1391 .await;
1392 Ok(TaskOutput::Healthcheck)
1393 },
1394 )));
1395 if let Some(metrics_refresh_period) = metrics_refresh_period {
1396 let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1397 ShutdownSignal::new_wired();
1398 running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1399 running_topology.metrics_task = Some(tokio::spawn(Task::new(
1400 "metrics_heartbeat".into(),
1401 "",
1402 async move {
1403 metrics_storage
1404 .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1405 .await;
1406 Ok(TaskOutput::Healthcheck)
1407 },
1408 )));
1409 }
1410
1411 Some((running_topology, abort_rx))
1412 }
1413}
1414
1415fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1416 let mut changed_outputs = Vec::new();
1417
1418 for source_key in &diff.sources.to_change {
1419 changed_outputs.extend(
1420 output_ids
1421 .iter()
1422 .filter(|id| &id.component == source_key)
1423 .cloned(),
1424 );
1425 }
1426
1427 for transform_key in &diff.transforms.to_change {
1428 changed_outputs.extend(
1429 output_ids
1430 .iter()
1431 .filter(|id| &id.component == transform_key)
1432 .cloned(),
1433 );
1434 }
1435
1436 changed_outputs
1437}