vector/internal_events/
open.rs1use std::{
2 hint,
3 sync::{
4 Arc,
5 atomic::{AtomicUsize, Ordering},
6 },
7};
8
9use vector_lib::{
10 NamedInternalEvent, gauge,
11 internal_event::{GaugeName, InternalEvent},
12};
13
14#[derive(Debug, NamedInternalEvent)]
15pub struct ConnectionOpen {
16 pub count: usize,
17}
18
19impl InternalEvent for ConnectionOpen {
20 fn emit(self) {
21 gauge!(GaugeName::OpenConnections).set(self.count as f64);
22 }
23}
24
25#[derive(Debug, NamedInternalEvent)]
26pub struct EndpointsActive {
27 pub count: usize,
28}
29
30impl InternalEvent for EndpointsActive {
31 fn emit(self) {
32 gauge!(GaugeName::ActiveEndpoints).set(self.count as f64);
33 }
34}
35
/// Handle to a reference-counted atomic counter.
///
/// Cloning the handle clones the inner `Arc`, so every clone observes and
/// updates the same counter.
#[derive(Clone)]
pub struct OpenGauge {
    /// The shared counter.
    gauge: Arc<AtomicUsize>,
}
40
41impl OpenGauge {
42 pub fn new() -> Self {
43 OpenGauge {
44 gauge: Arc::default(),
45 }
46 }
47
48 pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
51 gauge_add(&self.gauge, 1, &emitter);
52 OpenToken {
53 gauge: self.gauge,
54 emitter,
55 }
56 }
57
58 #[cfg(all(feature = "sources-utils-net-unix", unix))]
59 pub fn any_open(&self) -> bool {
60 self.gauge.load(Ordering::Acquire) != 0
61 }
62}
63
64impl Default for OpenGauge {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
/// Guard produced by `OpenGauge::open`.
///
/// Its `Drop` implementation subtracts one from the shared counter and passes
/// `emitter` along to report the result.
pub struct OpenToken<E>
where
    E: Fn(usize),
{
    /// Counter shared with the `OpenGauge` this token came from.
    gauge: Arc<AtomicUsize>,
    /// Callback that receives updated counter values.
    emitter: E,
}
74
75impl<E: Fn(usize)> Drop for OpenToken<E> {
76 fn drop(&mut self) {
77 gauge_add(&self.gauge, -1, &self.emitter);
78 }
79}
80
/// Adds the signed delta `add` to `gauge` and reports the new count through
/// `emitter`.
///
/// The update is a compare-and-swap loop. `emitter` is called with the
/// candidate value *before* each swap attempt, so it can run more than once
/// per call when an attempt fails and is retried, and it may briefly report a
/// value that never gets stored.
///
/// The addition wraps at the bounds of `usize` and never panics.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        // Stay in unsigned arithmetic: going through `isize` and back would
        // trip the overflow check in debug builds once the sum leaves the
        // `isize` range, and this function is also reached from `Drop`.
        let new_value = value.wrapping_add_signed(add);
        // Emit first, then publish; the order of these two steps is deliberate
        // and must not be swapped.
        emitter(new_value);
        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(x) => {
                // The swap failed; retry from the value the failed
                // exchange returned.
                hint::spin_loop();
                value = x;
            }
        }
    }
}
120
#[cfg(test)]
mod tests {
    use std::{mem::drop, thread};

    use super::*;

    /// Opens and drops tokens on one shared gauge from several threads, with
    /// every emitter writing into the same atomic, and checks that the last
    /// value left there after all threads finish is zero.
    #[test]
    fn eventually_consistent() {
        let threads = 8;
        let rounds = 1000;
        let shared = OpenGauge::new();
        let last_emitted = Arc::new(AtomicUsize::new(0));

        let mut workers = Vec::with_capacity(threads);
        for _ in 0..threads {
            let gauge = shared.clone();
            let last = Arc::clone(&last_emitted);
            workers.push(thread::spawn(move || {
                let mut work = 0;
                for _ in 0..rounds {
                    let token = gauge
                        .clone()
                        .open(|count| last.store(count, Ordering::Release));
                    // Do a little arithmetic while the token is held.
                    for i in 0..100 {
                        work += i;
                    }
                    drop(token);
                }
                work
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(0, last_emitted.load(Ordering::Acquire));
    }
}