// vector/transforms/sample/config.rs

1use snafu::Snafu;
2use vector_lib::{
3    config::LegacyKey,
4    configurable::configurable_component,
5    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
6};
7use vrl::value::Kind;
8
9use super::transform::{DynamicSampleFields, Sample, SampleMode};
10use crate::{
11    conditions::AnyCondition,
12    config::{
13        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14        TransformOutput,
15    },
16    schema,
17    template::Template,
18    transforms::Transform,
19};
20
21#[derive(Debug, Snafu)]
22pub enum SampleError {
23    // Errors from `determine_sample_mode`
24    #[snafu(display(
25        "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
26    ))]
27    InvalidRatio { ratio: f64 },
28
29    #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
30    InvalidRate,
31
32    #[snafu(display("Only one value can be provided for either 'rate' or 'ratio', but not both"))]
33    InvalidStaticConfiguration,
34
35    #[snafu(display(
36        "Only one value can be provided for either 'ratio_field' or 'rate_field', but not both"
37    ))]
38    InvalidDynamicConfiguration,
39
40    #[snafu(display(
41        "Exactly one value must be provided for either 'rate' or 'ratio' to configure static sampling"
42    ))]
43    MissingStaticConfiguration,
44
45    #[snafu(display(
46        "'key_field' cannot be combined with 'ratio_field' or 'rate_field' because dynamic values can vary per event and break key-based coherence"
47    ))]
48    InvalidKeyFieldDynamicCombination,
49}
50
51/// Configuration for the `sample` transform.
52#[configurable_component(transform(
53    "sample",
54    "Sample events from an event stream based on supplied criteria and at a configurable rate."
55))]
56#[derive(Clone, Debug)]
57#[serde(deny_unknown_fields)]
58pub struct SampleConfig {
59    /// The rate at which events are forwarded, expressed as `1/N`.
60    ///
61    /// For example, `rate = 1500` means 1 out of every 1500 events are forwarded and the rest are
62    /// dropped. This differs from `ratio` which allows more precise control over the number of events
63    /// retained and values greater than 1/2. It is an error to provide a value for both `rate` and `ratio`.
64    #[configurable(metadata(docs::examples = 1500))]
65    pub rate: Option<u64>,
66
67    /// The rate at which events are forwarded, expressed as a percentage
68    ///
69    /// For example, `ratio = .13` means that 13% out of all events on the stream are forwarded and
70    /// the rest are dropped. This differs from `rate` allowing the configuration of a higher
71    /// precision value and also the ability to retain values of greater than 50% of all events. It is
72    /// an error to provide a value for both `rate` and `ratio`.
73    #[configurable(metadata(docs::examples = 0.13))]
74    #[configurable(validation(range(min = 0.0, max = 1.0)))]
75    pub ratio: Option<f64>,
76
77    /// The event field whose numeric value is used as the sampling ratio on a per-event basis.
78    ///
79    /// Accepts integer, floating point, or string values that parse as a number. The value must be
80    /// in `(0, 1]` to be considered valid (for example, `0.25` keeps 25%). If the field is missing
81    /// or invalid, static sampling settings (`rate` or `ratio`) are used as a fallback.
82    /// This option cannot be used together with `rate_field`.
83    #[configurable(metadata(docs::examples = "sample_rate"))]
84    pub ratio_field: Option<String>,
85
86    /// The event field whose integer value is used as the sampling rate on a per-event basis, expressed as `1/N`.
87    ///
88    /// Accepts an integer, or a string that parses as a positive integer; floating point values
89    /// are rejected. The value must be a positive integer to be considered valid. If the field is
90    /// missing or invalid, static sampling settings (`rate` or `ratio`) are used as a fallback.
91    /// This option cannot be used together with `ratio_field`.
92    #[configurable(metadata(docs::examples = "sample_rate_n"))]
93    pub rate_field: Option<String>,
94
95    /// The name of the field whose value is hashed to determine if the event should be
96    /// sampled.
97    ///
98    /// Each unique value for the key creates a bucket of related events to be sampled together
99    /// and the rate is applied to the buckets themselves to sample `1/N` buckets.  The overall rate
100    /// of sampling may differ from the configured one if values in the field are not uniformly
101    /// distributed. If left unspecified, or if the event doesn’t have `key_field`, then the
102    /// event is sampled independently.
103    ///
104    /// This can be useful to, for example, ensure that all logs for a given transaction are
105    /// sampled together, but that overall `1/N` transactions are sampled.
106    ///
107    /// This option cannot be combined with `ratio_field` or `rate_field`.
108    #[configurable(metadata(docs::examples = "message"))]
109    pub key_field: Option<String>,
110
111    /// The event key in which the sample rate is stored. If set to an empty string, the sample rate will not be added to the event.
112    #[configurable(metadata(docs::examples = "sample_rate"))]
113    #[serde(default = "default_sample_rate_key")]
114    pub sample_rate_key: OptionalValuePath,
115
116    /// The value to group events into separate buckets to be sampled independently.
117    ///
118    /// If left unspecified, or if the event doesn't have `group_by`, then the event is not
119    /// sampled separately.
120    ///
121    /// This can also be used with `ratio_field` or `rate_field` to apply dynamic sampling
122    /// independently per rendered group value.
123    #[configurable(metadata(
124        docs::examples = "{{ service }}",
125        docs::examples = "{{ hostname }}-{{ service }}"
126    ))]
127    pub group_by: Option<Template>,
128
129    /// A logical condition used to exclude events from sampling.
130    pub exclude: Option<AnyCondition>,
131}
132
133impl SampleConfig {
134    fn sample_rate(&self) -> Result<SampleMode, SampleError> {
135        if self.ratio_field.is_some() && self.rate_field.is_some() {
136            return Err(SampleError::InvalidDynamicConfiguration);
137        }
138
139        if self.key_field.is_some() && (self.ratio_field.is_some() || self.rate_field.is_some()) {
140            return Err(SampleError::InvalidKeyFieldDynamicCombination);
141        }
142
143        if self.rate.is_some() && self.ratio.is_some() {
144            return Err(SampleError::InvalidStaticConfiguration);
145        }
146
147        match (self.rate, self.ratio) {
148            (None, Some(ratio)) => {
149                if ratio <= 0.0 {
150                    Err(SampleError::InvalidRatio { ratio })
151                } else {
152                    Ok(SampleMode::new_ratio(ratio))
153                }
154            }
155            (Some(rate), None) => {
156                if rate == 0 {
157                    Err(SampleError::InvalidRate)
158                } else {
159                    Ok(SampleMode::new_rate(rate))
160                }
161            }
162            (None, None) => Err(SampleError::MissingStaticConfiguration),
163            _ => Err(SampleError::InvalidStaticConfiguration),
164        }
165    }
166}
167
168impl GenerateConfig for SampleConfig {
169    fn generate_config() -> toml::Value {
170        toml::Value::try_from(Self {
171            rate: None,
172            ratio: Some(0.1),
173            ratio_field: None,
174            rate_field: None,
175            key_field: None,
176            group_by: None,
177            exclude: None::<AnyCondition>,
178            sample_rate_key: default_sample_rate_key(),
179        })
180        .unwrap()
181    }
182}
183
184#[async_trait::async_trait]
185#[typetag::serde(name = "sample")]
186impl TransformConfig for SampleConfig {
187    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
188        let sample_mode = self.sample_rate()?;
189        let exclude = self
190            .exclude
191            .as_ref()
192            .map(|condition| condition.build(&context.enrichment_tables, &context.metrics_storage))
193            .transpose()?;
194
195        let sample = if self.ratio_field.is_some() || self.rate_field.is_some() {
196            Sample::new_with_dynamic(
197                Self::NAME.to_string(),
198                sample_mode,
199                DynamicSampleFields {
200                    ratio_field: self.ratio_field.clone(),
201                    rate_field: self.rate_field.clone(),
202                },
203                self.group_by.clone(),
204                exclude,
205                self.sample_rate_key.clone(),
206            )
207        } else {
208            Sample::new(
209                Self::NAME.to_string(),
210                sample_mode,
211                self.key_field.clone(),
212                self.group_by.clone(),
213                exclude,
214                self.sample_rate_key.clone(),
215            )
216        };
217
218        Ok(Transform::function(sample))
219    }
220
221    fn input(&self) -> Input {
222        Input::new(DataType::Log | DataType::Trace)
223    }
224
225    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
226        self.sample_rate()
227            .map(|_| ())
228            .map_err(|e| vec![e.to_string()])
229    }
230
231    fn outputs(
232        &self,
233        _: &TransformContext,
234        input_definitions: &[(OutputId, schema::Definition)],
235    ) -> Vec<TransformOutput> {
236        vec![TransformOutput::new(
237            DataType::Log | DataType::Trace,
238            input_definitions
239                .iter()
240                .map(|(output, definition)| {
241                    (
242                        output.clone(),
243                        definition.clone().with_source_metadata(
244                            SampleConfig::NAME,
245                            Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
246                            &owned_value_path!("sample_rate"),
247                            Kind::bytes(),
248                            None,
249                        ),
250                    )
251                })
252                .collect(),
253        )]
254    }
255}
256
257pub fn default_sample_rate_key() -> OptionalValuePath {
258    OptionalValuePath::from(owned_value_path!("sample_rate"))
259}
260
#[cfg(test)]
mod tests {
    use crate::{
        config::TransformConfig,
        transforms::sample::config::{SampleConfig, SampleError},
    };

    /// Builds a config with only the given static settings; every other option is unset.
    fn static_config(rate: Option<u64>, ratio: Option<f64>) -> SampleConfig {
        SampleConfig {
            rate,
            ratio,
            ratio_field: None,
            rate_field: None,
            key_field: None,
            sample_rate_key: super::default_sample_rate_key(),
            group_by: None,
            exclude: None,
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }

    #[test]
    fn rejects_dynamic_ratio_only_configuration() {
        let config = SampleConfig {
            ratio_field: Some("sample_rate".to_string()),
            ..static_config(None, None)
        };

        let err = config.sample_rate().unwrap_err();
        assert!(matches!(err, SampleError::MissingStaticConfiguration));
    }

    #[test]
    fn rejects_dynamic_rate_only_configuration() {
        let config = SampleConfig {
            rate_field: Some("sample_rate_n".to_string()),
            ..static_config(None, None)
        };

        let err = config.sample_rate().unwrap_err();
        assert!(matches!(err, SampleError::MissingStaticConfiguration));
    }

    #[test]
    fn validates_static_with_dynamic_configuration() {
        let config = SampleConfig {
            rate_field: Some("sample_rate_n".to_string()),
            ..static_config(Some(10), None)
        };

        assert!(config.validate(&crate::schema::Definition::any()).is_ok());
    }

    #[test]
    fn rejects_both_dynamic_fields_configuration() {
        let config = SampleConfig {
            ratio_field: Some("sample_rate".to_string()),
            rate_field: Some("sample_rate_n".to_string()),
            ..static_config(Some(10), None)
        };

        let err = config.sample_rate().unwrap_err();
        assert!(matches!(err, SampleError::InvalidDynamicConfiguration));
    }

    #[test]
    fn rejects_key_field_with_dynamic_configuration() {
        let config = SampleConfig {
            ratio_field: Some("sample_ratio".to_string()),
            key_field: Some("trace_id".to_string()),
            ..static_config(Some(10), None)
        };

        let err = config.sample_rate().unwrap_err();
        assert!(matches!(
            err,
            SampleError::InvalidKeyFieldDynamicCombination
        ));
    }

    #[test]
    fn rejects_both_static_settings() {
        let err = static_config(Some(10), Some(0.5)).sample_rate().unwrap_err();
        assert!(matches!(err, SampleError::InvalidStaticConfiguration));
    }

    #[test]
    fn rejects_zero_rate() {
        let err = static_config(Some(0), None).sample_rate().unwrap_err();
        assert!(matches!(err, SampleError::InvalidRate));
    }

    #[test]
    fn rejects_non_positive_ratio() {
        for ratio in [0.0, -0.5] {
            let err = static_config(None, Some(ratio)).sample_rate().unwrap_err();
            assert!(matches!(err, SampleError::InvalidRatio { .. }));
        }
    }

    #[test]
    fn accepts_single_static_setting() {
        assert!(static_config(Some(10), None).sample_rate().is_ok());
        assert!(static_config(None, Some(0.25)).sample_rate().is_ok());
    }
}