vector/sinks/gcs_common/
config.rs

1use std::marker::PhantomData;
2
3use futures::FutureExt;
4use http::{StatusCode, Uri};
5use hyper::Body;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10    gcp::{GcpAuthenticator, GcpError},
11    http::HttpClient,
12    sinks::{
13        Healthcheck, HealthcheckError,
14        gcs_common::service::GcsResponse,
15        util::retries::{RetryAction, RetryLogic},
16    },
17};
18
/// Returns the default Google Cloud Storage endpoint, `https://storage.googleapis.com`.
pub fn default_endpoint() -> String {
    String::from("https://storage.googleapis.com")
}
22
23/// GCS Predefined ACLs.
24///
25/// For more information, see [Predefined ACLs][predefined_acls].
26///
27/// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
28#[configurable_component]
29#[derive(Clone, Copy, Debug, Default)]
30#[serde(rename_all = "kebab-case")]
31pub enum GcsPredefinedAcl {
32    /// Bucket/object can be read by authenticated users.
33    ///
34    /// The bucket/object owner is granted the `OWNER` permission, and anyone authenticated Google
35    /// account holder is granted the `READER` permission.
36    AuthenticatedRead,
37
38    /// Object is semi-private.
39    ///
40    /// Both the object owner and bucket owner are granted the `OWNER` permission.
41    ///
42    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
43    /// specified for a bucket.
44    BucketOwnerFullControl,
45
46    /// Object is private, except to the bucket owner.
47    ///
48    /// The object owner is granted the `OWNER` permission, and the bucket owner is granted the
49    /// `READER` permission.
50    ///
51    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
52    /// specified for a bucket.
53    BucketOwnerRead,
54
55    /// Bucket/object are private.
56    ///
57    /// The bucket/object owner is granted the `OWNER` permission, and no one else has
58    /// access.
59    Private,
60
61    /// Bucket/object are private within the project.
62    ///
63    /// Project owners and project editors are granted the `OWNER` permission, and anyone who is
64    /// part of the project team is granted the `READER` permission.
65    ///
66    /// This is the default.
67    #[default]
68    ProjectPrivate,
69
70    /// Bucket/object can be read publicly.
71    ///
72    /// The bucket/object owner is granted the `OWNER` permission, and all other users, whether
73    /// authenticated or anonymous, are granted the `READER` permission.
74    PublicRead,
75}
76
77/// GCS storage classes.
78///
79/// For more information, see [Storage classes][storage_classes].
80///
81/// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
82#[configurable_component]
83#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
84#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
85pub enum GcsStorageClass {
86    /// Standard storage.
87    ///
88    /// This is the default.
89    #[default]
90    Standard,
91
92    /// Nearline storage.
93    Nearline,
94
95    /// Coldline storage.
96    Coldline,
97
98    /// Archive storage.
99    Archive,
100}
101
102#[derive(Debug, Snafu)]
103pub enum GcsError {
104    #[snafu(display("Bucket {:?} not found", bucket))]
105    BucketNotFound { bucket: String },
106}
107
108pub fn build_healthcheck(
109    bucket: String,
110    client: HttpClient,
111    base_url: String,
112    auth: GcpAuthenticator,
113) -> crate::Result<Healthcheck> {
114    let healthcheck = async move {
115        let uri = base_url.parse::<Uri>()?;
116        let mut request = http::Request::head(uri).body(Body::empty())?;
117
118        auth.apply(&mut request);
119
120        let not_found_error = GcsError::BucketNotFound { bucket }.into();
121
122        let response = client.send(request).await?;
123        healthcheck_response(response, not_found_error)
124    };
125
126    Ok(healthcheck.boxed())
127}
128
129pub fn healthcheck_response(
130    response: http::Response<hyper::Body>,
131    not_found_error: crate::Error,
132) -> crate::Result<()> {
133    match response.status() {
134        StatusCode::OK => Ok(()),
135        StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
136        StatusCode::NOT_FOUND => Err(not_found_error),
137        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
138    }
139}
140
/// Retry policy for GCS requests of type `Request`.
///
/// Holds no data; the type parameter is carried only through `PhantomData`.
pub struct GcsRetryLogic<Request> {
    request: PhantomData<Request>,
}

// `Default` and `Clone` are written by hand rather than derived: `#[derive]` would add
// `R: Default` / `R: Clone` bounds, even though only a `PhantomData<R>` is stored.
impl<R> Default for GcsRetryLogic<R> {
    fn default() -> Self {
        Self {
            request: PhantomData,
        }
    }
}

impl<R> Clone for GcsRetryLogic<R> {
    fn clone(&self) -> Self {
        // There is no state to copy, so a clone is just a fresh instance.
        Self::default()
    }
}
160
161// This is a clone of HttpRetryLogic for the Body type, should get merged
162impl<Request: Clone + Send + Sync + 'static> RetryLogic for GcsRetryLogic<Request> {
163    type Error = hyper::Error;
164    type Request = Request;
165    type Response = GcsResponse;
166
167    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
168        true
169    }
170
171    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
172        let status = response.inner.status();
173
174        match status {
175            StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
176            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
177            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
178            StatusCode::NOT_IMPLEMENTED => {
179                RetryAction::DontRetry("endpoint not implemented".into())
180            }
181            _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
182            _ if status.is_success() => RetryAction::Successful,
183            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
184        }
185    }
186}