vector_core/
fanout.rs

1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{config::ComponentKey, event::EventArray};
11
/// Control operations that can be sent to a [`Fanout`] to change its set of sinks.
///
/// Every variant identifies the affected sink by its [`ComponentKey`].
pub enum ControlMessage {
    /// Adds a new sink to the fanout.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes a sink from the fanout.
    Remove(ComponentKey),

    /// Pauses a sink in the fanout.
    ///
    /// If a fanout has any paused sinks, subsequent sends cannot proceed until all paused sinks
    /// have been replaced.
    Pause(ComponentKey),

    /// Replaces a paused sink with its new sender.
    Replace(ComponentKey, BufferSender<EventArray>),
}
28
29impl fmt::Debug for ControlMessage {
30    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31        write!(f, "ControlMessage::")?;
32        match self {
33            Self::Add(id, _) => write!(f, "Add({id:?})"),
34            Self::Remove(id) => write!(f, "Remove({id:?})"),
35            Self::Pause(id) => write!(f, "Pause({id:?})"),
36            Self::Replace(id, _) => write!(f, "Replace({id:?})"),
37        }
38    }
39}
40
// TODO: We should really wrap this in a custom type that has dedicated methods for each operation
// so that higher-level components don't need to do the raw channel sends, etc.
/// Sending half of the unbounded channel over which [`ControlMessage`]s are delivered to a
/// [`Fanout`].
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
44
/// Distributes each batch of events to every connected sink.
///
/// The set of sinks is changed at runtime through [`ControlMessage`]s received on the control
/// channel returned by [`Fanout::new`].
pub struct Fanout {
    // Sinks keyed by component. A slot is `None` while its sink is paused, or while its sender
    // has been handed over to an in-flight `SendGroup`.
    senders: IndexMap<ComponentKey, Option<Sender>>,
    // Receiving half of the control channel; the sending half is returned by `Fanout::new`.
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
}
49
50impl Fanout {
51    pub fn new() -> (Self, ControlChannel) {
52        let (control_tx, control_rx) = mpsc::unbounded_channel();
53
54        let fanout = Self {
55            senders: Default::default(),
56            control_channel: control_rx,
57        };
58
59        (fanout, control_tx)
60    }
61
62    /// Add a new sink as an output.
63    ///
64    /// # Panics
65    ///
66    /// Function will panic if a sink with the same ID is already present.
67    pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
68        assert!(
69            !self.senders.contains_key(&id),
70            "Adding duplicate output id to fanout: {id}"
71        );
72        self.senders.insert(id, Some(Sender::new(sink)));
73    }
74
75    fn remove(&mut self, id: &ComponentKey) {
76        assert!(
77            self.senders.shift_remove(id).is_some(),
78            "Removing nonexistent sink from fanout: {id}"
79        );
80    }
81
82    fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
83        match self.senders.get_mut(id) {
84            Some(sender) => {
85                // While a sink must be _known_ to be replaced, it must also be empty (previously
86                // paused or consumed when the `SendGroup` was created), otherwise an invalid
87                // sequence of control operations has been applied.
88                assert!(
89                    sender.replace(Sender::new(sink)).is_none(),
90                    "Replacing existing sink is not valid: {id}"
91                );
92            }
93            None => panic!("Replacing unknown sink from fanout: {id}"),
94        }
95    }
96
97    fn pause(&mut self, id: &ComponentKey) {
98        match self.senders.get_mut(id) {
99            Some(sender) => {
100                // A sink must be known and present to be replaced, otherwise an invalid sequence of
101                // control operations has been applied.
102                assert!(
103                    sender.take().is_some(),
104                    "Pausing nonexistent sink is not valid: {id}"
105                );
106            }
107            None => panic!("Pausing unknown sink from fanout: {id}"),
108        }
109    }
110
111    /// Waits for the next control message and applies it.
112    ///
113    /// Returns `true` if a message was processed, `false` if the control
114    /// channel was closed.
115    pub async fn recv_control_message(&mut self) -> bool {
116        match self.control_channel.recv().await {
117            Some(msg) => {
118                self.apply_control_message(msg);
119                true
120            }
121            None => false,
122        }
123    }
124
125    /// Apply a control message directly against this instance.
126    ///
127    /// This method should not be used if there is an active `SendGroup` being processed.
128    fn apply_control_message(&mut self, message: ControlMessage) {
129        trace!("Processing control message outside of send: {:?}", message);
130
131        match message {
132            ControlMessage::Add(id, sink) => self.add(id, sink),
133            ControlMessage::Remove(id) => self.remove(&id),
134            ControlMessage::Pause(id) => self.pause(&id),
135            ControlMessage::Replace(id, sink) => self.replace(&id, sink),
136        }
137    }
138
139    /// Waits for all paused sinks to be replaced.
140    ///
141    /// Control messages are processed until all senders have been replaced, so it is guaranteed
142    /// that when this method returns, all senders are ready for the next send to be triggered.
143    async fn wait_for_replacements(&mut self) {
144        while self.senders.values().any(Option::is_none) {
145            if let Some(msg) = self.control_channel.recv().await {
146                self.apply_control_message(msg);
147            } else {
148                // If the control channel is closed, there's nothing else we can do.
149
150                // TODO: It _seems_ like we should probably panic here, or at least return.
151                //
152                // Essentially, we should only land here if the control channel is closed but we
153                // haven't yet replaced all of the paused sinks... and we shouldn't have any paused
154                // sinks if Vector is stopping normally/gracefully, so like... we'd only get
155                // here during a configuration reload where we panicked in another thread due to
156                // an error of some sort, and the control channel got dropped, closed itself, and
157                // we're never going to be able to recover.
158                //
159                // The flipside is that by leaving it as-is, in the above hypothesized scenario,
160                // we'd avoid emitting additional panics/error logging when the root cause error was
161                // already doing so, like there's little value in knowing the fanout also hit an
162                // unrecoverable state if the whole process is about to come crashing down
163                // anyways... but it still does feel weird to have that encoded here by virtue of
164                // only a comment, and not an actual terminating expression. *shrug*
165            }
166        }
167    }
168
169    /// Send a stream of events to all connected sinks.
170    ///
171    /// This function will send events until the provided stream finishes. It will also block on the
172    /// resolution of any pending reload before proceeding with a send operation, similar to `send`.
173    ///
174    /// # Panics
175    ///
176    /// This method can panic if the fanout receives a control message that violates some invariant
177    /// about its current state (e.g. remove a nonexistent sink, etc.). This would imply a bug in
178    /// Vector's config reloading logic.
179    ///
180    /// # Errors
181    ///
182    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
183    /// returned detailing the cause.
184    pub async fn send_stream(
185        &mut self,
186        events: impl Stream<Item = (EventArray, Instant)>,
187    ) -> crate::Result<()> {
188        tokio::pin!(events);
189        while let Some((event_array, send_reference)) = events.next().await {
190            self.send(event_array, Some(send_reference)).await?;
191        }
192        Ok(())
193    }
194
195    /// Send a batch of events to all connected sinks.
196    ///
197    /// This will block on the resolution of any pending reload before proceeding with the send
198    /// operation.
199    ///
200    /// # Panics
201    ///
202    /// This method can panic if the fanout receives a control message that violates some invariant
203    /// about its current state (e.g. remove a nonexistent sink, etc). This would imply a bug in
204    /// Vector's config reloading logic.
205    ///
206    /// # Errors
207    ///
208    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
209    /// returned detailing the cause.
210    pub async fn send(
211        &mut self,
212        events: EventArray,
213        send_reference: Option<Instant>,
214    ) -> crate::Result<()> {
215        // First, process any available control messages in a non-blocking fashion.
216        while let Ok(message) = self.control_channel.try_recv() {
217            self.apply_control_message(message);
218        }
219
220        // Wait for any senders that are paused to be replaced first before continuing with the send.
221        self.wait_for_replacements().await;
222
223        // Nothing to send if we have no sender.
224        if self.senders.is_empty() {
225            trace!("No senders present.");
226            return Ok(());
227        }
228
229        // Keep track of whether the control channel has returned `Ready(None)`, and stop polling
230        // it once it has. If we don't do this check, it will continue to return `Ready(None)` any
231        // time it is polled, which can lead to a busy loop below.
232        //
233        // In real life this is likely a non-issue, but it can lead to strange behavior in tests if
234        // left unhandled.
235        let mut control_channel_open = true;
236
237        // Create our send group which arms all senders to send the given events, and handles
238        // adding/removing/replacing senders while the send is in-flight.
239        let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
240
241        loop {
242            tokio::select! {
243                // Semantically, it's not hugely important that this select is biased. It does,
244                // however, make testing simpler when you can count on control messages being
245                // processed first.
246                biased;
247
248                maybe_msg = self.control_channel.recv(), if control_channel_open => {
249                    trace!("Processing control message inside of send: {:?}", maybe_msg);
250
251                    // During a send operation, control messages must be applied via the
252                    // `SendGroup`, since it has exclusive access to the senders.
253                    match maybe_msg {
254                        Some(ControlMessage::Add(id, sink)) => {
255                            send_group.add(id, sink);
256                        },
257                        Some(ControlMessage::Remove(id)) => {
258                            send_group.remove(&id);
259                        },
260                        Some(ControlMessage::Pause(id)) => {
261                            send_group.pause(&id);
262                        },
263                        Some(ControlMessage::Replace(id, sink)) => {
264                            send_group.replace(&id, Sender::new(sink));
265                        },
266                        None => {
267                            // Control channel is closed, which means Vector is shutting down.
268                            control_channel_open = false;
269                        }
270                    }
271                }
272
273                result = send_group.send() => match result {
274                    Ok(()) => {
275                        trace!("Sent item to fanout.");
276                        break;
277                    },
278                    Err(e) => return Err(e),
279                }
280            }
281        }
282
283        Ok(())
284    }
285}
286
/// An in-flight send of one batch of events to the senders of a `Fanout`.
///
/// The group borrows the fanout's sender map for as long as the send is being driven, so control
/// operations arriving in the meantime are applied through the group.
struct SendGroup<'a> {
    // The fanout's sender slots. A participating sender's slot is `None` until its send
    // completes and the sender is put back.
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    // One pending send per participating sender; each future resolves to the sender it owns.
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}
291
292impl<'a> SendGroup<'a> {
293    fn new(
294        senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
295        events: EventArray,
296        send_reference: Option<Instant>,
297    ) -> Self {
298        // If we don't have a valid `Sender` for all sinks, then something went wrong in our logic
299        // to ensure we were starting with all valid/idle senders prior to initiating the send.
300        debug_assert!(senders.values().all(Option::is_some));
301
302        let last_sender_idx = senders.len().saturating_sub(1);
303        let mut events = Some(events);
304
305        // We generate a send future for each sender we have, which arms them with the events to
306        // send but also takes ownership of the sender itself, which we give back when the sender completes.
307        let mut sends = HashMap::new();
308        for (i, (key, sender)) in senders.iter_mut().enumerate() {
309            let mut sender = sender
310                .take()
311                .expect("sender must be present to initialize SendGroup");
312
313            // First, arm each sender with the item to actually send.
314            if i == last_sender_idx {
315                sender.input = events.take();
316            } else {
317                sender.input.clone_from(&events);
318            }
319            sender.send_reference = send_reference;
320
321            // Now generate a send for that sender which we'll drive to completion.
322            let send = async move {
323                sender.flush().await?;
324                Ok(sender)
325            };
326
327            sends.insert(key.clone(), ReusableBoxFuture::new(send));
328        }
329
330        Self { senders, sends }
331    }
332
333    fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
334        if let Some(send) = self.sends.remove(id) {
335            tokio::spawn(async move {
336                if let Err(e) = send.await {
337                    warn!(
338                        cause = %e,
339                        message = "Encountered error writing to component after detaching from topology.",
340                    );
341                }
342            });
343            true
344        } else {
345            false
346        }
347    }
348
349    #[allow(clippy::needless_pass_by_value)]
350    fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
351        // When we're in the middle of a send, we can only keep track of the new sink, but can't
352        // actually send to it, as we don't have the item to send... so only add it to `senders`.
353        assert!(
354            self.senders
355                .insert(id.clone(), Some(Sender::new(sink)))
356                .is_none(),
357            "Adding duplicate output id to fanout: {id}"
358        );
359    }
360
361    fn remove(&mut self, id: &ComponentKey) {
362        // We may or may not be removing a sender that we're try to drive a send against, so we have
363        // to also detach the send future for the sender if it exists, otherwise we'd be hanging
364        // around still trying to send to it.
365        assert!(
366            self.senders.shift_remove(id).is_some(),
367            "Removing nonexistent sink from fanout: {id}"
368        );
369
370        // Now try and detach the in-flight send, if it exists.
371        //
372        // We don't ensure that a send was or wasn't detached because this could be called either
373        // during an in-flight send _or_ after the send has completed.
374        self.try_detach_send(id);
375    }
376
377    fn replace(&mut self, id: &ComponentKey, sink: Sender) {
378        match self.senders.get_mut(id) {
379            Some(sender) => {
380                // While a sink must be _known_ to be replaced, it must also be empty (previously
381                // paused or consumed when the `SendGroup` was created), otherwise an invalid
382                // sequence of control operations has been applied.
383                assert!(
384                    sender.replace(sink).is_none(),
385                    "Replacing existing sink is not valid: {id}"
386                );
387            }
388            None => panic!("Replacing unknown sink from fanout: {id}"),
389        }
390    }
391
392    fn pause(&mut self, id: &ComponentKey) {
393        match self.senders.get_mut(id) {
394            Some(sender) => {
395                // If we don't currently own the `Sender` for the given component, that implies
396                // there is an in-flight send: a `SendGroup` cannot be created without all
397                // participating components having a send operation triggered.
398                //
399                // As such, `try_detach_send` should always succeed here, as pausing only occurs
400                // when a component is being _replaced_, and should not be called multiple times.
401                if sender.take().is_none() {
402                    assert!(
403                        self.try_detach_send(id),
404                        "Pausing already-paused sink is invalid: {id}"
405                    );
406                }
407            }
408            None => panic!("Pausing unknown sink from fanout: {id}"),
409        }
410    }
411
412    async fn send(&mut self) -> crate::Result<()> {
413        // Right now, we do a linear scan of all sends, polling each send once in order to avoid
414        // waiting forever, such that we can let our control messages get picked up while sends are
415        // waiting.
416        loop {
417            if self.sends.is_empty() {
418                break;
419            }
420
421            let mut done = Vec::new();
422            for (key, send) in &mut self.sends {
423                if let Poll::Ready(result) = poll!(send.get_pin()) {
424                    let sender = result?;
425
426                    // The send completed, so we restore the sender and mark ourselves so that this
427                    // future gets dropped.
428                    done.push((key.clone(), sender));
429                }
430            }
431
432            for (key, sender) in done {
433                self.sends.remove(&key);
434                self.replace(&key, sender);
435            }
436
437            if !self.sends.is_empty() {
438                // We manually yield ourselves because we've polled all of the sends at this point,
439                // so if any are left, then we're scheduled for a wake-up... this is a really poor
440                // approximation of what `FuturesUnordered` is doing.
441                pending!();
442            }
443        }
444
445        Ok(())
446    }
447}
448
/// A sink's buffer sender together with the events staged for its next send.
struct Sender {
    // The underlying buffer sender that events are written to.
    inner: BufferSender<EventArray>,
    // Events staged for the next `flush`; `None` when nothing is staged.
    input: Option<EventArray>,
    // Reference instant passed along with the staged events on the next `flush`.
    send_reference: Option<Instant>,
}
454
455impl Sender {
456    fn new(inner: BufferSender<EventArray>) -> Self {
457        Self {
458            inner,
459            input: None,
460            send_reference: None,
461        }
462    }
463
464    async fn flush(&mut self) -> crate::Result<()> {
465        let send_reference = self.send_reference.take();
466        if let Some(input) = self.input.take() {
467            self.inner.send(input, send_reference).await?;
468            self.inner.flush().await?;
469        }
470
471        Ok(())
472    }
473}
474
475#[cfg(test)]
476mod tests {
477    use std::{mem, num::NonZeroUsize};
478
479    use futures::poll;
480    use tokio::sync::mpsc::UnboundedSender;
481    use tokio_test::{assert_pending, assert_ready, task::spawn};
482    use tracing::Span;
483    use vector_buffers::{
484        WhenFull,
485        topology::{
486            builder::TopologyBuilder,
487            channel::{BufferReceiver, BufferSender},
488        },
489    };
490    use vrl::value::Value;
491
492    use super::{ControlMessage, Fanout};
493    use crate::{
494        config::ComponentKey,
495        event::{Event, EventArray, EventContainer, LogEvent},
496        test_util::{collect_ready, collect_ready_events},
497    };
498
499    fn build_sender_pair(
500        capacity: usize,
501    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
502        TopologyBuilder::standalone_memory(
503            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
504            WhenFull::Block,
505            &Span::current(),
506            None,
507            None,
508        )
509    }
510
511    fn build_sender_pairs(
512        capacities: &[usize],
513    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
514        let mut pairs = Vec::new();
515        for capacity in capacities {
516            pairs.push(build_sender_pair(*capacity));
517        }
518        pairs
519    }
520
521    fn fanout_from_senders(
522        capacities: &[usize],
523    ) -> (
524        Fanout,
525        UnboundedSender<ControlMessage>,
526        Vec<BufferReceiver<EventArray>>,
527    ) {
528        let (mut fanout, control) = Fanout::new();
529        let pairs = build_sender_pairs(capacities);
530
531        let mut receivers = Vec::new();
532        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
533            fanout.add(ComponentKey::from(i.to_string()), sender);
534            receivers.push(receiver);
535        }
536
537        (fanout, control, receivers)
538    }
539
540    fn add_sender_to_fanout(
541        fanout: &mut Fanout,
542        receivers: &mut Vec<BufferReceiver<EventArray>>,
543        sender_id: usize,
544        capacity: usize,
545    ) {
546        let (sender, receiver) = build_sender_pair(capacity);
547        receivers.push(receiver);
548
549        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
550    }
551
552    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
553        control
554            .send(ControlMessage::Remove(ComponentKey::from(
555                sender_id.to_string(),
556            )))
557            .expect("sending control message should not fail");
558    }
559
560    fn replace_sender_in_fanout(
561        control: &UnboundedSender<ControlMessage>,
562        receivers: &mut [BufferReceiver<EventArray>],
563        sender_id: usize,
564        capacity: usize,
565    ) -> BufferReceiver<EventArray> {
566        let (sender, receiver) = build_sender_pair(capacity);
567        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
568
569        control
570            .send(ControlMessage::Pause(ComponentKey::from(
571                sender_id.to_string(),
572            )))
573            .expect("sending control message should not fail");
574
575        control
576            .send(ControlMessage::Replace(
577                ComponentKey::from(sender_id.to_string()),
578                sender,
579            ))
580            .expect("sending control message should not fail");
581
582        old_receiver
583    }
584
585    fn start_sender_replace(
586        control: &UnboundedSender<ControlMessage>,
587        receivers: &mut [BufferReceiver<EventArray>],
588        sender_id: usize,
589        capacity: usize,
590    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
591        let (sender, receiver) = build_sender_pair(capacity);
592        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
593
594        control
595            .send(ControlMessage::Pause(ComponentKey::from(
596                sender_id.to_string(),
597            )))
598            .expect("sending control message should not fail");
599
600        (old_receiver, sender)
601    }
602
603    fn finish_sender_resume(
604        control: &UnboundedSender<ControlMessage>,
605        sender_id: usize,
606        sender: BufferSender<EventArray>,
607    ) {
608        control
609            .send(ControlMessage::Replace(
610                ComponentKey::from(sender_id.to_string()),
611                sender,
612            ))
613            .expect("sending control message should not fail");
614    }
615
616    fn unwrap_log_event_message<E>(event: E) -> String
617    where
618        E: EventContainer,
619    {
620        let event = event
621            .into_events()
622            .next()
623            .expect("must have at least one event");
624        let event = event.into_log();
625        event
626            .get("message")
627            .and_then(Value::as_bytes)
628            .and_then(|b| String::from_utf8(b.to_vec()).ok())
629            .expect("must be valid log event with `message` field")
630    }
631
632    #[tokio::test]
633    async fn fanout_writes_to_all() {
634        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
635        let events = make_event_array(2);
636
637        let clones = events.clone();
638        fanout.send(clones, None).await.expect("should not fail");
639
640        for receiver in receivers {
641            assert_eq!(
642                collect_ready(receiver.into_stream()),
643                std::slice::from_ref(&events)
644            );
645        }
646    }
647
648    #[tokio::test]
649    async fn fanout_notready() {
650        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
651        let events = make_events(2);
652
653        // First send should immediately complete because all senders have capacity:
654        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
655        assert_ready!(first_send.poll()).expect("should not fail");
656        drop(first_send);
657
658        // Second send should return pending because sender B is now full:
659        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
660        assert_pending!(second_send.poll());
661
662        // Now read an item from each receiver to free up capacity for the second sender:
663        for receiver in &mut receivers {
664            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
665        }
666
667        // Now our second send should actually be able to complete:
668        assert_ready!(second_send.poll()).expect("should not fail");
669        drop(second_send);
670
671        // And make sure the second item comes through:
672        for receiver in &mut receivers {
673            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
674        }
675    }
676
677    #[tokio::test]
678    async fn fanout_grow() {
679        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
680        let events = make_events(3);
681
682        // Send in the first two events to our initial two senders:
683        fanout
684            .send(events[0].clone().into(), None)
685            .await
686            .expect("should not fail");
687        fanout
688            .send(events[1].clone().into(), None)
689            .await
690            .expect("should not fail");
691
692        // Now add a third sender:
693        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
694
695        // Send in the last event which all three senders will now get:
696        fanout
697            .send(events[2].clone().into(), None)
698            .await
699            .expect("should not fail");
700
701        // Make sure the first two senders got all three events, but the third sender only got the
702        // last event:
703        let expected_events = [&events, &events, &events[2..]];
704        for (i, receiver) in receivers.into_iter().enumerate() {
705            assert_eq!(
706                collect_ready_events(receiver.into_stream()),
707                expected_events[i]
708            );
709        }
710    }
711
712    #[tokio::test]
713    async fn fanout_shrink() {
714        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
715        let events = make_events(3);
716
717        // Send in the first two events to our initial two senders:
718        fanout
719            .send(events[0].clone().into(), None)
720            .await
721            .expect("should not fail");
722        fanout
723            .send(events[1].clone().into(), None)
724            .await
725            .expect("should not fail");
726
727        // Now remove the second sender:
728        remove_sender_from_fanout(&control, 1);
729
730        // Send in the last event which only the first sender will get:
731        fanout
732            .send(events[2].clone().into(), None)
733            .await
734            .expect("should not fail");
735
736        // Make sure the first sender got all three events, but the second sender only got the first two:
737        let expected_events = [&events, &events[..2]];
738        for (i, receiver) in receivers.into_iter().enumerate() {
739            assert_eq!(
740                collect_ready_events(receiver.into_stream()),
741                expected_events[i]
742            );
743        }
744    }
745
746    #[tokio::test]
747    async fn fanout_shrink_when_notready() {
748        // This test exercises that when we're waiting for a send to complete, we can correctly
749        // remove a sink whether or not it is the one that the send operation is still waiting on.
750        //
751        // This means that if we remove a sink that a current send is blocked on, we should be able
752        // to immediately proceed.
753        let events = make_events(2);
754        let expected_first_event = unwrap_log_event_message(events[0].clone());
755        let expected_second_event = unwrap_log_event_message(events[1].clone());
756
757        let cases = [
758            // Sender ID to drop, whether the second send should succeed after dropping, and the
759            // final "last event" a receiver should see after the second send:
760            (
761                0,
762                false,
763                [
764                    expected_second_event.clone(),
765                    expected_first_event.clone(),
766                    expected_second_event.clone(),
767                ],
768            ),
769            (
770                1,
771                true,
772                [
773                    expected_second_event.clone(),
774                    expected_second_event.clone(),
775                    expected_second_event.clone(),
776                ],
777            ),
778            (
779                2,
780                false,
781                [
782                    expected_second_event.clone(),
783                    expected_first_event.clone(),
784                    expected_second_event.clone(),
785                ],
786            ),
787        ];
788
789        for (sender_id, should_complete, expected_last_seen) in cases {
790            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
791
792            // First send should immediately complete because all senders have capacity:
793            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
794            assert_ready!(first_send.poll()).expect("should not fail");
795            drop(first_send);
796
797            // Second send should return pending because sender B is now full:
798            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
799            assert_pending!(second_send.poll());
800
801            // Now drop our chosen sender and assert that polling the second send behaves as expected:
802            remove_sender_from_fanout(&control, sender_id);
803
804            if should_complete {
805                assert_ready!(second_send.poll()).expect("should not fail");
806            } else {
807                assert_pending!(second_send.poll());
808            }
809            drop(second_send);
810
811            // Now grab the last value available to each receiver and assert it's the second event.
812            drop(fanout);
813
814            let mut last_seen = Vec::new();
815            for receiver in &mut receivers {
816                let mut events = Vec::new();
817                while let Some(event) = receiver.next().await {
818                    events.insert(0, event);
819                }
820
821                last_seen.push(unwrap_log_event_message(events.remove(0)));
822            }
823
824            assert_eq!(&expected_last_seen[..], &last_seen);
825        }
826    }
827
828    #[tokio::test]
829    async fn fanout_no_sinks() {
830        let (mut fanout, _) = Fanout::new();
831        let events = make_events(2);
832
833        fanout
834            .send(events[0].clone().into(), None)
835            .await
836            .expect("should not fail");
837        fanout
838            .send(events[1].clone().into(), None)
839            .await
840            .expect("should not fail");
841    }
842
843    #[tokio::test]
844    async fn fanout_replace() {
845        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
846        let events = make_events(3);
847
848        // First two sends should immediately complete because all senders have capacity:
849        fanout
850            .send(events[0].clone().into(), None)
851            .await
852            .expect("should not fail");
853        fanout
854            .send(events[1].clone().into(), None)
855            .await
856            .expect("should not fail");
857
858        // Replace the first sender with a brand new one before polling again:
859        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
860
861        // And do the third send which should also complete since all senders still have capacity:
862        fanout
863            .send(events[2].clone().into(), None)
864            .await
865            .expect("should not fail");
866
867        // Now make sure that the new "first" sender only got the third event, but that the second and
868        // third sender got all three events:
869        let expected_events = [&events[2..], &events, &events];
870        for (i, receiver) in receivers.into_iter().enumerate() {
871            assert_eq!(
872                collect_ready_events(receiver.into_stream()),
873                expected_events[i]
874            );
875        }
876
877        // And make sure our original "first" sender got the first two events:
878        assert_eq!(
879            collect_ready_events(old_first_receiver.into_stream()),
880            &events[..2]
881        );
882    }
883
884    #[tokio::test]
885    async fn fanout_wait() {
886        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
887        let events = make_events(3);
888
889        // First two sends should immediately complete because all senders have capacity:
890        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
891        assert_ready!(poll!(send1)).expect("should not fail");
892        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
893        assert_ready!(poll!(send2)).expect("should not fail");
894
895        // Now do an empty replace on the second sender, which we'll test to make sure that `Fanout`
896        // doesn't let any writes through until we replace it properly.  We get back the receiver
897        // we've replaced, but also the sender that we want to eventually install:
898        let (old_first_receiver, new_first_sender) =
899            start_sender_replace(&control, &mut receivers, 0, 4);
900
901        // Third send should return pending because now we have an in-flight replacement:
902        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
903        assert_pending!(third_send.poll());
904
905        // Finish our sender replacement, which should wake up the third send and allow it to
906        // actually complete:
907        finish_sender_resume(&control, 0, new_first_sender);
908        assert!(third_send.is_woken());
909        assert_ready!(third_send.poll()).expect("should not fail");
910
911        // Make sure the original first sender got the first two events, the new first sender got
912        // the last event, and the second sender got all three:
913        assert_eq!(
914            collect_ready_events(old_first_receiver.into_stream()),
915            &events[0..2]
916        );
917
918        let expected_events = [&events[2..], &events];
919        for (i, receiver) in receivers.into_iter().enumerate() {
920            assert_eq!(
921                collect_ready_events(receiver.into_stream()),
922                expected_events[i]
923            );
924        }
925    }
926
927    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
928        (0..count).map(|i| LogEvent::from(format!("line {i}")))
929    }
930
931    fn make_events(count: usize) -> Vec<Event> {
932        make_events_inner(count).map(Into::into).collect()
933    }
934
935    fn make_event_array(count: usize) -> EventArray {
936        make_events_inner(count).collect::<Vec<_>>().into()
937    }
938}