codecs/decoding/framing/
chunked_gelf.rs

1use std::{
2    any::Any,
3    collections::HashMap,
4    io::Read,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use derivative::Derivative;
11use flate2::read::{MultiGzDecoder, ZlibDecoder};
12use snafu::{ResultExt, Snafu, ensure};
13use tokio::{self, task::JoinHandle};
14use tokio_util::codec::Decoder;
15use tracing::{debug, trace, warn};
16use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
17use vector_config::configurable_component;
18
19use super::{BoxedFramingError, FramingError};
20use crate::{BytesDecoder, StreamDecodingError};
21
/// Default time, in seconds, a partially received message is kept before it is discarded.
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;
/// Two-byte prefix that identifies a datagram as a chunk of a chunked GELF message.
const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
/// Largest accepted "total chunks" header value; also the size of the per-message chunk table.
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;

/// Returns [`DEFAULT_TIMEOUT_SECS`].
///
/// Provided as a function so the same default can be referenced by path from the
/// `serde(default = "...")` and `derivative(Default(value = "..."))` attributes.
const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}
29
/// Config used to build a `ChunkedGelfDecoder`.
// All tunables live in the nested `chunked_gelf` section. `#[serde(default)]` allows the
// section to be omitted entirely, in which case `ChunkedGelfDecoderOptions::default()` is used.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}
38
39impl ChunkedGelfDecoderConfig {
40    /// Build the `ChunkedGelfDecoder` from this configuration.
41    pub fn build(&self) -> ChunkedGelfDecoder {
42        ChunkedGelfDecoder::new(
43            self.chunked_gelf.timeout_secs,
44            self.chunked_gelf.pending_messages_limit,
45            self.chunked_gelf.max_length,
46            self.chunked_gelf.decompression,
47        )
48    }
49}
50
/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a message to be fully received. If the timeout is reached, the
    /// decoder drops all received chunks for the timed-out message.
    // The serde default and the derived `Default` both go through `default_timeout_secs()`, so a
    // deserialized config with the field omitted and `Default::default()` agree on the value.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
    /// dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded.
    /// If this option is not set, the decoder does not limit the number of pending messages and the memory usage
    /// of its messages buffer can grow unbounded. This matches Graylog Server's behavior.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length of a single GELF message, in bytes. Messages longer than this length are
    /// dropped. If this option is not set, the decoder does not limit the length of messages and
    /// the per-message memory is unbounded.
    ///
    /// **Note**: A message can be composed of multiple chunks, and this limit applies to the whole
    /// message, not to individual chunks.
    ///
    /// This limit takes into account only the message payload. GELF header bytes are excluded from the calculation.
    /// The message payload is the concatenation of all chunk payloads.
    // NOTE(review): in this file the limit is only enforced while chunks are being accumulated
    // (`decode_chunk`); unchunked datagrams and decompressed output are not checked against it.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// Decompression configuration for GELF messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}
85
/// Decompression options for ChunkedGelfDecoder.
// This is the user-facing choice; `get_decompression` resolves it into the concrete
// `ChunkedGelfDecompression` applied to an individual message.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the decompression method based on the magic bytes of the message.
    #[default]
    Auto,
    /// Use Gzip decompression.
    Gzip,
    /// Use Zlib decompression.
    Zlib,
    /// Do not decompress the message.
    None,
}
100
101impl ChunkedGelfDecompressionConfig {
102    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
103        match self {
104            Self::Auto => ChunkedGelfDecompression::from_magic(data),
105            Self::Gzip => ChunkedGelfDecompression::Gzip,
106            Self::Zlib => ChunkedGelfDecompression::Zlib,
107            Self::None => ChunkedGelfDecompression::None,
108        }
109    }
110}
111
/// Reassembly buffer for one in-flight chunked message.
#[derive(Debug)]
struct MessageState {
    /// Number of chunks the message is split into, as announced by the chunk headers.
    total_chunks: u8,
    /// Chunk payloads indexed by sequence number; slots not yet received hold empty `Bytes`.
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    /// Bit `n` is set once the chunk with sequence number `n` has been stored.
    chunks_bitmap: u128,
    /// Sum of the payload lengths of all chunks stored so far.
    current_length: usize,
    /// Handle of the task spawned to evict this message when the timeout elapses.
    timeout_task: JoinHandle<()>,
}
120
121impl MessageState {
122    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
123        Self {
124            total_chunks,
125            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
126            chunks_bitmap: 0,
127            current_length: 0,
128            timeout_task,
129        }
130    }
131
132    fn is_chunk_present(&self, sequence_number: u8) -> bool {
133        let chunk_bitmap_id = 1 << sequence_number;
134        self.chunks_bitmap & chunk_bitmap_id != 0
135    }
136
137    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
138        let chunk_bitmap_id = 1 << sequence_number;
139        self.chunks_bitmap |= chunk_bitmap_id;
140        self.current_length += chunk.remaining();
141        self.chunks[sequence_number as usize] = chunk;
142    }
143
144    fn is_complete(&self) -> bool {
145        self.chunks_bitmap.count_ones() == self.total_chunks as u32
146    }
147
148    fn current_length(&self) -> usize {
149        self.current_length
150    }
151
152    fn retrieve_message(&self) -> Option<Bytes> {
153        if self.is_complete() {
154            self.timeout_task.abort();
155            let chunks = &self.chunks[0..self.total_chunks as usize];
156            let mut message = BytesMut::new();
157            for chunk in chunks {
158                message.extend_from_slice(chunk);
159            }
160            Some(message.freeze())
161        } else {
162            None
163        }
164    }
165}
166
/// Concrete decompression method applied to a single, fully reassembled message.
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    /// Decode the payload as Gzip.
    Gzip,
    /// Decode the payload as Zlib.
    Zlib,
    /// Return the payload as-is.
    None,
}
173
174impl ChunkedGelfDecompression {
175    pub fn from_magic(data: &Bytes) -> Self {
176        if data.starts_with(GZIP_MAGIC) {
177            trace!("Detected Gzip compression");
178            return Self::Gzip;
179        }
180
181        if data.starts_with(ZLIB_MAGIC) {
182            // Based on https://datatracker.ietf.org/doc/html/rfc1950#section-2.2
183            if let Some([first_byte, second_byte]) = data.get(0..2)
184                && (*first_byte as u16 * 256 + *second_byte as u16).is_multiple_of(31)
185            {
186                trace!("Detected Zlib compression");
187                return Self::Zlib;
188            };
189
190            warn!(
191                "Detected Zlib magic bytes but the header is invalid: {:?}",
192                data.get(0..2)
193            );
194        };
195
196        trace!("No compression detected",);
197        Self::None
198    }
199
200    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
201        let decompressed = match self {
202            Self::Gzip => {
203                let mut decoder = MultiGzDecoder::new(data.reader());
204                let mut decompressed = Vec::new();
205                decoder
206                    .read_to_end(&mut decompressed)
207                    .context(GzipDecompressionSnafu)?;
208                Bytes::from(decompressed)
209            }
210            Self::Zlib => {
211                let mut decoder = ZlibDecoder::new(data.reader());
212                let mut decompressed = Vec::new();
213                decoder
214                    .read_to_end(&mut decompressed)
215                    .context(ZlibDecompressionSnafu)?;
216                Bytes::from(decompressed)
217            }
218            Self::None => data,
219        };
220        Ok(decompressed)
221    }
222}
223
/// Errors produced by `ChunkedGelfDecompression::decompress`.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    // Reading the Gzip stream to its end failed; `source` is the decoder's I/O error.
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    // Reading the Zlib stream to its end failed; `source` is the decoder's I/O error.
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}
231
/// Errors produced while decoding chunked GELF datagrams.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    // The datagram (after the magic bytes) is too short to hold the 10-byte chunk header.
    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    // The header's total-chunks field is 0 or above `GELF_MAX_TOTAL_CHUNKS`.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
    ))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    // Raised when `sequence_number >= total_chunks`; sequence numbers are zero-based, so a
    // value equal to `total_chunks` is rejected as well.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"
    ))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    // The configured `pending_messages_limit` does not leave room for this chunk.
    #[snafu(display(
        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
    ))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    // A chunk announced a different total-chunks value than the one recorded when the
    // message's state was created.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"
    ))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    // The accumulated payload exceeded `max_length`. `sequence_number` identifies the chunk
    // that crossed the limit; it is carried in the error but not shown in the message.
    #[snafu(display(
        "Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"
    ))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    // Decompressing a complete message failed; wraps the underlying decompression error.
    #[snafu(display("Error while decompressing message. {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}
283
284impl StreamDecodingError for ChunkedGelfDecoderError {
285    fn can_continue(&self) -> bool {
286        true
287    }
288}
289
290impl FramingError for ChunkedGelfDecoderError {
291    fn as_any(&self) -> &dyn Any {
292        self as &dyn Any
293    }
294}
295
/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP)
/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go).
///
/// Cloning the decoder clones the `Arc` around the reassembly state, so all clones share the
/// same set of pending messages.
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    // We have to use this decoder to read all the bytes from the buffer first and don't let tokio
    // read it buffered, as tokio FramedRead will not always call the decode method with the
    // whole message. (see https://docs.rs/tokio-util/latest/src/tokio_util/codec/framed_impl.rs.html#26).
    // This limitation is due to the fact that the GELF format does not specify the length of the
    // message, so we have to read all the bytes from the message (datagram)
    bytes_decoder: BytesDecoder,
    // How complete messages are decompressed (fixed method, auto-detection or none).
    decompression_config: ChunkedGelfDecompressionConfig,
    // Pending (incomplete) messages keyed by GELF message id. Shared with the spawned timeout
    // tasks, which remove entries that were not completed in time.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    // How long a message may stay pending before its timeout task discards it.
    timeout: Duration,
    // Optional cap on the number of entries in `state`.
    pending_messages_limit: Option<usize>,
    // Optional cap on the accumulated payload length of a single chunked message.
    max_length: Option<usize>,
}
312
313impl ChunkedGelfDecoder {
314    /// Creates a new `ChunkedGelfDecoder`.
315    pub fn new(
316        timeout_secs: f64,
317        pending_messages_limit: Option<usize>,
318        max_length: Option<usize>,
319        decompression_config: ChunkedGelfDecompressionConfig,
320    ) -> Self {
321        Self {
322            bytes_decoder: BytesDecoder::new(),
323            decompression_config,
324            state: Arc::new(Mutex::new(HashMap::new())),
325            timeout: Duration::from_secs_f64(timeout_secs),
326            pending_messages_limit,
327            max_length,
328        }
329    }
330
331    /// Decode a GELF chunk
332    pub fn decode_chunk(
333        &mut self,
334        mut chunk: Bytes,
335    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
336        // Encoding scheme:
337        //
338        // +------------+-----------------+--------------+----------------------+
339        // | Message id | Sequence number | Total chunks |    Chunk payload     |
340        // +------------+-----------------+--------------+----------------------+
341        // | 64 bits    | 8 bits          | 8 bits       | remaining bits       |
342        // +------------+-----------------+--------------+----------------------+
343        //
344        // As this codec is oriented for UDP, the chunks (datagrams) are not guaranteed to be received in order,
345        // nor to be received at all. So, we have to store the chunks in a buffer (state field) until we receive
346        // all the chunks of a message. When we receive all the chunks of a message, we can concatenate them
347        // and return the complete payload.
348
349        // We need 10 bytes to read the message id, sequence number and total chunks
350        ensure!(
351            chunk.remaining() >= 10,
352            InvalidChunkHeaderSnafu { header: chunk }
353        );
354
355        let message_id = chunk.get_u64();
356        let sequence_number = chunk.get_u8();
357        let total_chunks = chunk.get_u8();
358
359        ensure!(
360            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
361            InvalidTotalChunksSnafu {
362                message_id,
363                sequence_number,
364                total_chunks
365            }
366        );
367
368        ensure!(
369            sequence_number < total_chunks,
370            InvalidSequenceNumberSnafu {
371                message_id,
372                sequence_number,
373                total_chunks
374            }
375        );
376
377        let mut state_lock = self.state.lock().expect("poisoned lock");
378
379        if let Some(pending_messages_limit) = self.pending_messages_limit {
380            ensure!(
381                state_lock.len() < pending_messages_limit,
382                PendingMessagesLimitReachedSnafu {
383                    message_id,
384                    sequence_number,
385                    pending_messages_limit
386                }
387            );
388        }
389
390        let message_state = state_lock.entry(message_id).or_insert_with(|| {
391            // We need to spawn a task that will clear the message state after a certain time
392            // otherwise we will have a memory leak due to messages that never complete
393            let state = Arc::clone(&self.state);
394            let timeout = self.timeout;
395            let timeout_handle = tokio::spawn(async move {
396                tokio::time::sleep(timeout).await;
397                let mut state_lock = state.lock().expect("poisoned lock");
398                if state_lock.remove(&message_id).is_some() {
399                    warn!(
400                        message_id = message_id,
401                        timeout_secs = timeout.as_secs_f64(),
402                        "Message was not fully received within the timeout window. Discarding it."
403                    );
404                }
405            });
406            MessageState::new(total_chunks, timeout_handle)
407        });
408
409        ensure!(
410            message_state.total_chunks == total_chunks,
411            TotalChunksMismatchSnafu {
412                message_id,
413                sequence_number,
414                original_total_chunks: message_state.total_chunks,
415                received_total_chunks: total_chunks
416            }
417        );
418
419        if message_state.is_chunk_present(sequence_number) {
420            debug!(
421                message_id = message_id,
422                sequence_number = sequence_number,
423                "Received a duplicate chunk. Ignoring it."
424            );
425            return Ok(None);
426        }
427
428        message_state.add_chunk(sequence_number, chunk);
429
430        if let Some(max_length) = self.max_length {
431            let length = message_state.current_length();
432            if length > max_length {
433                state_lock.remove(&message_id);
434                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
435                    message_id,
436                    sequence_number,
437                    length,
438                    max_length,
439                });
440            }
441        }
442
443        if let Some(message) = message_state.retrieve_message() {
444            state_lock.remove(&message_id);
445            Ok(Some(message))
446        } else {
447            Ok(None)
448        }
449    }
450
451    /// Decode a GELF message that may be chunked or not. The source bytes are expected to be
452    /// datagram-based (or message-based), so it must not contain multiple GELF messages
453    /// delimited by '\0', such as it would be in a stream-based protocol.
454    pub fn decode_message(
455        &mut self,
456        mut src: Bytes,
457    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
458        let message = if src.starts_with(GELF_MAGIC) {
459            trace!("Received a chunked GELF message based on the magic bytes");
460            src.advance(2);
461            self.decode_chunk(src)?
462        } else {
463            trace!(
464                "Received an unchunked GELF message. First two bytes of message: {:?}",
465                &src[0..2]
466            );
467            Some(src)
468        };
469
470        // We can have both chunked and unchunked messages that are compressed
471        message
472            .map(|message| {
473                self.decompression_config
474                    .get_decompression(&message)
475                    .decompress(message)
476                    .context(DecompressionSnafu)
477            })
478            .transpose()
479    }
480}
481
482impl Default for ChunkedGelfDecoder {
483    fn default() -> Self {
484        Self::new(
485            DEFAULT_TIMEOUT_SECS,
486            None,
487            None,
488            ChunkedGelfDecompressionConfig::Auto,
489        )
490    }
491}
492
493impl Decoder for ChunkedGelfDecoder {
494    type Item = Bytes;
495
496    type Error = BoxedFramingError;
497
498    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
499        if src.is_empty() {
500            return Ok(None);
501        }
502
503        Ok(self
504            .bytes_decoder
505            .decode(src)?
506            .and_then(|frame| self.decode_message(frame).transpose())
507            .transpose()?)
508    }
509    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
510        if buf.is_empty() {
511            return Ok(None);
512        }
513
514        Ok(self
515            .bytes_decoder
516            .decode_eof(buf)?
517            .and_then(|frame| self.decode_message(frame).transpose())
518            .transpose()?)
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use std::{fmt::Write as FmtWrite, io::Write as IoWrite};
525
526    use bytes::{BufMut, BytesMut};
527    use flate2::write::{GzEncoder, ZlibEncoder};
528    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
529    use rstest::{fixture, rstest};
530    use tracing_test::traced_test;
531
532    use super::*;
533
    /// Compression formats the tests can apply to a payload before feeding it to the decoder.
    pub enum Compression {
        /// Compress with `GzEncoder`.
        Gzip,
        /// Compress with `ZlibEncoder`.
        Zlib,
    }
538
539    impl Compression {
540        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
541            self.compress_with_level(payload, flate2::Compression::default())
542        }
543
544        pub fn compress_with_level(
545            &self,
546            payload: &impl AsRef<[u8]>,
547            level: flate2::Compression,
548        ) -> Bytes {
549            match self {
550                Compression::Gzip => {
551                    let mut encoder = GzEncoder::new(Vec::new(), level);
552                    encoder
553                        .write_all(payload.as_ref())
554                        .expect("failed to write to encoder");
555                    encoder.finish().expect("failed to finish encoder").into()
556                }
557                Compression::Zlib => {
558                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
559                    encoder
560                        .write_all(payload.as_ref())
561                        .expect("failed to write to encoder");
562                    encoder.finish().expect("failed to finish encoder").into()
563                }
564            }
565        }
566    }
567
568    fn create_chunk(
569        message_id: u64,
570        sequence_number: u8,
571        total_chunks: u8,
572        payload: &impl AsRef<[u8]>,
573    ) -> BytesMut {
574        let mut chunk = BytesMut::new();
575        chunk.put_slice(GELF_MAGIC);
576        chunk.put_u64(message_id);
577        chunk.put_u8(sequence_number);
578        chunk.put_u8(total_chunks);
579        chunk.extend_from_slice(payload.as_ref());
580        chunk
581    }
582
583    #[fixture]
584    fn unchunked_message() -> (BytesMut, String) {
585        let payload = "foo";
586        (BytesMut::from(payload), payload.to_string())
587    }
588
589    #[fixture]
590    fn two_chunks_message() -> ([BytesMut; 2], String) {
591        let message_id = 1u64;
592        let total_chunks = 2u8;
593
594        let first_sequence_number = 0u8;
595        let first_payload = "foo";
596        let first_chunk = create_chunk(
597            message_id,
598            first_sequence_number,
599            total_chunks,
600            &first_payload,
601        );
602
603        let second_sequence_number = 1u8;
604        let second_payload = "bar";
605        let second_chunk = create_chunk(
606            message_id,
607            second_sequence_number,
608            total_chunks,
609            &second_payload,
610        );
611
612        (
613            [first_chunk, second_chunk],
614            format!("{first_payload}{second_payload}"),
615        )
616    }
617
618    #[fixture]
619    fn three_chunks_message() -> ([BytesMut; 3], String) {
620        let message_id = 2u64;
621        let total_chunks = 3u8;
622
623        let first_sequence_number = 0u8;
624        let first_payload = "foo";
625        let first_chunk = create_chunk(
626            message_id,
627            first_sequence_number,
628            total_chunks,
629            &first_payload,
630        );
631
632        let second_sequence_number = 1u8;
633        let second_payload = "bar";
634        let second_chunk = create_chunk(
635            message_id,
636            second_sequence_number,
637            total_chunks,
638            &second_payload,
639        );
640
641        let third_sequence_number = 2u8;
642        let third_payload = "baz";
643        let third_chunk = create_chunk(
644            message_id,
645            third_sequence_number,
646            total_chunks,
647            &third_payload,
648        );
649
650        (
651            [first_chunk, second_chunk, third_chunk],
652            format!("{first_payload}{second_payload}{third_payload}"),
653        )
654    }
655
656    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
657        error
658            .as_any()
659            .downcast_ref::<ChunkedGelfDecoderError>()
660            .expect("Expected ChunkedGelfDecoderError to be downcasted")
661    }
662
663    #[rstest]
664    #[tokio::test]
665    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
666        let (mut chunks, expected_message) = two_chunks_message;
667        let mut decoder = ChunkedGelfDecoder::default();
668
669        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
670        assert!(frame.is_none());
671
672        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
673        assert_eq!(frame, Some(Bytes::from(expected_message)));
674    }
675
676    #[rstest]
677    #[tokio::test]
678    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
679        let (mut message, expected_message) = unchunked_message;
680        let mut decoder = ChunkedGelfDecoder::default();
681
682        let frame = decoder.decode_eof(&mut message).unwrap();
683        assert_eq!(frame, Some(Bytes::from(expected_message)));
684    }
685
686    #[rstest]
687    #[tokio::test]
688    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
689        let (mut chunks, expected_message) = two_chunks_message;
690        let mut decoder = ChunkedGelfDecoder::default();
691
692        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
693        assert!(frame.is_none());
694
695        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
696        assert_eq!(frame, Some(Bytes::from(expected_message)));
697    }
698
699    #[rstest]
700    #[tokio::test]
701    async fn decode_unordered_messages(
702        two_chunks_message: ([BytesMut; 2], String),
703        three_chunks_message: ([BytesMut; 3], String),
704    ) {
705        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
706        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
707        let mut decoder = ChunkedGelfDecoder::default();
708
709        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
710        assert!(frame.is_none());
711
712        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
713        assert!(frame.is_none());
714
715        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
716        assert!(frame.is_none());
717
718        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
719        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));
720
721        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
722        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
723    }
724
725    #[rstest]
726    #[tokio::test]
727    async fn decode_mixed_chunked_and_unchunked_messages(
728        unchunked_message: (BytesMut, String),
729        two_chunks_message: ([BytesMut; 2], String),
730    ) {
731        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
732        let (mut chunks, expected_chunked_message) = two_chunks_message;
733        let mut decoder = ChunkedGelfDecoder::default();
734
735        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
736        assert!(frame.is_none());
737
738        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
739        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));
740
741        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
742        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
743    }
744
745    #[tokio::test]
746    async fn decode_shuffled_messages() {
747        let mut rng = SmallRng::seed_from_u64(42);
748        let total_chunks = 100u8;
749        let first_message_id = 1u64;
750        let first_payload = "first payload";
751        let second_message_id = 2u64;
752        let second_payload = "second payload";
753        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
754            create_chunk(
755                first_message_id,
756                sequence_number,
757                total_chunks,
758                &first_payload,
759            )
760        });
761        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
762            create_chunk(
763                second_message_id,
764                sequence_number,
765                total_chunks,
766                &second_payload,
767            )
768        });
769        let expected_first_message = first_payload.repeat(total_chunks as usize);
770        let expected_second_message = second_payload.repeat(total_chunks as usize);
771        let mut merged_chunks = first_message_chunks
772            .chain(second_message_chunks)
773            .collect::<Vec<_>>();
774        merged_chunks.shuffle(&mut rng);
775        let mut decoder = ChunkedGelfDecoder::default();
776
777        let mut count = 0;
778        let first_retrieved_message = loop {
779            assert!(count < 2 * total_chunks as usize);
780            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
781                break message;
782            } else {
783                count += 1;
784            }
785        };
786        let second_retrieved_message = loop {
787            assert!(count < 2 * total_chunks as usize);
788            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
789                break message;
790            } else {
791                count += 1
792            }
793        };
794
795        assert_eq!(second_retrieved_message, expected_first_message);
796        assert_eq!(first_retrieved_message, expected_second_message);
797    }
798
799    #[rstest]
800    #[tokio::test(start_paused = true)]
801    #[traced_test]
802    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
803        let (mut chunks, _) = two_chunks_message;
804        let mut decoder = ChunkedGelfDecoder::default();
805
806        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
807        assert!(frame.is_none());
808        assert!(!decoder.state.lock().unwrap().is_empty());
809
810        // The message state should be cleared after a certain time
811        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
812        assert!(decoder.state.lock().unwrap().is_empty());
813        assert!(logs_contain(
814            "Message was not fully received within the timeout window. Discarding it."
815        ));
816
817        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
818        assert!(frame.is_none());
819
820        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
821        assert!(decoder.state.lock().unwrap().is_empty());
822        assert!(logs_contain(
823            "Message was not fully received within the timeout window. Discarding it"
824        ));
825    }
826
827    #[tokio::test]
828    async fn decode_empty_input() {
829        let mut src = BytesMut::new();
830        let mut decoder = ChunkedGelfDecoder::default();
831
832        let frame = decoder.decode_eof(&mut src).unwrap();
833        assert!(frame.is_none());
834    }
835
836    #[tokio::test]
837    async fn decode_chunk_with_invalid_header() {
838        let mut src = BytesMut::new();
839        src.extend_from_slice(GELF_MAGIC);
840        // Invalid chunk header with less than 10 bytes
841        let invalid_chunk = [0x12, 0x34];
842        src.extend_from_slice(&invalid_chunk);
843        let mut decoder = ChunkedGelfDecoder::default();
844        let frame = decoder.decode_eof(&mut src);
845
846        let error = frame.unwrap_err();
847        let downcasted_error = downcast_framing_error(&error);
848        assert!(matches!(
849            downcasted_error,
850            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
851        ));
852    }
853
854    #[tokio::test]
855    async fn decode_chunk_with_invalid_total_chunks() {
856        let message_id = 1u64;
857        let sequence_number = 1u8;
858        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
859        let payload = "foo";
860        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
861        let mut decoder = ChunkedGelfDecoder::default();
862
863        let frame = decoder.decode_eof(&mut chunk);
864        let error = frame.unwrap_err();
865        let downcasted_error = downcast_framing_error(&error);
866        assert!(matches!(
867            downcasted_error,
868            ChunkedGelfDecoderError::InvalidTotalChunks {
869                message_id: 1,
870                sequence_number: 1,
871                total_chunks: 129,
872            }
873        ));
874    }
875
876    #[tokio::test]
877    async fn decode_chunk_with_invalid_sequence_number() {
878        let message_id = 1u64;
879        let total_chunks = 2u8;
880        let invalid_sequence_number = total_chunks + 1;
881        let payload = "foo";
882        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
883        let mut decoder = ChunkedGelfDecoder::default();
884
885        let frame = decoder.decode_eof(&mut chunk);
886        let error = frame.unwrap_err();
887        let downcasted_error = downcast_framing_error(&error);
888        assert!(matches!(
889            downcasted_error,
890            ChunkedGelfDecoderError::InvalidSequenceNumber {
891                message_id: 1,
892                sequence_number: 3,
893                total_chunks: 2,
894            }
895        ));
896    }
897
898    #[rstest]
899    #[tokio::test]
900    async fn decode_reached_pending_messages_limit(
901        two_chunks_message: ([BytesMut; 2], String),
902        three_chunks_message: ([BytesMut; 3], String),
903    ) {
904        let (mut two_chunks, _) = two_chunks_message;
905        let (mut three_chunks, _) = three_chunks_message;
906        let mut decoder = ChunkedGelfDecoder {
907            pending_messages_limit: Some(1),
908            ..Default::default()
909        };
910
911        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
912        assert!(frame.is_none());
913        assert!(decoder.state.lock().unwrap().len() == 1);
914
915        let frame = decoder.decode_eof(&mut three_chunks[0]);
916        let error = frame.unwrap_err();
917        let downcasted_error = downcast_framing_error(&error);
918        assert!(matches!(
919            downcasted_error,
920            ChunkedGelfDecoderError::PendingMessagesLimitReached {
921                message_id: 2u64,
922                sequence_number: 0u8,
923                pending_messages_limit: 1,
924            }
925        ));
926        assert!(decoder.state.lock().unwrap().len() == 1);
927    }
928
929    #[rstest]
930    #[tokio::test]
931    async fn decode_chunk_with_different_total_chunks() {
932        let message_id = 1u64;
933        let sequence_number = 0u8;
934        let total_chunks = 2u8;
935        let payload = "foo";
936        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
937        let mut second_chunk =
938            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
939        let mut decoder = ChunkedGelfDecoder::default();
940
941        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
942        assert!(frame.is_none());
943
944        let frame = decoder.decode_eof(&mut second_chunk);
945        let error = frame.unwrap_err();
946        let downcasted_error = downcast_framing_error(&error);
947        assert!(matches!(
948            downcasted_error,
949            ChunkedGelfDecoderError::TotalChunksMismatch {
950                message_id: 1,
951                sequence_number: 1,
952                original_total_chunks: 2,
953                received_total_chunks: 3,
954            }
955        ));
956    }
957
958    #[rstest]
959    #[tokio::test]
960    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
961        let (mut chunks, _) = two_chunks_message;
962        let mut decoder = ChunkedGelfDecoder {
963            max_length: Some(5),
964            ..Default::default()
965        };
966
967        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
968        assert!(frame.is_none());
969        let frame = decoder.decode_eof(&mut chunks[1]);
970        let error = frame.unwrap_err();
971        let downcasted_error = downcast_framing_error(&error);
972        assert!(matches!(
973            downcasted_error,
974            ChunkedGelfDecoderError::MaxLengthExceed {
975                message_id: 1,
976                sequence_number: 1,
977                length: 6,
978                max_length: 5,
979            }
980        ));
981        assert_eq!(decoder.state.lock().unwrap().len(), 0);
982    }
983
984    #[rstest]
985    #[tokio::test]
986    #[traced_test]
987    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
988        let (mut chunks, _) = two_chunks_message;
989        let mut decoder = ChunkedGelfDecoder::default();
990
991        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
992        assert!(frame.is_none());
993
994        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
995        assert!(frame.is_none());
996        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
997    }
998
999    #[tokio::test]
1000    #[rstest]
1001    #[case::gzip(Compression::Gzip)]
1002    #[case::zlib(Compression::Zlib)]
1003    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
1004        let payload = (0..100).fold(String::new(), |mut payload, n| {
1005            write!(payload, "foo{n}").unwrap();
1006            payload
1007        });
1008        let compressed_payload = compression.compress(&payload);
1009        let mut decoder = ChunkedGelfDecoder::default();
1010
1011        let frame = decoder
1012            .decode_eof(&mut compressed_payload.into())
1013            .expect("decoding should not fail")
1014            .expect("decoding should return a frame");
1015
1016        assert_eq!(frame, payload);
1017    }
1018
1019    #[tokio::test]
1020    #[rstest]
1021    #[case::gzip(Compression::Gzip)]
1022    #[case::zlib(Compression::Zlib)]
1023    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
1024        let message_id = 1u64;
1025        let max_chunk_size = 5;
1026        let payload = (0..100).fold(String::new(), |mut payload, n| {
1027            write!(payload, "foo{n}").unwrap();
1028            payload
1029        });
1030        let compressed_payload = compression.compress(&payload);
1031        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
1032        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
1033        let mut chunks = compressed_payload
1034            .chunks(max_chunk_size)
1035            .enumerate()
1036            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
1037            .collect::<Vec<_>>();
1038        let (last_chunk, first_chunks) =
1039            chunks.split_last_mut().expect("chunks should not be empty");
1040        let mut decoder = ChunkedGelfDecoder::default();
1041
1042        for chunk in first_chunks {
1043            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
1044            assert!(frame.is_none());
1045        }
1046        let frame = decoder
1047            .decode_eof(last_chunk)
1048            .expect("decoding should not fail")
1049            .expect("decoding should return a frame");
1050
1051        assert_eq!(frame, payload);
1052    }
1053
1054    #[tokio::test]
1055    async fn decode_malformed_gzip_message() {
1056        let mut compressed_payload = BytesMut::new();
1057        compressed_payload.extend(GZIP_MAGIC);
1058        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
1059        let mut decoder = ChunkedGelfDecoder::default();
1060
1061        let error = decoder
1062            .decode_eof(&mut compressed_payload)
1063            .expect_err("decoding should fail");
1064
1065        let downcasted_error = downcast_framing_error(&error);
1066        assert!(matches!(
1067            downcasted_error,
1068            ChunkedGelfDecoderError::Decompression {
1069                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1070            }
1071        ));
1072    }
1073
1074    #[tokio::test]
1075    async fn decode_malformed_zlib_message() {
1076        let mut compressed_payload = BytesMut::new();
1077        compressed_payload.extend(ZLIB_MAGIC);
1078        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
1079        let mut decoder = ChunkedGelfDecoder::default();
1080
1081        let error = decoder
1082            .decode_eof(&mut compressed_payload)
1083            .expect_err("decoding should fail");
1084
1085        let downcasted_error = downcast_framing_error(&error);
1086        assert!(matches!(
1087            downcasted_error,
1088            ChunkedGelfDecoderError::Decompression {
1089                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1090            }
1091        ));
1092    }
1093
1094    #[tokio::test]
1095    async fn decode_zlib_payload_with_zlib_decoder() {
1096        let payload = "foo";
1097        let compressed_payload = Compression::Zlib.compress(&payload);
1098        let mut decoder = ChunkedGelfDecoder {
1099            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1100            ..Default::default()
1101        };
1102
1103        let frame = decoder
1104            .decode_eof(&mut compressed_payload.into())
1105            .expect("decoding should not fail")
1106            .expect("decoding should return a frame");
1107
1108        assert_eq!(frame, payload);
1109    }
1110
1111    #[tokio::test]
1112    async fn decode_gzip_payload_with_zlib_decoder() {
1113        let payload = "foo";
1114        let compressed_payload = Compression::Gzip.compress(&payload);
1115        let mut decoder = ChunkedGelfDecoder {
1116            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1117            ..Default::default()
1118        };
1119
1120        let error = decoder
1121            .decode_eof(&mut compressed_payload.into())
1122            .expect_err("decoding should fail");
1123
1124        let downcasted_error = downcast_framing_error(&error);
1125        assert!(matches!(
1126            downcasted_error,
1127            ChunkedGelfDecoderError::Decompression {
1128                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1129            }
1130        ));
1131    }
1132
1133    #[tokio::test]
1134    async fn decode_uncompressed_payload_with_zlib_decoder() {
1135        let payload = "foo";
1136        let mut decoder = ChunkedGelfDecoder {
1137            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1138            ..Default::default()
1139        };
1140
1141        let error = decoder
1142            .decode_eof(&mut payload.into())
1143            .expect_err("decoding should fail");
1144
1145        let downcasted_error = downcast_framing_error(&error);
1146        assert!(matches!(
1147            downcasted_error,
1148            ChunkedGelfDecoderError::Decompression {
1149                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1150            }
1151        ));
1152    }
1153
1154    #[tokio::test]
1155    async fn decode_gzip_payload_with_gzip_decoder() {
1156        let payload = "foo";
1157        let compressed_payload = Compression::Gzip.compress(&payload);
1158        let mut decoder = ChunkedGelfDecoder {
1159            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1160            ..Default::default()
1161        };
1162
1163        let frame = decoder
1164            .decode_eof(&mut compressed_payload.into())
1165            .expect("decoding should not fail")
1166            .expect("decoding should return a frame");
1167
1168        assert_eq!(frame, payload);
1169    }
1170
1171    #[tokio::test]
1172    async fn decode_zlib_payload_with_gzip_decoder() {
1173        let payload = "foo";
1174        let compressed_payload = Compression::Zlib.compress(&payload);
1175        let mut decoder = ChunkedGelfDecoder {
1176            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1177            ..Default::default()
1178        };
1179
1180        let error = decoder
1181            .decode_eof(&mut compressed_payload.into())
1182            .expect_err("decoding should fail");
1183
1184        let downcasted_error = downcast_framing_error(&error);
1185        assert!(matches!(
1186            downcasted_error,
1187            ChunkedGelfDecoderError::Decompression {
1188                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1189            }
1190        ));
1191    }
1192
1193    #[tokio::test]
1194    async fn decode_uncompressed_payload_with_gzip_decoder() {
1195        let payload = "foo";
1196        let mut decoder = ChunkedGelfDecoder {
1197            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1198            ..Default::default()
1199        };
1200
1201        let error = decoder
1202            .decode_eof(&mut payload.into())
1203            .expect_err("decoding should fail");
1204
1205        let downcasted_error = downcast_framing_error(&error);
1206        assert!(matches!(
1207            downcasted_error,
1208            ChunkedGelfDecoderError::Decompression {
1209                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1210            }
1211        ));
1212    }
1213
1214    #[tokio::test]
1215    #[rstest]
1216    #[case::gzip(Compression::Gzip)]
1217    #[case::zlib(Compression::Zlib)]
1218    async fn decode_compressed_payload_with_no_decompression_decoder(
1219        #[case] compression: Compression,
1220    ) {
1221        let payload = "foo";
1222        let compressed_payload = compression.compress(&payload);
1223        let mut decoder = ChunkedGelfDecoder {
1224            decompression_config: ChunkedGelfDecompressionConfig::None,
1225            ..Default::default()
1226        };
1227
1228        let frame = decoder
1229            .decode_eof(&mut compressed_payload.clone().into())
1230            .expect("decoding should not fail")
1231            .expect("decoding should return a frame");
1232
1233        assert_eq!(frame, compressed_payload);
1234    }
1235
1236    #[test]
1237    fn detect_gzip_compression() {
1238        let payload = "foo";
1239
1240        for level in 0..=9 {
1241            let level = flate2::Compression::new(level);
1242            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
1243            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1244            assert_eq!(
1245                actual,
1246                ChunkedGelfDecompression::Gzip,
1247                "Failed for level {}",
1248                level.level()
1249            );
1250        }
1251    }
1252
1253    #[test]
1254    fn detect_zlib_compression() {
1255        let payload = "foo";
1256
1257        for level in 0..=9 {
1258            let level = flate2::Compression::new(level);
1259            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
1260            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1261            assert_eq!(
1262                actual,
1263                ChunkedGelfDecompression::Zlib,
1264                "Failed for level {}",
1265                level.level()
1266            );
1267        }
1268    }
1269
1270    #[test]
1271    fn detect_no_compression() {
1272        let payload = "foo";
1273
1274        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());
1275
1276        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
1277    }
1278}