vector/topology/controller.rs

1use std::sync::Arc;
2
3use futures_util::FutureExt as _;
4use tokio::sync::{Mutex, MutexGuard};
5
6#[cfg(feature = "api")]
7use crate::api;
8use crate::{
9    config,
10    extra_context::ExtraContext,
11    internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
12    signal::ShutdownError,
13    topology::{ReloadError, RunningTopology},
14};
15
16#[derive(Clone, Debug)]
17pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
18
19impl SharedTopologyController {
20    pub fn new(inner: TopologyController) -> Self {
21        Self(Arc::new(Mutex::new(inner)))
22    }
23
24    pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
25        self.0.blocking_lock()
26    }
27
28    pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
29        self.0.lock().await
30    }
31
32    pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
33        Arc::try_unwrap(self.0).map_err(Self)
34    }
35}
36
37pub struct TopologyController {
38    pub topology: RunningTopology,
39    pub config_paths: Vec<config::ConfigPath>,
40    pub require_healthy: Option<bool>,
41    #[cfg(feature = "api")]
42    pub api_server: Option<api::GrpcServer>,
43    pub extra_context: ExtraContext,
44}
45
46impl std::fmt::Debug for TopologyController {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("TopologyController")
49            .field("config_paths", &self.config_paths)
50            .field("require_healthy", &self.require_healthy)
51            .finish()
52    }
53}
54
/// Result of [`TopologyController::reload`].
#[derive(Clone, Debug)]
pub enum ReloadOutcome {
    /// Not produced by `TopologyController::reload` in this file.
    /// NOTE(review): presumably signals a reload blocked by a missing API key — confirm at the
    /// place that constructs it.
    MissingApiKey,
    /// The new config was applied and the topology respawned.
    Success,
    /// The reload was rejected (non-reloadable global options changed, or the global diff could
    /// not be computed) or the new topology failed to build.
    /// NOTE(review): the name suggests the previous topology keeps running — confirm in
    /// `RunningTopology::reload_config_and_respawn`.
    RolledBack,
    /// Unrecoverable failure: the API was enabled but its server failed to start, or the previous
    /// topology could not be restored after a failed reload. Presumably the caller shuts down.
    FatalError(ShutdownError),
}
62
63impl TopologyController {
64    pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
65        new_config
66            .healthchecks
67            .set_require_healthy(self.require_healthy);
68
69        // Start the api server or disable it, if necessary
70        #[cfg(feature = "api")]
71        if !new_config.api.enabled {
72            if let Some(server) = self.api_server.take() {
73                debug!("Stopping gRPC API server.");
74                drop(server);
75            }
76        } else if self.api_server.is_none() {
77            debug!("Starting gRPC API server.");
78
79            match api::GrpcServer::start(self.topology.config(), self.topology.watch()).await {
80                Ok(api_server) => {
81                    let addr = api_server.addr();
82                    info!(
83                        message = "GRPC API server started.",
84                        addr = %addr,
85                    );
86                    self.api_server = Some(api_server);
87                }
88                Err(error) => {
89                    let error_string = error.to_string();
90                    error!(
91                        message = "An error occurred that Vector couldn't handle.",
92                        error = %error_string,
93                        internal_log_rate_limit = false,
94                    );
95                    // Fail fast when API is explicitly enabled but fails to start
96                    // This ensures users don't run with a broken configuration thinking top/tap will work
97                    return ReloadOutcome::FatalError(ShutdownError::ApiFailed {
98                        error: error_string,
99                    });
100                }
101            }
102        }
103
104        match self
105            .topology
106            .reload_config_and_respawn(new_config, self.extra_context.clone())
107            .await
108        {
109            Ok(()) => {
110                emit!(VectorReloaded {
111                    config_paths: &self.config_paths
112                });
113                ReloadOutcome::Success
114            }
115            Err(ReloadError::GlobalOptionsChanged { changed_fields }) => {
116                error!(
117                    message = "Config reload rejected due to non-reloadable global options.",
118                    changed_fields = %changed_fields.join(", "),
119                    internal_log_rate_limit = false,
120                );
121                emit!(VectorReloadError {
122                    reason: "global_options_changed",
123                });
124                ReloadOutcome::RolledBack
125            }
126            Err(ReloadError::GlobalDiffFailed { source }) => {
127                error!(
128                    message = "Config reload rejected because computing global diff failed.",
129                    error = %source,
130                    internal_log_rate_limit = false,
131                );
132                emit!(VectorReloadError {
133                    reason: "global_diff_failed",
134                });
135                ReloadOutcome::RolledBack
136            }
137            Err(ReloadError::TopologyBuildFailed) => {
138                emit!(VectorReloadError {
139                    reason: "topology_build_failed",
140                });
141                ReloadOutcome::RolledBack
142            }
143            Err(ReloadError::FailedToRestore) => {
144                emit!(VectorReloadError {
145                    reason: "restore_failed",
146                });
147                emit!(VectorRecoveryError);
148                ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
149            }
150        }
151    }
152
153    #[cfg_attr(not(feature = "api"), allow(unused_mut))]
154    pub async fn stop(mut self) {
155        // Phase 1: Mark the gRPC API as unavailable so that external probes
156        // (e.g. Kubernetes readiness) fail early and stop routing traffic
157        // to this instance.
158        #[cfg(feature = "api")]
159        if let Some(server) = self.api_server.as_mut() {
160            server.set_not_serving().await;
161        }
162
163        // Phase 2: Drain the topology -- shuts down sources, waits for
164        // in-flight events to flush through transforms and sinks.
165        self.topology.stop().await;
166    }
167
168    // The `sources_finished` method on `RunningTopology` only considers sources that are currently
169    // running at the time the method is called. This presents a problem when the set of running
170    // sources can change while we are waiting on the resulting future to resolve.
171    //
172    // This function resolves that issue by waiting in two stages. The first is the usual asynchronous
173    // wait for the future to complete. When it does, we know that all of the sources that existed when
174    // the future was built have finished, but we don't know if that's because they were replaced as
175    // part of a reload (in which case we don't want to return yet). To differentiate, we acquire the
176    // lock on the topology, create a new future, and check whether it resolves immediately or not. If
177    // it does resolve, we know all sources are truly finished because we held the lock during the
178    // check, preventing anyone else from adding new sources. If it does not resolve, that indicates
179    // that new sources have been added since our original call and we should start the process over to
180    // continue waiting.
181    pub async fn sources_finished(mutex: SharedTopologyController) {
182        loop {
183            // Do an initial async wait while the topology is running, making sure not the hold the
184            // mutex lock while we wait on sources to finish.
185            let initial = {
186                let tc = mutex.lock().await;
187                tc.topology.sources_finished()
188            };
189            initial.await;
190
191            // Once the initial signal is tripped, hold lock on the topology while checking again. This
192            // ensures that no other task is adding new sources.
193            let top = mutex.lock().await;
194            if top.topology.sources_finished().now_or_never().is_some() {
195                return;
196            } else {
197                continue;
198            }
199        }
200    }
201}