vector/internal_events/
adaptive_concurrency.rs1use std::time::Duration;
2
3use metrics::Histogram;
4use vector_lib::{histogram, internal_event::HistogramName};
5
/// Payload handed to the `AdaptiveConcurrencyLimit` event when it is emitted.
///
/// The type is `Copy`, so it is passed to `emit` by value. `Debug` is derived
/// so the payload can be logged or inspected in assertions.
#[derive(Clone, Copy, Debug)]
pub struct AdaptiveConcurrencyLimitData {
    /// Value recorded (cast to `f64`) into the concurrency-limit histogram.
    pub concurrency: u64,
    /// Flag recorded as `1.0` when set and `0.0` otherwise.
    pub reached_limit: bool,
    /// Flag recorded as `1.0` when set and `0.0` otherwise.
    pub had_back_pressure: bool,
    /// Not recorded by the events visible in this file.
    /// NOTE(review): presumably the most recent RTT measurement, if one exists — confirm with the producer.
    pub current_rtt: Option<Duration>,
    /// Duration recorded into the past-RTT-mean histogram.
    pub past_rtt: Duration,
    /// Not recorded by the events visible in this file.
    /// NOTE(review): presumably the deviation paired with `past_rtt` — confirm with the producer.
    pub past_rtt_deviation: Duration,
}
15
16registered_event! {
17 AdaptiveConcurrencyLimit => {
18 limit: Histogram = histogram!(HistogramName::AdaptiveConcurrencyLimit),
22 reached_limit: Histogram = histogram!(HistogramName::AdaptiveConcurrencyReachedLimit),
23 back_pressure: Histogram = histogram!(HistogramName::AdaptiveConcurrencyBackPressure),
24 past_rtt_mean: Histogram = histogram!(HistogramName::AdaptiveConcurrencyPastRttMean),
25 }
26
27 fn emit(&self, data: AdaptiveConcurrencyLimitData) {
28 self.limit.record(data.concurrency as f64);
29 let reached_limit = if data.reached_limit { 1.0 } else { Default::default() };
30 self.reached_limit.record(reached_limit);
31 let back_pressure = if data.had_back_pressure { 1.0 } else { Default::default() };
32 self.back_pressure.record(back_pressure);
33 self.past_rtt_mean.record(data.past_rtt);
34 }
36}
37
38registered_event! {
39 AdaptiveConcurrencyInFlight => {
40 in_flight: Histogram = histogram!(HistogramName::AdaptiveConcurrencyInFlight),
41 }
42
43 fn emit(&self, in_flight: u64) {
44 self.in_flight.record(in_flight as f64);
45 }
46}
47
48registered_event! {
49 AdaptiveConcurrencyObservedRtt => {
50 observed_rtt: Histogram = histogram!(HistogramName::AdaptiveConcurrencyObservedRtt),
51 }
52
53 fn emit(&self, rtt: Duration) {
54 self.observed_rtt.record(rtt);
55 }
56}
57
58registered_event! {
59 AdaptiveConcurrencyAveragedRtt => {
60 averaged_rtt: Histogram = histogram!(HistogramName::AdaptiveConcurrencyAveragedRtt),
61 }
62
63 fn emit(&self, rtt: Duration) {
64 self.averaged_rtt.record(rtt);
65 }
66}