1#![allow(missing_docs)]
2use std::{collections::HashMap, env, path::PathBuf};
3
4use bollard::{
5 API_DEFAULT_VERSION, Docker,
6 errors::Error as DockerError,
7 models::{ContainerCreateBody, HostConfig},
8 query_parameters::{
9 CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListImagesOptionsBuilder,
10 RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
11 },
12};
13use futures::StreamExt;
14use http::uri::Uri;
15use snafu::Snafu;
16use vector_lib::configurable::configurable_component;
17
// Timeout handed to every explicit `Docker::connect_with_*` call in this module.
// NOTE(review): presumably seconds, matching bollard's own default — confirm against bollard.
const DEFAULT_TIMEOUT: u64 = 120;
20
/// Errors produced while interpreting a Docker host URL.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The URL could not be parsed, or it parsed but carries no authority (host) component.
    #[snafu(display("URL has no host."))]
    NoHost,
}
26
#[configurable_component]
/// Configuration of TLS when connecting to the Docker daemon.
///
/// Only consulted by `docker()` when the host uses the `https` scheme.
///
/// If not configured, `ca.pem`, `cert.pem`, and `key.pem` are looked up in the directory named
/// by the `DOCKER_CERT_PATH` environment variable, then `DOCKER_CONFIG`, and finally in
/// `.docker` under the home directory.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DockerTlsConfig {
    /// Path to the CA certificate file.
    ca_file: PathBuf,

    /// Path to the TLS certificate file.
    crt_file: PathBuf,

    /// Path to the TLS key file.
    key_file: PathBuf,
}
45
46pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
47 let host = host.or_else(|| env::var("DOCKER_HOST").ok());
48
49 match host {
50 None => Docker::connect_with_defaults().map_err(Into::into),
51 Some(host) => {
52 let scheme = host
53 .parse::<Uri>()
54 .ok()
55 .and_then(|uri| uri.into_parts().scheme);
56
57 match scheme.as_ref().map(|scheme| scheme.as_str()) {
58 Some("http") | Some("tcp") => {
59 let host = get_authority(&host)?;
60 Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
61 .map_err(Into::into)
62 }
63 Some("https") => {
64 let host = get_authority(&host)?;
65 let tls = tls
66 .or_else(default_certs)
67 .ok_or(DockerError::NoHomePathError)?;
68 Docker::connect_with_ssl(
69 &host,
70 &tls.key_file,
71 &tls.crt_file,
72 &tls.ca_file,
73 DEFAULT_TIMEOUT,
74 API_DEFAULT_VERSION,
75 )
76 .map_err(Into::into)
77 }
78 Some("unix") | Some("npipe") | None => {
79 Docker::connect_with_socket(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
80 .map_err(Into::into)
81 }
82 Some(scheme) => Err(format!("Unknown scheme: {scheme}").into()),
83 }
84 }
85 }
86}
87
88fn default_certs() -> Option<DockerTlsConfig> {
90 let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
91 let base = match from_env {
92 Ok(path) => PathBuf::from(path),
93 Err(_) => dirs_next::home_dir()?.join(".docker"),
94 };
95 Some(DockerTlsConfig {
96 ca_file: base.join("ca.pem"),
97 key_file: base.join("key.pem"),
98 crt_file: base.join("cert.pem"),
99 })
100}
101
102fn get_authority(url: &str) -> Result<String, Error> {
103 url.parse::<Uri>()
104 .ok()
105 .and_then(|uri| uri.authority().map(<_>::to_string))
106 .ok_or(Error::NoHost)
107}
108
109async fn pull_image(docker: &Docker, image: &str, tag: &str) {
110 let mut filters = HashMap::new();
111 filters.insert(
112 String::from("reference"),
113 vec![format!("{}:{}", image, tag)],
114 );
115
116 let options = Some(ListImagesOptionsBuilder::new().filters(&filters).build());
117
118 let images = docker.list_images(options).await.unwrap();
119 if images.is_empty() {
120 let options = Some(
122 CreateImageOptionsBuilder::new()
123 .from_image(image)
124 .tag(tag)
125 .build(),
126 );
127
128 docker
129 .create_image(options, None, None)
130 .for_each(|item| async move {
131 let info = item.unwrap();
132 if let Some(error) = info.error_detail {
133 panic!("{error:?}");
134 }
135 })
136 .await
137 }
138}
139
140async fn remove_container(docker: &Docker, id: &str) {
141 trace!("Stopping container.");
142
143 _ = docker
144 .stop_container(id, None::<StopContainerOptions>)
145 .await
146 .map_err(|e| error!(%e));
147
148 trace!("Removing container.");
149
150 _ = docker
152 .remove_container(id, None::<RemoveContainerOptions>)
153 .await
154 .map_err(|e| error!(%e));
155}
156
/// Description of a throwaway container, assembled builder-style and launched with
/// `Container::run`.
pub struct Container {
    /// Image name, without the tag.
    image: &'static str,
    /// Image tag; combined with `image` as `image:tag`.
    tag: &'static str,
    /// Bind mounts as `src:dst` strings; `None` until `bind` is first called.
    binds: Option<Vec<String>>,
    /// Command arguments for the container; `None` until `cmd` is first called.
    cmd: Option<Vec<String>>,
}
163
164impl Container {
165 pub const fn new(image: &'static str, tag: &'static str) -> Self {
166 Self {
167 image,
168 tag,
169 binds: None,
170 cmd: None,
171 }
172 }
173
174 pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
175 let bind = format!("{src}:{dst}");
176 self.binds.get_or_insert_with(Vec::new).push(bind);
177 self
178 }
179
180 pub fn cmd(mut self, option: &str) -> Self {
181 self.cmd.get_or_insert_with(Vec::new).push(option.into());
182 self
183 }
184
185 pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
186 let docker = docker(None, None).unwrap();
187
188 pull_image(&docker, self.image, self.tag).await;
189
190 let options = CreateContainerOptionsBuilder::new()
191 .name(&format!("vector_test_{}", uuid::Uuid::new_v4()))
192 .build();
193
194 let config = ContainerCreateBody {
195 image: Some(format!("{}:{}", &self.image, &self.tag)),
196 cmd: self.cmd,
197 host_config: Some(HostConfig {
198 network_mode: Some(String::from("host")),
199 extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
200 binds: self.binds,
201 ..Default::default()
202 }),
203 ..Default::default()
204 };
205
206 let container = docker
207 .create_container(Some(options), config)
208 .await
209 .unwrap();
210
211 docker
212 .start_container(&container.id, None::<StartContainerOptions>)
213 .await
214 .unwrap();
215
216 let result = doit.await;
217
218 remove_container(&docker, &container.id).await;
219
220 result
221 }
222}