1use std::{
2 cmp,
3 fmt::{self, Debug},
4 num::NonZeroUsize,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10 time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::internal_event::{GaugeName, HistogramName};
22use vector_common::stats::TimeEwmaGauge;
23use vector_common::{gauge, histogram};
24
25use crate::{InMemoryBufferable, config::MemoryBufferSize};
26
27pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;
28
29#[derive(Clone, Copy, Debug)]
31pub enum BufferChannelKind {
32 Source,
33 Transform,
34}
35
36#[derive(Debug, PartialEq, Eq)]
38pub struct SendError<T>(pub T);
39
40impl<T> fmt::Display for SendError<T> {
41 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(fmt, "receiver disconnected")
43 }
44}
45
46impl<T: fmt::Debug> std::error::Error for SendError<T> {}
47
48#[derive(Debug, PartialEq, Eq)]
50pub enum TrySendError<T> {
51 InsufficientCapacity(T),
52 Disconnected(T),
53}
54
55impl<T> TrySendError<T> {
56 pub fn into_inner(self) -> T {
57 match self {
58 Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
59 }
60 }
61}
62
63impl<T> fmt::Display for TrySendError<T> {
64 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::InsufficientCapacity(_) => {
67 write!(fmt, "channel lacks sufficient capacity for send")
68 }
69 Self::Disconnected(_) => write!(fmt, "receiver disconnected"),
70 }
71 }
72}
73
74impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
75
76trait QueueImpl<T>: Send + Sync + fmt::Debug {
78 fn push(&self, item: T);
79 fn pop(&self) -> Option<T>;
80}
81
82impl<T> QueueImpl<T> for ArrayQueue<T>
83where
84 T: Send + Sync + fmt::Debug,
85{
86 fn push(&self, item: T) {
87 self.push(item)
88 .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
89 }
90
91 fn pop(&self) -> Option<T> {
92 self.pop()
93 }
94}
95
96impl<T> QueueImpl<T> for SegQueue<T>
97where
98 T: Send + Sync + fmt::Debug,
99{
100 fn push(&self, item: T) {
101 self.push(item);
102 }
103
104 fn pop(&self) -> Option<T> {
105 self.pop()
106 }
107}
108
109#[derive(Clone, Debug)]
110pub struct ChannelMetricMetadata {
111 kind: BufferChannelKind,
112 output: Option<String>,
113}
114
115impl ChannelMetricMetadata {
116 pub fn new(kind: BufferChannelKind, output: Option<String>) -> Self {
117 Self { kind, output }
118 }
119}
120
121#[derive(Clone, Debug)]
122struct Metrics {
123 histogram: Histogram,
124 gauge: Gauge,
125 mean_gauge: TimeEwmaGauge,
126 #[expect(dead_code)]
130 max_gauge: Gauge,
131 #[expect(dead_code)]
132 legacy_max_gauge: Gauge,
133 #[cfg(test)]
134 recorded_values: Arc<Mutex<Vec<usize>>>,
135}
136
137impl Metrics {
138 #[expect(clippy::cast_precision_loss)] fn new(
140 limit: MemoryBufferSize,
141 metadata: ChannelMetricMetadata,
142 ewma_half_life_seconds: Option<f64>,
143 ) -> Self {
144 let ewma_half_life_seconds =
145 ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
146 let ChannelMetricMetadata { kind, output } = metadata;
147
148 let (histogram_name, level_name, mean_name) = match kind {
149 BufferChannelKind::Source => (
150 HistogramName::SourceBufferUtilization,
151 GaugeName::SourceBufferUtilizationLevel,
152 GaugeName::SourceBufferUtilizationMean,
153 ),
154 BufferChannelKind::Transform => (
155 HistogramName::TransformBufferUtilization,
156 GaugeName::TransformBufferUtilizationLevel,
157 GaugeName::TransformBufferUtilizationMean,
158 ),
159 };
160
161 let (max_name, legacy_name, max_value) = match (kind, limit) {
162 (BufferChannelKind::Source, MemoryBufferSize::MaxEvents(n)) => (
163 GaugeName::SourceBufferMaxSizeEvents,
164 GaugeName::SourceBufferMaxEventSize,
165 n.get() as f64,
166 ),
167 (BufferChannelKind::Source, MemoryBufferSize::MaxSize(n)) => (
168 GaugeName::SourceBufferMaxSizeBytes,
169 GaugeName::SourceBufferMaxByteSize,
170 n.get() as f64,
171 ),
172 (BufferChannelKind::Transform, MemoryBufferSize::MaxEvents(n)) => (
173 GaugeName::TransformBufferMaxSizeEvents,
174 GaugeName::TransformBufferMaxEventSize,
175 n.get() as f64,
176 ),
177 (BufferChannelKind::Transform, MemoryBufferSize::MaxSize(n)) => (
178 GaugeName::TransformBufferMaxSizeBytes,
179 GaugeName::TransformBufferMaxByteSize,
180 n.get() as f64,
181 ),
182 };
183 #[cfg(test)]
184 let recorded_values = Arc::new(Mutex::new(Vec::new()));
185 if let Some(label_value) = output {
186 let max_gauge = gauge!(max_name, "output" => label_value.clone());
187 max_gauge.set(max_value);
188 let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
189 let legacy_max_gauge = gauge!(legacy_name, "output" => label_value.clone());
191 legacy_max_gauge.set(max_value);
192 Self {
193 histogram: histogram!(histogram_name, "output" => label_value.clone()),
194 gauge: gauge!(level_name, "output" => label_value.clone()),
195 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
196 max_gauge,
197 legacy_max_gauge,
198 #[cfg(test)]
199 recorded_values,
200 }
201 } else {
202 let max_gauge = gauge!(max_name);
203 max_gauge.set(max_value);
204 let mean_gauge_handle = gauge!(mean_name);
205 let legacy_max_gauge = gauge!(legacy_name);
207 legacy_max_gauge.set(max_value);
208 Self {
209 histogram: histogram!(histogram_name),
210 gauge: gauge!(level_name),
211 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
212 max_gauge,
213 legacy_max_gauge,
214 #[cfg(test)]
215 recorded_values,
216 }
217 }
218 }
219
220 #[expect(clippy::cast_precision_loss)]
221 fn record(&self, value: usize, reference: Instant) {
222 self.histogram.record(value as f64);
223 self.gauge.set(value as f64);
224 self.mean_gauge.record(value as f64, reference);
225 #[cfg(test)]
226 if let Ok(mut recorded) = self.recorded_values.lock() {
227 recorded.push(value);
228 }
229 }
230}
231
232#[derive(Debug)]
233struct Inner<T> {
234 data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
235 limit: MemoryBufferSize,
236 limiter: Arc<Semaphore>,
237 read_waker: Arc<Notify>,
238 metrics: Option<Metrics>,
239 capacity: NonZeroUsize,
240}
241
242impl<T> Clone for Inner<T> {
243 fn clone(&self) -> Self {
244 Self {
245 data: self.data.clone(),
246 limit: self.limit,
247 limiter: self.limiter.clone(),
248 read_waker: self.read_waker.clone(),
249 metrics: self.metrics.clone(),
250 capacity: self.capacity,
251 }
252 }
253}
254
255impl<T: Send + Sync + Debug + 'static> Inner<T> {
256 fn new(
257 limit: MemoryBufferSize,
258 metric_metadata: Option<ChannelMetricMetadata>,
259 ewma_half_life_seconds: Option<f64>,
260 ) -> Self {
261 let read_waker = Arc::new(Notify::new());
262 let metrics =
263 metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
264 match limit {
265 MemoryBufferSize::MaxEvents(max_events) => Inner {
266 data: Arc::new(ArrayQueue::new(max_events.get())),
267 limit,
268 limiter: Arc::new(Semaphore::new(max_events.get())),
269 read_waker,
270 metrics,
271 capacity: max_events,
272 },
273 MemoryBufferSize::MaxSize(max_bytes) => Inner {
274 data: Arc::new(SegQueue::new()),
275 limit,
276 limiter: Arc::new(Semaphore::new(max_bytes.get())),
277 read_waker,
278 metrics,
279 capacity: max_bytes,
280 },
281 }
282 }
283
284 fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
289 if let Some(metrics) = &self.metrics {
290 let utilization = size.max(self.used_capacity());
295 metrics.record(utilization, Instant::now());
296 }
297 self.data.push((permits, item));
298 self.read_waker.notify_one();
299 }
300}
301
302impl<T> Inner<T> {
303 fn used_capacity(&self) -> usize {
304 self.capacity.get() - self.limiter.available_permits()
305 }
306
307 fn pop_and_record(&self) -> Option<T> {
308 self.data.pop().map(|(permit, item)| {
309 if let Some(metrics) = &self.metrics {
310 let utilization = self.used_capacity().saturating_sub(permit.num_permits());
314 metrics.record(utilization, Instant::now());
315 }
316 drop(permit);
319 item
320 })
321 }
322}
323
324#[derive(Debug)]
325pub struct LimitedSender<T> {
326 inner: Inner<T>,
327 sender_count: Arc<AtomicUsize>,
328}
329
330impl<T: InMemoryBufferable> LimitedSender<T> {
331 #[allow(clippy::cast_possible_truncation)]
332 fn calc_required_permits(&self, item: &T) -> (usize, u32) {
333 let value = match self.inner.limit {
337 MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
338 MemoryBufferSize::MaxEvents(_) => item.event_count(),
339 };
340 let limit = self.inner.capacity.get();
341 (value, cmp::min(limit, value) as u32)
342 }
343
344 pub fn available_capacity(&self) -> usize {
346 self.inner.limiter.available_permits()
347 }
348
349 pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
356 let (size, permits_required) = self.calc_required_permits(&item);
358 match self
359 .inner
360 .limiter
361 .clone()
362 .acquire_many_owned(permits_required)
363 .await
364 {
365 Ok(permits) => {
366 self.inner.send_with_permits(size, permits, item);
367 trace!("Sent item.");
368 Ok(())
369 }
370 Err(_) => Err(SendError(item)),
371 }
372 }
373
374 pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
387 let (size, permits_required) = self.calc_required_permits(&item);
389 match self
390 .inner
391 .limiter
392 .clone()
393 .try_acquire_many_owned(permits_required)
394 {
395 Ok(permits) => {
396 self.inner.send_with_permits(size, permits, item);
397 trace!("Attempt to send item succeeded.");
398 Ok(())
399 }
400 Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
401 Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
402 }
403 }
404}
405
406impl<T> Clone for LimitedSender<T> {
407 fn clone(&self) -> Self {
408 self.sender_count.fetch_add(1, Ordering::SeqCst);
409
410 Self {
411 inner: self.inner.clone(),
412 sender_count: Arc::clone(&self.sender_count),
413 }
414 }
415}
416
417impl<T> Drop for LimitedSender<T> {
418 fn drop(&mut self) {
419 if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
421 self.inner.limiter.close();
422 self.inner.read_waker.notify_one();
423 }
424 }
425}
426
427#[derive(Debug)]
428pub struct LimitedReceiver<T> {
429 inner: Inner<T>,
430}
431
432impl<T: Send + 'static> LimitedReceiver<T> {
433 pub fn available_capacity(&self) -> usize {
435 self.inner.limiter.available_permits()
436 }
437
438 pub async fn next(&mut self) -> Option<T> {
439 loop {
440 if let Some(item) = self.inner.pop_and_record() {
441 return Some(item);
442 }
443
444 if self.inner.limiter.is_closed() {
447 if self.available_capacity() < self.inner.capacity.get() {
448 tokio::task::yield_now().await;
451 continue;
452 }
453 return None;
454 }
455
456 self.inner.read_waker.notified().await;
460 }
461 }
462
463 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
464 let mut receiver = self;
465 Box::pin(stream! {
466 while let Some(item) = receiver.next().await {
467 yield item;
468 }
469 })
470 }
471}
472
473impl<T> Drop for LimitedReceiver<T> {
474 fn drop(&mut self) {
475 self.inner.limiter.close();
479 }
480}
481
482pub fn limited<T: InMemoryBufferable + fmt::Debug>(
483 limit: MemoryBufferSize,
484 metric_metadata: Option<ChannelMetricMetadata>,
485 ewma_half_life_seconds: Option<f64>,
486) -> (LimitedSender<T>, LimitedReceiver<T>) {
487 let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
488
489 let sender = LimitedSender {
490 inner: inner.clone(),
491 sender_count: Arc::new(AtomicUsize::new(1)),
492 };
493 let receiver = LimitedReceiver { inner };
494
495 (sender, receiver)
496}
497
498#[cfg(test)]
499mod tests {
500 use std::num::NonZeroUsize;
501
502 use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
503 use tokio_test::{assert_pending, assert_ready, task::spawn};
504 use vector_common::byte_size_of::ByteSizeOf;
505
506 use super::{
507 BufferChannelKind, ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited,
508 };
509 use crate::{
510 MemoryBufferSize,
511 test::MultiEventRecord,
512 topology::{channel::limited_queue::SendError, test_util::Sample},
513 };
514
515 #[tokio::test]
516 async fn send_receive() {
517 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
518 let (mut tx, mut rx) = limited(limit, None, None);
519
520 assert_eq!(2, tx.available_capacity());
521
522 let msg = Sample::new(42);
523
524 let mut send = spawn(async { tx.send(msg).await });
526
527 let mut recv = spawn(async { rx.next().await });
528
529 assert!(!send.is_woken());
531 assert!(!recv.is_woken());
532
533 assert_pending!(recv.poll());
535
536 assert_eq!(Ok(()), assert_ready!(send.poll()));
538
539 assert!(recv.is_woken());
541 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
542 }
543
544 #[tokio::test]
545 async fn records_utilization() {
546 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
547 let (mut tx, mut rx) = limited(
548 limit,
549 Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
550 None,
551 );
552
553 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
554
555 tx.send(Sample::new(1)).await.expect("send should succeed");
556 let records = metrics.lock().unwrap().clone();
557 assert_eq!(records.len(), 1);
558 assert_eq!(records.last().copied(), Some(1));
559
560 assert_eq!(Sample::new(1), rx.next().await.unwrap());
561 let records = metrics.lock().unwrap();
562 assert_eq!(records.len(), 2);
563 assert_eq!(records.last().copied(), Some(0));
564 }
565
566 #[tokio::test]
567 async fn records_utilization_transform_channel() {
568 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
569 let (mut tx, mut rx) = limited(
570 limit,
571 Some(ChannelMetricMetadata::new(
572 BufferChannelKind::Transform,
573 None,
574 )),
575 None,
576 );
577
578 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
579
580 tx.send(Sample::new(1)).await.expect("send should succeed");
581 let records = metrics.lock().unwrap().clone();
582 assert_eq!(records.len(), 1);
583 assert_eq!(records.last().copied(), Some(1));
584
585 assert_eq!(Sample::new(1), rx.next().await.unwrap());
586 let records = metrics.lock().unwrap();
587 assert_eq!(records.len(), 2);
588 assert_eq!(records.last().copied(), Some(0));
589 }
590
591 #[tokio::test]
592 async fn oversized_send_records_true_utilization_via_normal_send_path() {
593 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
594 let (mut tx, mut rx) = limited(
595 limit,
596 Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
597 None,
598 );
599 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
600
601 let oversized = MultiEventRecord::new(3);
604 tx.send(oversized.clone())
605 .await
606 .expect("send should succeed");
607
608 let records = metrics.lock().unwrap().clone();
609 assert_eq!(records.len(), 1);
610 assert_eq!(records.last().copied(), Some(3));
611
612 assert_eq!(Some(oversized), rx.next().await);
613 let records = metrics.lock().unwrap().clone();
614 assert_eq!(records.len(), 2);
615 assert_eq!(records.last().copied(), Some(0));
616 }
617
618 #[test]
619 fn test_limiting_by_byte_size() {
620 let max_elements = 10;
621 let msg = Sample::new_with_heap_allocated_values(50);
622 let msg_size = msg.allocated_bytes();
623 let max_allowed_bytes = msg_size * max_elements;
624
625 let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
627 let (mut tx, mut rx) = limited(limit, None, None);
628
629 assert_eq!(max_allowed_bytes, tx.available_capacity());
630
631 for _ in 0..10 {
633 let msg_clone = msg.clone();
634 let mut f = spawn(async { tx.send(msg_clone).await });
635 assert_eq!(Ok(()), assert_ready!(f.poll()));
636 }
637 assert_eq!(0, tx.available_capacity());
639
640 let mut send_final = spawn({
642 let msg_clone = msg.clone();
643 async { tx.send(msg_clone).await }
644 });
645 assert_pending!(send_final.poll());
646
647 for _ in 0..10 {
649 let mut f = spawn(async { rx.next().await });
650 let value = assert_ready!(f.poll());
651 assert_eq!(value.allocated_bytes(), msg_size);
652 }
653 let mut recv = spawn(async { rx.next().await });
655 assert_pending!(recv.poll());
656 }
657
658 #[test]
659 fn sender_waits_for_more_capacity_when_none_available() {
660 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
661 let (mut tx, mut rx) = limited(limit, None, None);
662
663 assert_eq!(1, tx.available_capacity());
664
665 let msg1 = Sample::new(42);
666 let msg2 = Sample::new(43);
667
668 let mut send1 = spawn(async { tx.send(msg1).await });
670
671 let mut recv1 = spawn(async { rx.next().await });
672
673 assert!(!send1.is_woken());
675 assert!(!recv1.is_woken());
676
677 assert_pending!(recv1.poll());
679
680 assert_eq!(Ok(()), assert_ready!(send1.poll()));
682 drop(send1);
683
684 assert_eq!(0, tx.available_capacity());
685
686 assert!(recv1.is_woken());
689
690 let mut send2 = spawn(async { tx.send(msg2).await });
692
693 assert!(!send2.is_woken());
694 assert_pending!(send2.poll());
695
696 assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
698 drop(recv1);
699
700 assert_eq!(0, rx.available_capacity());
703 assert!(send2.is_woken());
704
705 let mut recv2 = spawn(async { rx.next().await });
706 assert_pending!(recv2.poll());
707
708 assert_eq!(Ok(()), assert_ready!(send2.poll()));
709 drop(send2);
710
711 assert_eq!(0, tx.available_capacity());
712
713 assert!(recv2.is_woken());
715 assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));
716
717 assert_eq!(1, tx.available_capacity());
718 }
719
720 #[test]
721 fn sender_waits_for_more_capacity_when_partial_available() {
722 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
723 let (mut tx, mut rx) = limited(limit, None, None);
724
725 assert_eq!(7, tx.available_capacity());
726
727 let msgs1 = vec![
728 MultiEventRecord::new(1),
729 MultiEventRecord::new(2),
730 MultiEventRecord::new(3),
731 ];
732 let msg2 = MultiEventRecord::new(4);
733
734 let mut small_sends = spawn(async {
736 for msg in msgs1.clone() {
737 tx.send(msg).await?;
738 }
739
740 Ok::<_, SendError<MultiEventRecord>>(())
741 });
742
743 let mut recv1 = spawn(async { rx.next().await });
744
745 assert!(!small_sends.is_woken());
747 assert!(!recv1.is_woken());
748
749 assert_pending!(recv1.poll());
751
752 assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
755 drop(small_sends);
756
757 assert_eq!(1, tx.available_capacity());
758
759 assert!(recv1.is_woken());
762
763 let mut send2 = spawn(tx.send(msg2.clone()));
765
766 assert!(!send2.is_woken());
767 assert_pending!(send2.poll());
768
769 assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
772 drop(recv1);
773
774 assert_eq!(0, rx.available_capacity());
778
779 assert!(!send2.is_woken());
781
782 let mut recv2 = spawn(async { rx.next().await });
784 assert!(!recv2.is_woken());
785 assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
786 drop(recv2);
787
788 assert_eq!(0, rx.available_capacity());
789
790 assert!(send2.is_woken());
791 assert_eq!(Ok(()), assert_ready!(send2.poll()));
792
793 let mut recv3 = spawn(async { rx.next().await });
795 assert!(!recv3.is_woken());
796 assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
797 drop(recv3);
798
799 assert_eq!(3, rx.available_capacity());
800
801 let mut recv4 = spawn(async { rx.next().await });
802 assert!(!recv4.is_woken());
803 assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
804 drop(recv4);
805
806 assert_eq!(7, rx.available_capacity());
807 }
808
809 #[test]
810 fn empty_receiver_returns_none_when_last_sender_drops() {
811 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
812 let (mut tx, mut rx) = limited(limit, None, None);
813
814 assert_eq!(1, tx.available_capacity());
815
816 let tx2 = tx.clone();
817 let msg = Sample::new(42);
818
819 let mut send = spawn(async { tx.send(msg).await });
821
822 let mut recv = spawn(async { rx.next().await });
823
824 assert!(!send.is_woken());
826 assert!(!recv.is_woken());
827
828 assert_pending!(recv.poll());
830
831 drop(tx2);
833 assert!(!recv.is_woken());
834 assert_pending!(recv.poll());
835
836 assert_eq!(Ok(()), assert_ready!(send.poll()));
840 drop(send);
841 drop(tx);
842
843 assert!(recv.is_woken());
844 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
845 drop(recv);
846
847 let mut recv2 = spawn(async { rx.next().await });
848 assert!(!recv2.is_woken());
849 assert_eq!(None, assert_ready!(recv2.poll()));
850 }
851
852 #[test]
853 fn receiver_returns_none_once_empty_when_last_sender_drops() {
854 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
855 let (tx, mut rx) = limited::<Sample>(limit, None, None);
856
857 assert_eq!(1, tx.available_capacity());
858
859 let tx2 = tx.clone();
860
861 let mut recv = spawn(async { rx.next().await });
863
864 assert!(!recv.is_woken());
866
867 assert_pending!(recv.poll());
869
870 drop(tx);
872 assert!(!recv.is_woken());
873 assert_pending!(recv.poll());
874
875 drop(tx2);
878 assert!(recv.is_woken());
879 assert_eq!(None, assert_ready!(recv.poll()));
880 }
881
882 #[test]
883 fn oversized_send_allowed_when_empty() {
884 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
885 let (mut tx, mut rx) = limited(limit, None, None);
886
887 assert_eq!(1, tx.available_capacity());
888
889 let msg = MultiEventRecord::new(2);
890
891 let mut send = spawn(async { tx.send(msg.clone()).await });
893
894 let mut recv = spawn(async { rx.next().await });
895
896 assert!(!send.is_woken());
898 assert!(!recv.is_woken());
899
900 assert_eq!(Ok(()), assert_ready!(send.poll()));
903 drop(send);
904
905 assert_eq!(0, tx.available_capacity());
906
907 assert_eq!(Some(msg), assert_ready!(recv.poll()));
910 drop(recv);
911
912 assert_eq!(1, rx.available_capacity());
913 }
914
915 #[test]
916 fn oversized_send_allowed_when_partial_capacity() {
917 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
918 let (mut tx, mut rx) = limited(limit, None, None);
919
920 assert_eq!(2, tx.available_capacity());
921
922 let msg1 = MultiEventRecord::new(1);
923 let msg2 = MultiEventRecord::new(3);
924
925 let mut send = spawn(async { tx.send(msg1.clone()).await });
927
928 assert!(!send.is_woken());
930
931 assert_eq!(Ok(()), assert_ready!(send.poll()));
933 drop(send);
934
935 assert_eq!(1, tx.available_capacity());
936
937 let mut send2 = spawn(async { tx.send(msg2.clone()).await });
940
941 assert!(!send2.is_woken());
942 assert_pending!(send2.poll());
943
944 assert_eq!(0, rx.available_capacity());
945
946 let mut recv = spawn(async { rx.next().await });
949 assert!(!recv.is_woken());
950 assert!(!send2.is_woken());
951
952 assert_eq!(Some(msg1), assert_ready!(recv.poll()));
953 drop(recv);
954
955 assert_eq!(0, rx.available_capacity());
956
957 assert_eq!(Ok(()), assert_ready!(send2.poll()));
960 drop(send2);
961
962 assert_eq!(0, tx.available_capacity());
963
964 let mut recv2 = spawn(async { rx.next().await });
965 assert_eq!(Some(msg2), assert_ready!(recv2.poll()));
966
967 assert_eq!(2, tx.available_capacity());
968 }
969
970 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
971 async fn concurrent_send_receive_metrics_remain_valid() {
972 const ITEM_COUNT: usize = 4_000;
973
974 for size in 1..=100 {
976 let limit = NonZeroUsize::new(size * 10).unwrap();
977 let (tx, rx) = limited(
978 MemoryBufferSize::MaxEvents(limit),
979 Some(ChannelMetricMetadata::new(BufferChannelKind::Source, None)),
980 None,
981 );
982 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
983
984 let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
985 let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
986
987 sender.await.expect("sender task should not panic");
988 receiver.await.expect("receiver task should not panic");
989
990 let recorded = metrics.lock().unwrap().clone();
991 assert_eq!(
992 recorded.len(),
993 ITEM_COUNT * 2,
994 "expected one metric update per send and per receive"
995 );
996
997 let max_allowed = limit.get();
1000 let observed_max = recorded.iter().copied().max().unwrap_or_default();
1001 assert!(
1002 recorded.iter().all(|value| *value <= max_allowed),
1003 "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
1004 );
1005 }
1006 }
1007
1008 async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
1009 let mut rng = SmallRng::from_rng(&mut rand::rng());
1010
1011 for i in 0..item_count {
1012 tx.send(Sample::new(i as u64))
1013 .await
1014 .expect("send should succeed");
1015 if rng.random::<u8>() % 8 == 0 {
1016 tokio::task::yield_now().await;
1017 }
1018 }
1019 }
1020
1021 async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
1022 let mut rng = SmallRng::from_rng(&mut rand::rng());
1023
1024 for i in 0..item_count {
1025 let next = rx
1026 .next()
1027 .await
1028 .expect("receiver should yield all sent items");
1029 assert_eq!(Sample::new(i as u64), next);
1030 if rng.random::<u8>() % 8 == 0 {
1031 tokio::task::yield_now().await;
1032 }
1033 }
1034 }
1035}