vector_tap/
topology.rs

1use std::collections::{HashMap, HashSet};
2
3use tokio::sync::watch;
4use vector_common::{config::ComponentKey, id::Inputs};
5use vector_core::{config::OutputId, fanout};
6
7/// A tappable output consisting of an output ID and associated metadata
8#[derive(Debug, Clone, Hash, PartialEq, Eq)]
9pub struct TapOutput {
10    pub output_id: OutputId,
11    pub component_kind: &'static str,
12    pub component_type: String,
13}
14
15/// Resources used by the `tap` API to monitor component inputs and outputs,
16/// updated alongside the topology
17#[derive(Debug, Default, Clone)]
18pub struct TapResource {
19    // Outputs and their corresponding Fanout control
20    pub outputs: HashMap<TapOutput, fanout::ControlChannel>,
21    // Components (transforms, sinks) and their corresponding inputs
22    pub inputs: HashMap<ComponentKey, Inputs<OutputId>>,
23    // Source component keys used to warn against invalid pattern matches
24    pub source_keys: Vec<String>,
25    // Sink component keys used to warn against invalid pattern matches
26    pub sink_keys: Vec<String>,
27    // Components removed on a reload (used to drop TapSinks)
28    pub removals: HashSet<ComponentKey>,
29    // Plugin type name for every component (e.g. "demo_logs", "remap", "console")
30    pub type_names: HashMap<String, String>,
31}
32
33impl TapResource {
34    /// Returns each component's output port names, derived from the full `outputs` snapshot.
35    ///
36    /// Keys are all components that have at least one output (sources and transforms).
37    /// The port value is `None` for the default output and `Some(name)` for named ports.
38    pub fn output_ports_by_component(&self) -> HashMap<&ComponentKey, Vec<Option<&str>>> {
39        let mut map: HashMap<&ComponentKey, Vec<Option<&str>>> = HashMap::new();
40        for tap_output in self.outputs.keys() {
41            map.entry(&tap_output.output_id.component)
42                .or_default()
43                .push(tap_output.output_id.port.as_deref());
44        }
45        map
46    }
47}
48
// Watcher types for topology changes.
/// Sending half of the `watch` channel that carries `TapResource` values.
pub type WatchTx = watch::Sender<TapResource>;
/// Receiving half of the `watch` channel that carries `TapResource` values.
pub type WatchRx = watch::Receiver<TapResource>;