vector/topology/
controller.rs1use std::sync::Arc;
2
3use futures_util::FutureExt as _;
4use tokio::sync::{Mutex, MutexGuard};
5
6#[cfg(feature = "api")]
7use crate::api;
8use crate::{
9 config,
10 extra_context::ExtraContext,
11 internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
12 signal::ShutdownError,
13 topology::{ReloadError, RunningTopology},
14};
15
/// Cloneable handle to a single, shared [`TopologyController`].
///
/// The controller is stored behind an `Arc<Mutex<_>>`, so every clone of this
/// handle refers to the same controller and all access goes through the mutex.
#[derive(Clone, Debug)]
pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
18
19impl SharedTopologyController {
20 pub fn new(inner: TopologyController) -> Self {
21 Self(Arc::new(Mutex::new(inner)))
22 }
23
24 pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
25 self.0.blocking_lock()
26 }
27
28 pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
29 self.0.lock().await
30 }
31
32 pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
33 Arc::try_unwrap(self.0).map_err(Self)
34 }
35}
36
/// Owns a running topology together with the state needed to reload or stop it.
pub struct TopologyController {
    /// The running topology that `reload` reconfigures and `stop` shuts down.
    pub topology: RunningTopology,
    /// Configuration paths; reported in the `VectorReloaded` event after a
    /// successful reload.
    pub config_paths: Vec<config::ConfigPath>,
    /// Value written into every new config's healthcheck options by `reload`
    /// (via `set_require_healthy`) before the config is applied.
    pub require_healthy: Option<bool>,
    /// Handle to the gRPC API server, or `None` when no server is running.
    /// `reload` starts or drops it according to `new_config.api.enabled`.
    #[cfg(feature = "api")]
    pub api_server: Option<api::GrpcServer>,
    /// Extra context that is cloned and handed to the topology on each reload.
    pub extra_context: ExtraContext,
}
45
46impl std::fmt::Debug for TopologyController {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("TopologyController")
49 .field("config_paths", &self.config_paths)
50 .field("require_healthy", &self.require_healthy)
51 .finish()
52 }
53}
54
/// Result of a configuration reload attempt, as reported by
/// `TopologyController::reload`.
#[derive(Clone, Debug)]
pub enum ReloadOutcome {
    /// NOTE(review): not produced by `TopologyController::reload` in this file;
    /// presumably set by callers that need an API key to load config — confirm.
    MissingApiKey,
    /// The topology accepted the new configuration.
    Success,
    /// The reload was rejected or the new topology could not be built
    /// (non-reloadable global options changed, global diff failed, or topology
    /// build failed). Reported as non-fatal.
    RolledBack,
    /// The reload failed in a way that carries a [`ShutdownError`]: either the
    /// API server could not be started or the topology could not be restored.
    FatalError(ShutdownError),
}
62
63impl TopologyController {
64 pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
65 new_config
66 .healthchecks
67 .set_require_healthy(self.require_healthy);
68
69 #[cfg(feature = "api")]
71 if !new_config.api.enabled {
72 if let Some(server) = self.api_server.take() {
73 debug!("Stopping gRPC API server.");
74 drop(server);
75 }
76 } else if self.api_server.is_none() {
77 debug!("Starting gRPC API server.");
78
79 match api::GrpcServer::start(self.topology.config(), self.topology.watch()).await {
80 Ok(api_server) => {
81 let addr = api_server.addr();
82 info!(
83 message = "GRPC API server started.",
84 addr = %addr,
85 );
86 self.api_server = Some(api_server);
87 }
88 Err(error) => {
89 let error_string = error.to_string();
90 error!(
91 message = "An error occurred that Vector couldn't handle.",
92 error = %error_string,
93 internal_log_rate_limit = false,
94 );
95 return ReloadOutcome::FatalError(ShutdownError::ApiFailed {
98 error: error_string,
99 });
100 }
101 }
102 }
103
104 match self
105 .topology
106 .reload_config_and_respawn(new_config, self.extra_context.clone())
107 .await
108 {
109 Ok(()) => {
110 emit!(VectorReloaded {
111 config_paths: &self.config_paths
112 });
113 ReloadOutcome::Success
114 }
115 Err(ReloadError::GlobalOptionsChanged { changed_fields }) => {
116 error!(
117 message = "Config reload rejected due to non-reloadable global options.",
118 changed_fields = %changed_fields.join(", "),
119 internal_log_rate_limit = false,
120 );
121 emit!(VectorReloadError {
122 reason: "global_options_changed",
123 });
124 ReloadOutcome::RolledBack
125 }
126 Err(ReloadError::GlobalDiffFailed { source }) => {
127 error!(
128 message = "Config reload rejected because computing global diff failed.",
129 error = %source,
130 internal_log_rate_limit = false,
131 );
132 emit!(VectorReloadError {
133 reason: "global_diff_failed",
134 });
135 ReloadOutcome::RolledBack
136 }
137 Err(ReloadError::TopologyBuildFailed) => {
138 emit!(VectorReloadError {
139 reason: "topology_build_failed",
140 });
141 ReloadOutcome::RolledBack
142 }
143 Err(ReloadError::FailedToRestore) => {
144 emit!(VectorReloadError {
145 reason: "restore_failed",
146 });
147 emit!(VectorRecoveryError);
148 ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
149 }
150 }
151 }
152
153 #[cfg_attr(not(feature = "api"), allow(unused_mut))]
154 pub async fn stop(mut self) {
155 #[cfg(feature = "api")]
159 if let Some(server) = self.api_server.as_mut() {
160 server.set_not_serving().await;
161 }
162
163 self.topology.stop().await;
166 }
167
168 pub async fn sources_finished(mutex: SharedTopologyController) {
182 loop {
183 let initial = {
186 let tc = mutex.lock().await;
187 tc.topology.sources_finished()
188 };
189 initial.await;
190
191 let top = mutex.lock().await;
194 if top.topology.sources_finished().now_or_never().is_some() {
195 return;
196 } else {
197 continue;
198 }
199 }
200 }
201}