vector_buffers/topology/channel/limited_queue.rs

1use std::{
2    cmp,
3    fmt::{self, Debug},
4    num::NonZeroUsize,
5    pin::Pin,
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10    time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::internal_event::{GaugeName, HistogramName};
22use vector_common::stats::TimeEwmaGauge;
23use vector_common::{gauge, histogram};
24
25use crate::{InMemoryBufferable, config::MemoryBufferSize};
26
/// Default half-life, in seconds, passed to `TimeEwmaGauge` for the buffer-utilization mean gauge
/// when the caller does not supply `ewma_half_life_seconds`.
pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;

/// Identifies which buffer channel is being instrumented, determining the metric name prefix.
#[derive(Clone, Copy, Debug)]
pub enum BufferChannelKind {
    /// Selects the `Source*` variants of `HistogramName` / `GaugeName`.
    Source,
    /// Selects the `Transform*` variants of `HistogramName` / `GaugeName`.
    Transform,
}
35
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
///
/// The item that could not be delivered is handed back in field `0`, so it is not lost.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiver disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
47
/// Error returned by `LimitedSender::try_send`.
///
/// Both variants carry the item that could not be sent.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// Not enough permits were free to accept the item without waiting.
    InsufficientCapacity(T),
    /// The channel's limiter has been closed.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Recovers the rejected item, whatever the reason for the failure.
    pub fn into_inner(self) -> T {
        let (Self::InsufficientCapacity(item) | Self::Disconnected(item)) = self;
        item
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::InsufficientCapacity(_) => "channel lacks sufficient capacity for send",
            Self::Disconnected(_) => "receiver disconnected",
        };
        f.write_str(message)
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
75
76// Trait over common queue operations so implementation can be chosen at initialization phase
77trait QueueImpl<T>: Send + Sync + fmt::Debug {
78    fn push(&self, item: T);
79    fn pop(&self) -> Option<T>;
80}
81
82impl<T> QueueImpl<T> for ArrayQueue<T>
83where
84    T: Send + Sync + fmt::Debug,
85{
86    fn push(&self, item: T) {
87        self.push(item)
88            .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
89    }
90
91    fn pop(&self) -> Option<T> {
92        self.pop()
93    }
94}
95
96impl<T> QueueImpl<T> for SegQueue<T>
97where
98    T: Send + Sync + fmt::Debug,
99{
100    fn push(&self, item: T) {
101        self.push(item);
102    }
103
104    fn pop(&self) -> Option<T> {
105        self.pop()
106    }
107}
108
109#[derive(Clone, Debug)]
110pub struct ChannelMetricMetadata {
111    kind: BufferChannelKind,
112    output: Option<String>,
113}
114
115impl ChannelMetricMetadata {
116    pub fn new(kind: BufferChannelKind, output: Option<String>) -> Self {
117        Self { kind, output }
118    }
119}
120
/// Metric handles for one channel, recorded on every send and every receive.
///
/// NOTE(review): `Inner::clone` clones this struct for each sender/receiver handle. Whether the
/// moving-average state inside `TimeEwmaGauge` is shared between clones or copied depends on that
/// type's `Clone` impl — confirm. If it is copied, each handle keeps its own independent average
/// while writing to the same gauge.
#[derive(Clone, Debug)]
struct Metrics {
    /// Receives every utilization sample (`record`).
    histogram: Histogram,
    /// Set to the most recently recorded utilization value.
    gauge: Gauge,
    /// Feeds the utilization-mean gauge (time-based EWMA wrapper).
    mean_gauge: TimeEwmaGauge,
    // We hold handles to the max-size gauges (current and legacy) to avoid them being dropped by
    // the metrics collector, but since the value is static, we never need to update it. The
    // compiler detects these as unused fields, so we need to suppress the warning here.
    #[expect(dead_code)]
    max_gauge: Gauge,
    #[expect(dead_code)]
    legacy_max_gauge: Gauge,
    // Test-only log of every value passed to `record`.
    #[cfg(test)]
    recorded_values: Arc<Mutex<Vec<usize>>>,
}
136
137impl Metrics {
138    #[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
139    fn new(
140        limit: MemoryBufferSize,
141        metadata: ChannelMetricMetadata,
142        ewma_half_life_seconds: Option<f64>,
143    ) -> Self {
144        let ewma_half_life_seconds =
145            ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
146        let ChannelMetricMetadata { kind, output } = metadata;
147
148        let (histogram_name, level_name, mean_name) = match kind {
149            BufferChannelKind::Source => (
150                HistogramName::SourceBufferUtilization,
151                GaugeName::SourceBufferUtilizationLevel,
152                GaugeName::SourceBufferUtilizationMean,
153            ),
154            BufferChannelKind::Transform => (
155                HistogramName::TransformBufferUtilization,
156                GaugeName::TransformBufferUtilizationLevel,
157                GaugeName::TransformBufferUtilizationMean,
158            ),
159        };
160
161        let (max_name, legacy_name, max_value) = match (kind, limit) {
162            (BufferChannelKind::Source, MemoryBufferSize::MaxEvents(n)) => (
163                GaugeName::SourceBufferMaxSizeEvents,
164                GaugeName::SourceBufferMaxEventSize,
165                n.get() as f64,
166            ),
167            (BufferChannelKind::Source, MemoryBufferSize::MaxSize(n)) => (
168                GaugeName::SourceBufferMaxSizeBytes,
169                GaugeName::SourceBufferMaxByteSize,
170                n.get() as f64,
171            ),
172            (BufferChannelKind::Transform, MemoryBufferSize::MaxEvents(n)) => (
173                GaugeName::TransformBufferMaxSizeEvents,
174                GaugeName::TransformBufferMaxEventSize,
175                n.get() as f64,
176            ),
177            (BufferChannelKind::Transform, MemoryBufferSize::MaxSize(n)) => (
178                GaugeName::TransformBufferMaxSizeBytes,
179                GaugeName::TransformBufferMaxByteSize,
180                n.get() as f64,
181            ),
182        };
183        #[cfg(test)]
184        let recorded_values = Arc::new(Mutex::new(Vec::new()));
185        if let Some(label_value) = output {
186            let max_gauge = gauge!(max_name, "output" => label_value.clone());
187            max_gauge.set(max_value);
188            let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
189            // DEPRECATED: buffer-bytes-events-metrics
190            let legacy_max_gauge = gauge!(legacy_name, "output" => label_value.clone());
191            legacy_max_gauge.set(max_value);
192            Self {
193                histogram: histogram!(histogram_name, "output" => label_value.clone()),
194                gauge: gauge!(level_name, "output" => label_value.clone()),
195                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
196                max_gauge,
197                legacy_max_gauge,
198                #[cfg(test)]
199                recorded_values,
200            }
201        } else {
202            let max_gauge = gauge!(max_name);
203            max_gauge.set(max_value);
204            let mean_gauge_handle = gauge!(mean_name);
205            // DEPRECATED: buffer-bytes-events-metrics
206            let legacy_max_gauge = gauge!(legacy_name);
207            legacy_max_gauge.set(max_value);
208            Self {
209                histogram: histogram!(histogram_name),
210                gauge: gauge!(level_name),
211                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
212                max_gauge,
213                legacy_max_gauge,
214                #[cfg(test)]
215                recorded_values,
216            }
217        }
218    }
219
220    #[expect(clippy::cast_precision_loss)]
221    fn record(&self, value: usize, reference: Instant) {
222        self.histogram.record(value as f64);
223        self.gauge.set(value as f64);
224        self.mean_gauge.record(value as f64, reference);
225        #[cfg(test)]
226        if let Ok(mut recorded) = self.recorded_values.lock() {
227            recorded.push(value);
228        }
229    }
230}
231
/// Channel state shared by every sender handle and the receiver.
#[derive(Debug)]
struct Inner<T> {
    /// Queued items, each stored together with the semaphore permits it holds until popped.
    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
    /// Configured limit; decides whether items are measured in events or in bytes.
    limit: MemoryBufferSize,
    /// Semaphore created with `capacity` permits. It enforces the limit, and closing it marks the
    /// channel as closed.
    limiter: Arc<Semaphore>,
    /// Notified after every send, and when the last sender drops, to wake the receiver.
    read_waker: Arc<Notify>,
    /// Utilization metrics; `None` when no metric metadata was supplied.
    metrics: Option<Metrics>,
    /// Numeric value of `limit` (maximum events or maximum bytes).
    capacity: NonZeroUsize,
}
241
242impl<T> Clone for Inner<T> {
243    fn clone(&self) -> Self {
244        Self {
245            data: self.data.clone(),
246            limit: self.limit,
247            limiter: self.limiter.clone(),
248            read_waker: self.read_waker.clone(),
249            metrics: self.metrics.clone(),
250            capacity: self.capacity,
251        }
252    }
253}
254
255impl<T: Send + Sync + Debug + 'static> Inner<T> {
256    fn new(
257        limit: MemoryBufferSize,
258        metric_metadata: Option<ChannelMetricMetadata>,
259        ewma_half_life_seconds: Option<f64>,
260    ) -> Self {
261        let read_waker = Arc::new(Notify::new());
262        let metrics =
263            metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
264        match limit {
265            MemoryBufferSize::MaxEvents(max_events) => Inner {
266                data: Arc::new(ArrayQueue::new(max_events.get())),
267                limit,
268                limiter: Arc::new(Semaphore::new(max_events.get())),
269                read_waker,
270                metrics,
271                capacity: max_events,
272            },
273            MemoryBufferSize::MaxSize(max_bytes) => Inner {
274                data: Arc::new(SegQueue::new()),
275                limit,
276                limiter: Arc::new(Semaphore::new(max_bytes.get())),
277                read_waker,
278                metrics,
279                capacity: max_bytes,
280            },
281        }
282    }
283
284    /// Records a send after acquiring all required permits.
285    ///
286    /// The `size` value is the true utilization contribution of `item`, which may exceed the number
287    /// of permits acquired for oversized payloads.
288    fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
289        if let Some(metrics) = &self.metrics {
290            // For normal items, capacity - available_permits() exactly represents the total queued
291            // utilization (including this item's just-acquired permits). For oversized items that
292            // acquired fewer permits than their true size, `size` is the correct utilization since
293            // the queue must have been empty for the oversized acquire to succeed.
294            let utilization = size.max(self.used_capacity());
295            metrics.record(utilization, Instant::now());
296        }
297        self.data.push((permits, item));
298        self.read_waker.notify_one();
299    }
300}
301
302impl<T> Inner<T> {
303    fn used_capacity(&self) -> usize {
304        self.capacity.get() - self.limiter.available_permits()
305    }
306
307    fn pop_and_record(&self) -> Option<T> {
308        self.data.pop().map(|(permit, item)| {
309            if let Some(metrics) = &self.metrics {
310                // Compute remaining utilization from the semaphore state. Since our permits haven't
311                // been released yet, used_capacity is stable against racing senders acquiring those
312                // permits.
313                let utilization = self.used_capacity().saturating_sub(permit.num_permits());
314                metrics.record(utilization, Instant::now());
315            }
316            // Release permits after recording so a waiting sender cannot enqueue a new item
317            // before this pop's utilization measurement is taken.
318            drop(permit);
319            item
320        })
321    }
322}
323
/// Sending half of a channel created by [`limited`].
///
/// It can be cloned. When the last clone is dropped, the limiter is closed so the receiver can
/// finish.
#[derive(Debug)]
pub struct LimitedSender<T> {
    inner: Inner<T>,
    /// Number of live senders, shared by all clones; the last one to drop closes the limiter.
    sender_count: Arc<AtomicUsize>,
}
329
330impl<T: InMemoryBufferable> LimitedSender<T> {
331    #[allow(clippy::cast_possible_truncation)]
332    fn calc_required_permits(&self, item: &T) -> (usize, u32) {
333        // We have to limit the number of permits we ask for to the overall limit since we're always
334        // willing to store more items than the limit if the queue is entirely empty, because
335        // otherwise we might deadlock ourselves by not being able to send a single item.
336        let value = match self.inner.limit {
337            MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
338            MemoryBufferSize::MaxEvents(_) => item.event_count(),
339        };
340        let limit = self.inner.capacity.get();
341        (value, cmp::min(limit, value) as u32)
342    }
343
344    /// Gets the number of items that this channel could accept.
345    pub fn available_capacity(&self) -> usize {
346        self.inner.limiter.available_permits()
347    }
348
349    /// Sends an item into the channel.
350    ///
351    /// # Errors
352    ///
353    /// If the receiver has disconnected (does not exist anymore), then `Err(SendError)` be returned
354    /// with the given `item`.
355    pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
356        // Calculate how many permits we need, and wait until we can acquire all of them.
357        let (size, permits_required) = self.calc_required_permits(&item);
358        match self
359            .inner
360            .limiter
361            .clone()
362            .acquire_many_owned(permits_required)
363            .await
364        {
365            Ok(permits) => {
366                self.inner.send_with_permits(size, permits, item);
367                trace!("Sent item.");
368                Ok(())
369            }
370            Err(_) => Err(SendError(item)),
371        }
372    }
373
374    /// Attempts to send an item into the channel.
375    ///
376    /// # Errors
377    ///
378    /// If the receiver has disconnected (does not exist anymore), then
379    /// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
380    /// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
381    /// returned with the given `item`.
382    ///
383    /// # Panics
384    ///
385    /// Will panic if adding ack amount overflows.
386    pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
387        // Calculate how many permits we need, and try to acquire them all without waiting.
388        let (size, permits_required) = self.calc_required_permits(&item);
389        match self
390            .inner
391            .limiter
392            .clone()
393            .try_acquire_many_owned(permits_required)
394        {
395            Ok(permits) => {
396                self.inner.send_with_permits(size, permits, item);
397                trace!("Attempt to send item succeeded.");
398                Ok(())
399            }
400            Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
401            Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
402        }
403    }
404}
405
406impl<T> Clone for LimitedSender<T> {
407    fn clone(&self) -> Self {
408        self.sender_count.fetch_add(1, Ordering::SeqCst);
409
410        Self {
411            inner: self.inner.clone(),
412            sender_count: Arc::clone(&self.sender_count),
413        }
414    }
415}
416
417impl<T> Drop for LimitedSender<T> {
418    fn drop(&mut self) {
419        // If we're the last sender to drop, close the semaphore on our way out the door.
420        if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
421            self.inner.limiter.close();
422            self.inner.read_waker.notify_one();
423        }
424    }
425}
426
/// Receiving half of a channel created by [`limited`].
///
/// Dropping it closes the limiter, so senders get an error instead of waiting for capacity.
#[derive(Debug)]
pub struct LimitedReceiver<T> {
    inner: Inner<T>,
}
431
432impl<T: Send + 'static> LimitedReceiver<T> {
433    /// Gets the number of items that this channel could accept.
434    pub fn available_capacity(&self) -> usize {
435        self.inner.limiter.available_permits()
436    }
437
438    pub async fn next(&mut self) -> Option<T> {
439        loop {
440            if let Some(item) = self.inner.pop_and_record() {
441                return Some(item);
442            }
443
444            // There wasn't an item for us to pop, so see if the channel is actually closed.  If so,
445            // then it's time for us to close up shop as well.
446            if self.inner.limiter.is_closed() {
447                if self.available_capacity() < self.inner.capacity.get() {
448                    // We only terminate when closed and fully drained. A close can race with queue
449                    // visibility while items/in-flight permits still exist.
450                    tokio::task::yield_now().await;
451                    continue;
452                }
453                return None;
454            }
455
456            // We're not closed, so we need to wait for a writer to tell us they made some
457            // progress.  This might end up being a spurious wakeup since `Notify` will
458            // store a wake-up if there are no waiters, but oh well.
459            self.inner.read_waker.notified().await;
460        }
461    }
462
463    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
464        let mut receiver = self;
465        Box::pin(stream! {
466            while let Some(item) = receiver.next().await {
467                yield item;
468            }
469        })
470    }
471}
472
473impl<T> Drop for LimitedReceiver<T> {
474    fn drop(&mut self) {
475        // Notify senders that the channel is now closed by closing the semaphore.  Any pending
476        // acquisitions will be awoken and notified that the semaphore is closed, and further new
477        // sends will immediately see the semaphore is closed.
478        self.inner.limiter.close();
479    }
480}
481
482pub fn limited<T: InMemoryBufferable + fmt::Debug>(
483    limit: MemoryBufferSize,
484    metric_metadata: Option<ChannelMetricMetadata>,
485    ewma_half_life_seconds: Option<f64>,
486) -> (LimitedSender<T>, LimitedReceiver<T>) {
487    let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
488
489    let sender = LimitedSender {
490        inner: inner.clone(),
491        sender_count: Arc::new(AtomicUsize::new(1)),
492    };
493    let receiver = LimitedReceiver { inner };
494
495    (sender, receiver)
496}
497
498#[cfg(test)]
499mod tests {
500    use std::num::NonZeroUsize;
501
502    use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
503    use tokio_test::{assert_pending, assert_ready, task::spawn};
504    use vector_common::byte_size_of::ByteSizeOf;
505
506    use super::{
507        BufferChannelKind, ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited,
508    };
509    use crate::{
510        MemoryBufferSize,
511        test::MultiEventRecord,
512        topology::{channel::limited_queue::SendError, test_util::Sample},
513    };
514
    /// Round-trips one item through a two-event channel, checking wake-ups along the way.
    #[tokio::test]
    async fn send_receive() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // Now our receive should have been woken up, and should immediately be ready.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }
543
544    #[tokio::test]
545    async fn records_utilization() {
546        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
547        let (mut tx, mut rx) = limited(
548            limit,
549            Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
550            None,
551        );
552
553        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
554
555        tx.send(Sample::new(1)).await.expect("send should succeed");
556        let records = metrics.lock().unwrap().clone();
557        assert_eq!(records.len(), 1);
558        assert_eq!(records.last().copied(), Some(1));
559
560        assert_eq!(Sample::new(1), rx.next().await.unwrap());
561        let records = metrics.lock().unwrap();
562        assert_eq!(records.len(), 2);
563        assert_eq!(records.last().copied(), Some(0));
564    }
565
566    #[tokio::test]
567    async fn records_utilization_transform_channel() {
568        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
569        let (mut tx, mut rx) = limited(
570            limit,
571            Some(ChannelMetricMetadata::new(
572                BufferChannelKind::Transform,
573                None,
574            )),
575            None,
576        );
577
578        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
579
580        tx.send(Sample::new(1)).await.expect("send should succeed");
581        let records = metrics.lock().unwrap().clone();
582        assert_eq!(records.len(), 1);
583        assert_eq!(records.last().copied(), Some(1));
584
585        assert_eq!(Sample::new(1), rx.next().await.unwrap());
586        let records = metrics.lock().unwrap();
587        assert_eq!(records.len(), 2);
588        assert_eq!(records.last().copied(), Some(0));
589    }
590
    /// An item larger than the whole channel is accepted when the channel is empty. Its full size
    /// is recorded as utilization, even though only `limit` permits were acquired.
    #[tokio::test]
    async fn oversized_send_records_true_utilization_via_normal_send_path() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
            None,
        );
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        // Normal send path: permits are capped to the limit (2), but utilization should reflect
        // the true item contribution (3).
        let oversized = MultiEventRecord::new(3);
        tx.send(oversized.clone())
            .await
            .expect("send should succeed");

        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 1);
        assert_eq!(records.last().copied(), Some(3));

        // Once the item is received the channel is empty again.
        assert_eq!(Some(oversized), rx.next().await);
        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 2);
        assert_eq!(records.last().copied(), Some(0));
    }
617
    /// A byte-limited channel sized for exactly ten messages accepts ten and then blocks.
    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // With this configuration a maximum of exactly 10 messages can fit in the channel
        let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Send 10 messages into the channel, filling it
        for _ in 0..10 {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // With the 10th message in the channel no space should be left
        assert_eq!(0, tx.available_capacity());

        // Attempting to send one more message than the channel can hold should block
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Read all data from the channel, assert final states are as expected
        for _ in 0..10 {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // Channel should have no more data: the blocked final send is never polled again, so its
        // item was never enqueued
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }
657
    /// With a one-event channel full, a second send stays pending until the first item is
    /// received, then completes.
    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create our send and receive futures.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready... but we
        // aren't going to read the value just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send, which should block as there's no available capacity.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the item, our second send should be woken up and be able to send in.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // Since the second send was already waiting for permits, the semaphore returns them
        // directly to our waiting send, which should now be woken up and able to complete:
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // And the final receive to get our second send:
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }
719
    /// A four-event send into a seven-event channel holding six events waits until enough
    /// earlier items are received. It is only woken once all of its permits have been acquired.
    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create our send and receive futures.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete our three small sends (six events in total),
        // which we have available capacity for, but which consume all but one of the slots.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready, but we won't
        // receive just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send that has four events, and needs to wait for two receives to happen.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now receive the first item. That frees one more permit, but the second send still needs
        // more, so it stays pending.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Callers waiting to acquire permits have the permits immediately transfer to them when one
        // (or more) are released, so we expect this to be zero until we send and then read the
        // third item.
        assert_eq!(0, rx.available_capacity());

        // We don't get woken up until all permits have been acquired.
        assert!(!send2.is_woken());

        // Our second read should unlock enough available capacity for the second send once complete.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // And just make sure we see those last two sends.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }
808
    /// After the last sender drops, the receiver first drains the queued item and then returns
    /// `None`.
    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv.poll());

        // Now drop our second sender, which shouldn't do anything yet.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now complete the pending send and then drop the last remaining sender. That closes the
        // semaphore, which lets the receiver complete with no further waiting: one item and then
        // `None`.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }
851
    /// Checks that a pending receive on an empty channel resolves to `None` only once the *last*
    /// sender is dropped; dropping a non-final sender must leave it pending.
    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (tx, mut rx) = limited::<Sample>(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create our receive future.
        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // Now drop our first sender, which shouldn't do anything yet.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our second sender, which should trigger closing the semaphore which should let
        // the receive complete as there are no items to read.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }
881
    /// Checks that a record larger than the whole buffer (2 events vs. a 1-event limit) can still
    /// be sent when the buffer is empty, consuming every slot, and that receiving it restores
    /// exactly the original capacity rather than more.
    #[test]
    fn oversized_send_allowed_when_empty() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // We should immediately be able to complete our send, which we don't have full
        // available capacity for, but will consume all of the available slots.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Now we should be able to get back the oversized item, but our capacity should not be
        // greater than what we started with.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }
914
    /// Checks that an oversized record (3 events vs. a 2-event limit) waits while the buffer is
    /// partially occupied, proceeds once the occupying item is received, and that receiving the
    /// oversized record restores the full limit.
    #[test]
    fn oversized_send_allowed_when_partial_capacity() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg1 = MultiEventRecord::new(1);
        let msg2 = MultiEventRecord::new(3);

        // Create our send future.
        let mut send = spawn(async { tx.send(msg1.clone()).await });

        // Nobody should be woken up.
        assert!(!send.is_woken());

        // We should immediately be able to complete our send, which will only use up a single slot.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(1, tx.available_capacity());

        // Now we'll trigger another send which has an oversized item.  It shouldn't be able to send
        // until all permits are available.
        let mut send2 = spawn(async { tx.send(msg2.clone()).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // The remaining free slot is no longer reported as available while the oversized send is
        // pending (presumably it is held by the waiting send — NOTE(review): confirm against
        // the sender implementation).
        assert_eq!(0, rx.available_capacity());

        // Now do a receive which should return the one consumed slot, essentially allowing all
        // permits to be acquired by the blocked send.
        let mut recv = spawn(async { rx.next().await });
        assert!(!recv.is_woken());
        assert!(!send2.is_woken());

        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
        drop(recv);

        // Still 0: the slot freed by the receive is expected to go to the blocked send rather
        // than back to the general pool.
        assert_eq!(0, rx.available_capacity());

        // Now our blocked send should be able to proceed, and we should be able to read back the
        // item.
        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        let mut recv2 = spawn(async { rx.next().await });
        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));

        assert_eq!(2, tx.available_capacity());
    }
969
970    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
971    async fn concurrent_send_receive_metrics_remain_valid() {
972        const ITEM_COUNT: usize = 4_000;
973
974        // Try different sizes of the buffer, from 10 to 1000 events.
975        for size in 1..=100 {
976            let limit = NonZeroUsize::new(size * 10).unwrap();
977            let (tx, rx) = limited(
978                MemoryBufferSize::MaxEvents(limit),
979                Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
980                None,
981            );
982            let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
983
984            let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
985            let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
986
987            sender.await.expect("sender task should not panic");
988            receiver.await.expect("receiver task should not panic");
989
990            let recorded = metrics.lock().unwrap().clone();
991            assert_eq!(
992                recorded.len(),
993                ITEM_COUNT * 2,
994                "expected one metric update per send and per receive"
995            );
996
997            // For MaxEvents with single-event messages, the occupancy counter tracks exact
998            // utilization, so values must stay within [0, limit].
999            let max_allowed = limit.get();
1000            let observed_max = recorded.iter().copied().max().unwrap_or_default();
1001            assert!(
1002                recorded.iter().all(|value| *value <= max_allowed),
1003                "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
1004            );
1005        }
1006    }
1007
1008    async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
1009        let mut rng = SmallRng::from_rng(&mut rand::rng());
1010
1011        for i in 0..item_count {
1012            tx.send(Sample::new(i as u64))
1013                .await
1014                .expect("send should succeed");
1015            if rng.random::<u8>() % 8 == 0 {
1016                tokio::task::yield_now().await;
1017            }
1018        }
1019    }
1020
1021    async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
1022        let mut rng = SmallRng::from_rng(&mut rand::rng());
1023
1024        for i in 0..item_count {
1025            let next = rx
1026                .next()
1027                .await
1028                .expect("receiver should yield all sent items");
1029            assert_eq!(Sample::new(i as u64), next);
1030            if rng.random::<u8>() % 8 == 0 {
1031                tokio::task::yield_now().await;
1032            }
1033        }
1034    }
1035}