vector/internal_events/
adaptive_concurrency.rs

1use std::time::Duration;
2
3use metrics::Histogram;
4use vector_lib::{histogram, internal_event::HistogramName};
5
6#[derive(Clone, Copy)]
7pub struct AdaptiveConcurrencyLimitData {
8    pub concurrency: u64,
9    pub reached_limit: bool,
10    pub had_back_pressure: bool,
11    pub current_rtt: Option<Duration>,
12    pub past_rtt: Duration,
13    pub past_rtt_deviation: Duration,
14}
15
16registered_event! {
17    AdaptiveConcurrencyLimit => {
18        // These are histograms, as they may have a number of different
19        // values over each reporting interval, and each of those values
20        // is valuable for diagnosis.
21        limit: Histogram = histogram!(HistogramName::AdaptiveConcurrencyLimit),
22        reached_limit: Histogram = histogram!(HistogramName::AdaptiveConcurrencyReachedLimit),
23        back_pressure: Histogram = histogram!(HistogramName::AdaptiveConcurrencyBackPressure),
24        past_rtt_mean: Histogram = histogram!(HistogramName::AdaptiveConcurrencyPastRttMean),
25    }
26
27    fn emit(&self, data: AdaptiveConcurrencyLimitData) {
28        self.limit.record(data.concurrency as f64);
29        let reached_limit = if data.reached_limit { 1.0 } else { Default::default() };
30        self.reached_limit.record(reached_limit);
31        let back_pressure = if data.had_back_pressure { 1.0 } else { Default::default() };
32        self.back_pressure.record(back_pressure);
33        self.past_rtt_mean.record(data.past_rtt);
34        // past_rtt_deviation is unrecorded
35    }
36}
37
38registered_event! {
39    AdaptiveConcurrencyInFlight => {
40        in_flight: Histogram = histogram!(HistogramName::AdaptiveConcurrencyInFlight),
41    }
42
43    fn emit(&self, in_flight: u64) {
44        self.in_flight.record(in_flight as f64);
45    }
46}
47
48registered_event! {
49    AdaptiveConcurrencyObservedRtt => {
50        observed_rtt: Histogram = histogram!(HistogramName::AdaptiveConcurrencyObservedRtt),
51    }
52
53    fn emit(&self, rtt: Duration) {
54        self.observed_rtt.record(rtt);
55    }
56}
57
58registered_event! {
59    AdaptiveConcurrencyAveragedRtt => {
60        averaged_rtt: Histogram = histogram!(HistogramName::AdaptiveConcurrencyAveragedRtt),
61    }
62
63    fn emit(&self, rtt: Duration) {
64        self.averaged_rtt.record(rtt);
65    }
66}