vector/transforms/sample/
transform.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    hash::{Hash, Hasher},
5    num::NonZeroU64,
6};
7
8use vector_lib::{
9    config::LegacyKey,
10    lookup::{OwnedTargetPath, lookup_v2::OptionalValuePath},
11};
12
13use crate::{
14    conditions::Condition,
15    event::{Event, Value},
16    internal_events::SampleEventDiscarded,
17    sinks::prelude::TemplateRenderingError,
18    template::Template,
19    transforms::{FunctionTransform, OutputBuffer},
20};
21
/// Statically configured sampling strategy, bundled with the per-group state it needs.
///
/// The integer-rate and fractional-ratio forms are kept as separate variants only for backwards
/// compatibility: the value written to `sample_rate_key` stays consistent with what it was before
/// the sampling implementation was reworked to operate in terms of percentages.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Count-based sampling driven by an integer rate.
    Rate {
        /// Configured rate that the per-group event counters are compared against.
        rate: u64,
        /// Number of events seen so far, tracked separately for every group key (`None` is the
        /// key used when no group applies).
        counters: HashMap<Option<String>, u64>,
    },
    /// Ratio-based sampling driven by a fractional ratio.
    Ratio {
        /// Configured ratio that is accumulated on every event.
        ratio: f64,
        /// Accumulated ratio per group key (`None` is the key used when no group applies).
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled to the width of a `u64` hash; hashes at or below this value are kept.
        hash_ratio_threshold: u64,
    },
}
37
38impl SampleMode {
39    pub fn new_rate(rate: u64) -> Self {
40        Self::Rate {
41            rate,
42            counters: HashMap::default(),
43        }
44    }
45
46    pub fn new_ratio(ratio: f64) -> Self {
47        Self::Ratio {
48            ratio,
49            values: HashMap::default(),
50            // Supports the 'key_field' option, assuming an equal distribution of values for a given
51            // field, hashing its contents this component should output events according to the
52            // configured ratio.
53            //
54            // To do one option would be to convert the hash to a number between 0 and 1 and compare
55            // to the ratio. However to address issues with precision, here the ratio is scaled to
56            // meet the width of the type of the hash.
57            hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
58        }
59    }
60
61    fn increment(&mut self, group_by_key: Option<String>, value: Option<&Value>) -> bool {
62        let threshold_exceeded = match self {
63            Self::Rate { rate, counters } => {
64                let counter_value = counters.entry(group_by_key).or_default();
65                let old_counter_value = *counter_value;
66                *counter_value += 1;
67                old_counter_value % *rate == 0
68            }
69            Self::Ratio { ratio, values, .. } => {
70                let value = values.entry(group_by_key).or_insert(1.0 - *ratio);
71                let increment: f64 = *value + *ratio;
72                *value = if increment >= 1.0 {
73                    increment - 1.0
74                } else {
75                    increment
76                };
77                increment >= 1.0
78            }
79        };
80        if let Some(value) = value {
81            self.hash_within_ratio(value.to_string_lossy().as_bytes())
82        } else {
83            threshold_exceeded
84        }
85    }
86
87    fn hash_within_ratio(&self, value: &[u8]) -> bool {
88        let hash = seahash::hash(value);
89        match self {
90            Self::Rate { rate, .. } => hash.is_multiple_of(*rate),
91            Self::Ratio {
92                hash_ratio_threshold,
93                ..
94            } => hash <= *hash_ratio_threshold,
95        }
96    }
97}
98
/// Sampling parameters taken from an individual event rather than from the static configuration.
enum EventSampleMode {
    /// Fractional ratio read from the event.
    Ratio(f64),
    /// Non-zero integer rate read from the event.
    Rate(NonZeroU64),
}
103
104impl EventSampleMode {
105    fn sample_rate_label(&self) -> String {
106        match self {
107            Self::Ratio(ratio) => ratio.to_string(),
108            Self::Rate(rate) => rate.to_string(),
109        }
110    }
111}
112
/// Names of the event fields from which per-event sampling parameters are read.
///
/// Both fields are optional; the default value has neither set.
#[derive(Clone, Debug, Default)]
pub struct DynamicSampleFields {
    /// Path of the event field holding a per-event sampling ratio, if any.
    pub ratio_field: Option<String>,
    /// Path of the event field holding a per-event sampling rate, if any.
    pub rate_field: Option<String>,
}
118
119impl fmt::Display for SampleMode {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        // Avoids the print of an additional '.0' which was not performed in the previous
122        // implementation
123        match self {
124            Self::Rate { rate, .. } => write!(f, "{rate}"),
125            Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
126        }
127    }
128}
129
130#[derive(Clone)]
131pub enum SampleKeySource {
132    Static {
133        key_field: Option<String>,
134        group_by: Option<Template>,
135    },
136    Dynamic {
137        fields: DynamicSampleFields,
138        group_by: Option<Template>,
139    },
140}
141
142#[derive(Clone)]
143pub struct Sample {
144    name: String,
145    static_mode: SampleMode,
146    key_source: SampleKeySource,
147    dynamic_event_counters: HashMap<Option<String>, u64>,
148    exclude: Option<Condition>,
149    sample_rate_key: OptionalValuePath,
150}
151
152impl Sample {
153    // This function is dead code when the feature flag `transforms-impl-sample` is specified but not
154    // `transforms-sample`.
155    #![allow(dead_code)]
156    pub fn new(
157        name: String,
158        static_mode: SampleMode,
159        key_field: Option<String>,
160        group_by: Option<Template>,
161        exclude: Option<Condition>,
162        sample_rate_key: OptionalValuePath,
163    ) -> Self {
164        Self::new_with_source(
165            name,
166            static_mode,
167            SampleKeySource::Static {
168                key_field,
169                group_by,
170            },
171            exclude,
172            sample_rate_key,
173        )
174    }
175
176    pub fn new_with_dynamic(
177        name: String,
178        static_mode: SampleMode,
179        fields: DynamicSampleFields,
180        group_by: Option<Template>,
181        exclude: Option<Condition>,
182        sample_rate_key: OptionalValuePath,
183    ) -> Self {
184        Self::new_with_source(
185            name,
186            static_mode,
187            SampleKeySource::Dynamic { fields, group_by },
188            exclude,
189            sample_rate_key,
190        )
191    }
192
193    fn new_with_source(
194        name: String,
195        static_mode: SampleMode,
196        key_source: SampleKeySource,
197        exclude: Option<Condition>,
198        sample_rate_key: OptionalValuePath,
199    ) -> Self {
200        Self {
201            name,
202            static_mode,
203            key_source,
204            dynamic_event_counters: HashMap::default(),
205            exclude,
206            sample_rate_key,
207        }
208    }
209
210    #[cfg(test)]
211    pub fn ratio(&self) -> f64 {
212        match &self.static_mode {
213            SampleMode::Rate { rate, .. } => 1.0f64 / *rate as f64,
214            SampleMode::Ratio { ratio, .. } => *ratio,
215        }
216    }
217
218    fn dynamic_sample_hash(group_by_key: Option<&str>, counter: u64) -> u64 {
219        let mut hasher = seahash::SeaHasher::new();
220        group_by_key.hash(&mut hasher);
221        counter.hash(&mut hasher);
222        hasher.finish()
223    }
224
225    fn sample_with_dynamic_ratio(&mut self, ratio: f64, group_by_key: Option<String>) -> bool {
226        let counter_value = self
227            .dynamic_event_counters
228            .entry(group_by_key.clone())
229            .or_default();
230        let old_counter_value = *counter_value;
231        *counter_value += 1;
232
233        let hash = Self::dynamic_sample_hash(group_by_key.as_deref(), old_counter_value);
234        let hash_ratio_threshold = (ratio * (u64::MAX as u128) as f64) as u64;
235        hash <= hash_ratio_threshold
236    }
237
238    fn event_ratio(&self, event: &Event) -> Option<f64> {
239        let ratio_field = match &self.key_source {
240            SampleKeySource::Dynamic { fields, .. } => fields.ratio_field.as_ref()?,
241            SampleKeySource::Static { .. } => return None,
242        };
243
244        let value = self.get_event_value(event, ratio_field.as_str())?;
245
246        let ratio = match value {
247            Value::Integer(value) => *value as f64,
248            Value::Float(value) => value.into_inner(),
249            Value::Bytes(bytes) => std::str::from_utf8(bytes).ok()?.parse::<f64>().ok()?,
250            _ => return None,
251        };
252
253        (ratio > 0.0 && ratio <= 1.0).then_some(ratio)
254    }
255
256    fn event_rate(&self, event: &Event) -> Option<NonZeroU64> {
257        let rate_field = match &self.key_source {
258            SampleKeySource::Dynamic { fields, .. } => fields.rate_field.as_ref()?,
259            SampleKeySource::Static { .. } => return None,
260        };
261
262        let value = self.get_event_value(event, rate_field.as_str())?;
263
264        match value {
265            Value::Integer(value) => u64::try_from(*value).ok().and_then(NonZeroU64::new),
266            Value::Bytes(bytes) => std::str::from_utf8(bytes).ok()?.parse::<NonZeroU64>().ok(),
267            _ => None,
268        }
269    }
270
271    fn get_event_value<'a>(&self, event: &'a Event, path: &str) -> Option<&'a Value> {
272        match event {
273            Event::Log(event) => event.parse_path_and_get_value(path).ok().flatten(),
274            Event::Trace(event) => event.parse_path_and_get_value(path).ok().flatten(),
275            Event::Metric(_) => panic!("component can never receive metric events"),
276        }
277    }
278
279    fn event_sample_mode(&self, event: &Event) -> Option<EventSampleMode> {
280        self.event_ratio(event)
281            .map(EventSampleMode::Ratio)
282            .or_else(|| self.event_rate(event).map(EventSampleMode::Rate))
283    }
284
285    fn sample_with_dynamic_rate(&mut self, rate: NonZeroU64, group_by_key: Option<String>) -> bool {
286        let counter_value = self
287            .dynamic_event_counters
288            .entry(group_by_key.clone())
289            .or_default();
290        let old_counter_value = *counter_value;
291        *counter_value += 1;
292        let hash = Self::dynamic_sample_hash(group_by_key.as_deref(), old_counter_value);
293
294        hash.is_multiple_of(rate.get())
295    }
296
297    fn group_by_key(&self, event: &Event) -> Option<String> {
298        let group_by = match &self.key_source {
299            SampleKeySource::Static { group_by, .. } => group_by.as_ref()?,
300            SampleKeySource::Dynamic { group_by, .. } => group_by.as_ref()?,
301        };
302
303        match event {
304            Event::Log(event) => group_by.render_string(event),
305            Event::Trace(event) => group_by.render_string(event),
306            Event::Metric(_) => panic!("component can never receive metric events"),
307        }
308        .map_err(|error| {
309            emit!(TemplateRenderingError {
310                error,
311                field: Some("group_by"),
312                drop_event: false,
313            })
314        })
315        .ok()
316    }
317
318    fn static_key_value<'a>(&self, event: &'a Event) -> Option<&'a Value> {
319        let key_field = match &self.key_source {
320            SampleKeySource::Static { key_field, .. } => key_field.as_ref()?,
321            SampleKeySource::Dynamic { .. } => return None,
322        };
323
324        self.get_event_value(event, key_field)
325    }
326}
327
328impl FunctionTransform for Sample {
329    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
330        let mut event = {
331            if let Some(condition) = self.exclude.as_ref() {
332                let (result, event) = condition.check(event);
333                if result {
334                    output.push(event);
335                    return;
336                } else {
337                    event
338                }
339            } else {
340                event
341            }
342        };
343
344        let group_by_key = self.group_by_key(&event);
345        let value = self.static_key_value(&event);
346
347        let event_sample_mode = self.event_sample_mode(&event);
348        let sample_rate = event_sample_mode
349            .as_ref()
350            .map(EventSampleMode::sample_rate_label)
351            .unwrap_or_else(|| self.static_mode.to_string());
352
353        let should_sample = match event_sample_mode {
354            Some(EventSampleMode::Ratio(ratio)) => {
355                self.sample_with_dynamic_ratio(ratio, group_by_key)
356            }
357            Some(EventSampleMode::Rate(rate)) => self.sample_with_dynamic_rate(rate, group_by_key),
358            None => self.static_mode.increment(group_by_key, value),
359        };
360
361        if should_sample {
362            if let Some(path) = &self.sample_rate_key.path {
363                match event {
364                    Event::Log(ref mut event) => {
365                        event.namespace().insert_source_metadata(
366                            self.name.as_str(),
367                            event,
368                            Some(LegacyKey::Overwrite(path)),
369                            path,
370                            sample_rate.clone(),
371                        );
372                    }
373                    Event::Trace(ref mut event) => {
374                        event.insert(&OwnedTargetPath::event(path.clone()), sample_rate);
375                    }
376                    Event::Metric(_) => panic!("component can never receive metric events"),
377                };
378            }
379            output.push(event);
380        } else {
381            emit!(SampleEventDiscarded);
382        }
383    }
384}