1use std::{
2 any::Any,
3 collections::HashMap,
4 io::Read,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use derivative::Derivative;
11use flate2::read::{MultiGzDecoder, ZlibDecoder};
12use snafu::{ResultExt, Snafu, ensure};
13use tokio::{self, task::JoinHandle};
14use tokio_util::codec::Decoder;
15use tracing::{debug, trace, warn};
16use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
17use vector_config::configurable_component;
18
19use super::{BoxedFramingError, FramingError};
20use crate::{BytesDecoder, StreamDecodingError};
21
/// Two-byte prefix that marks a frame as one chunk of a chunked GELF message.
const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
/// Largest `total_chunks` value accepted in a chunk header.
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
/// Default time, in seconds, to wait for all the chunks of a message.
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;

/// Returns [`DEFAULT_TIMEOUT_SECS`]; referenced by name from the serde and
/// derivative defaults of `ChunkedGelfDecoderOptions::timeout_secs`.
const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}
29
/// Config used to build a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}
38
39impl ChunkedGelfDecoderConfig {
40 pub fn build(&self) -> ChunkedGelfDecoder {
42 ChunkedGelfDecoder::new(
43 self.chunked_gelf.timeout_secs,
44 self.chunked_gelf.pending_messages_limit,
45 self.chunked_gelf.max_length,
46 self.chunked_gelf.decompression,
47 )
48 }
49}
50
/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a message to be fully received. When it elapses, the
    /// decoder drops all the chunks buffered so far for that message.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of incomplete messages buffered at the same time. Once it is
    /// reached, the decoder rejects chunks that would add another pending message.
    ///
    /// If this option is not set, the number of pending messages is not limited.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length, in bytes, of a reassembled chunked message. Only chunk payloads
    /// are counted; chunk headers are excluded. A message that exceeds this length is
    /// dropped together with all of its buffered chunks.
    ///
    /// If this option is not set, the length of a message is not limited.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// Decompression configuration for GELF messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}
85
/// Decompression options for a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Detect the compression of each message from its leading magic bytes.
    #[default]
    Auto,
    /// Always treat the message as Gzip-compressed.
    Gzip,
    /// Always treat the message as Zlib-compressed.
    Zlib,
    /// Never decompress the message.
    None,
}
100
101impl ChunkedGelfDecompressionConfig {
102 pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
103 match self {
104 Self::Auto => ChunkedGelfDecompression::from_magic(data),
105 Self::Gzip => ChunkedGelfDecompression::Gzip,
106 Self::Zlib => ChunkedGelfDecompression::Zlib,
107 Self::None => ChunkedGelfDecompression::None,
108 }
109 }
110}
111
/// Reassembly state of one chunked message that has not been fully received yet.
#[derive(Debug)]
struct MessageState {
    /// Number of chunks the message is made of, as announced in the chunk headers.
    total_chunks: u8,
    /// Chunk payloads indexed by sequence number; slots not received yet hold empty `Bytes`.
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    /// Bit `i` is set once the chunk with sequence number `i` has been stored.
    chunks_bitmap: u128,
    /// Sum of the payload lengths of the chunks stored so far, in bytes.
    current_length: usize,
    /// Handle of the task associated with this message's timeout; aborted once the
    /// message is retrieved.
    timeout_task: JoinHandle<()>,
}
120
121impl MessageState {
122 pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
123 Self {
124 total_chunks,
125 chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
126 chunks_bitmap: 0,
127 current_length: 0,
128 timeout_task,
129 }
130 }
131
132 fn is_chunk_present(&self, sequence_number: u8) -> bool {
133 let chunk_bitmap_id = 1 << sequence_number;
134 self.chunks_bitmap & chunk_bitmap_id != 0
135 }
136
137 fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
138 let chunk_bitmap_id = 1 << sequence_number;
139 self.chunks_bitmap |= chunk_bitmap_id;
140 self.current_length += chunk.remaining();
141 self.chunks[sequence_number as usize] = chunk;
142 }
143
144 fn is_complete(&self) -> bool {
145 self.chunks_bitmap.count_ones() == self.total_chunks as u32
146 }
147
148 fn current_length(&self) -> usize {
149 self.current_length
150 }
151
152 fn retrieve_message(&self) -> Option<Bytes> {
153 if self.is_complete() {
154 self.timeout_task.abort();
155 let chunks = &self.chunks[0..self.total_chunks as usize];
156 let mut message = BytesMut::new();
157 for chunk in chunks {
158 message.extend_from_slice(chunk);
159 }
160 Some(message.freeze())
161 } else {
162 None
163 }
164 }
165}
166
/// Concrete decompression to apply to a message payload.
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    /// Decompress the payload as Gzip.
    Gzip,
    /// Decompress the payload as Zlib.
    Zlib,
    /// Return the payload unchanged.
    None,
}
173
174impl ChunkedGelfDecompression {
175 pub fn from_magic(data: &Bytes) -> Self {
176 if data.starts_with(GZIP_MAGIC) {
177 trace!("Detected Gzip compression");
178 return Self::Gzip;
179 }
180
181 if data.starts_with(ZLIB_MAGIC) {
182 if let Some([first_byte, second_byte]) = data.get(0..2)
184 && (*first_byte as u16 * 256 + *second_byte as u16).is_multiple_of(31)
185 {
186 trace!("Detected Zlib compression");
187 return Self::Zlib;
188 };
189
190 warn!(
191 "Detected Zlib magic bytes but the header is invalid: {:?}",
192 data.get(0..2)
193 );
194 };
195
196 trace!("No compression detected",);
197 Self::None
198 }
199
200 pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
201 let decompressed = match self {
202 Self::Gzip => {
203 let mut decoder = MultiGzDecoder::new(data.reader());
204 let mut decompressed = Vec::new();
205 decoder
206 .read_to_end(&mut decompressed)
207 .context(GzipDecompressionSnafu)?;
208 Bytes::from(decompressed)
209 }
210 Self::Zlib => {
211 let mut decoder = ZlibDecoder::new(data.reader());
212 let mut decompressed = Vec::new();
213 decoder
214 .read_to_end(&mut decompressed)
215 .context(ZlibDecompressionSnafu)?;
216 Bytes::from(decompressed)
217 }
218 Self::None => data,
219 };
220 Ok(decompressed)
221 }
222}
223
/// Errors produced while decompressing a GELF message payload.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    /// Reading the Gzip stream failed.
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    /// Reading the Zlib stream failed.
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}
231
/// Errors produced by `ChunkedGelfDecoder` while decoding chunks and messages.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    /// The chunk is too short to hold the 10 header bytes that follow the magic bytes.
    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    /// The header announces zero chunks or more than `GELF_MAX_TOTAL_CHUNKS`.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
    ))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    /// The sequence number is not below the total chunks value of the same header.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"
    ))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    /// The chunk was rejected because the configured number of pending messages is reached.
    #[snafu(display(
        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
    ))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    /// The chunk announces a different total chunks value than the one recorded when the
    /// message was first seen.
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"
    ))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    /// The accumulated payload of the message went over the configured maximum length;
    /// the message and its buffered chunks are discarded.
    #[snafu(display(
        "Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"
    ))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    /// Decompressing the message payload failed.
    #[snafu(display("Error while decompressing message. {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}
283
impl StreamDecodingError for ChunkedGelfDecoderError {
    /// Always `true`: every variant of this error is reported as one the stream can
    /// continue after.
    fn can_continue(&self) -> bool {
        true
    }
}
289
290impl FramingError for ChunkedGelfDecoderError {
291 fn as_any(&self) -> &dyn Any {
292 self as &dyn Any
293 }
294}
295
/// A decoder for handling GELF messages that may be chunked.
///
/// Clones share the same pending-message map, because `state` sits behind an `Arc`.
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    /// Inner decoder that hands over the buffered bytes as a single frame.
    // NOTE(review): presumably each frame is one whole datagram — confirm against callers.
    bytes_decoder: BytesDecoder,
    /// How message payloads are decompressed once a full message is available.
    decompression_config: ChunkedGelfDecompressionConfig,
    /// Incomplete messages keyed by message id; also accessed by the spawned timeout tasks.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    /// How long an incomplete message is kept before its chunks are discarded.
    timeout: Duration,
    /// Optional cap on the number of entries in `state`.
    pending_messages_limit: Option<usize>,
    /// Optional cap, in bytes, on the accumulated payload of a chunked message.
    max_length: Option<usize>,
}
312
313impl ChunkedGelfDecoder {
314 pub fn new(
316 timeout_secs: f64,
317 pending_messages_limit: Option<usize>,
318 max_length: Option<usize>,
319 decompression_config: ChunkedGelfDecompressionConfig,
320 ) -> Self {
321 Self {
322 bytes_decoder: BytesDecoder::new(),
323 decompression_config,
324 state: Arc::new(Mutex::new(HashMap::new())),
325 timeout: Duration::from_secs_f64(timeout_secs),
326 pending_messages_limit,
327 max_length,
328 }
329 }
330
331 pub fn decode_chunk(
333 &mut self,
334 mut chunk: Bytes,
335 ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
336 ensure!(
351 chunk.remaining() >= 10,
352 InvalidChunkHeaderSnafu { header: chunk }
353 );
354
355 let message_id = chunk.get_u64();
356 let sequence_number = chunk.get_u8();
357 let total_chunks = chunk.get_u8();
358
359 ensure!(
360 total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
361 InvalidTotalChunksSnafu {
362 message_id,
363 sequence_number,
364 total_chunks
365 }
366 );
367
368 ensure!(
369 sequence_number < total_chunks,
370 InvalidSequenceNumberSnafu {
371 message_id,
372 sequence_number,
373 total_chunks
374 }
375 );
376
377 let mut state_lock = self.state.lock().expect("poisoned lock");
378
379 if let Some(pending_messages_limit) = self.pending_messages_limit {
380 ensure!(
381 state_lock.len() < pending_messages_limit,
382 PendingMessagesLimitReachedSnafu {
383 message_id,
384 sequence_number,
385 pending_messages_limit
386 }
387 );
388 }
389
390 let message_state = state_lock.entry(message_id).or_insert_with(|| {
391 let state = Arc::clone(&self.state);
394 let timeout = self.timeout;
395 let timeout_handle = tokio::spawn(async move {
396 tokio::time::sleep(timeout).await;
397 let mut state_lock = state.lock().expect("poisoned lock");
398 if state_lock.remove(&message_id).is_some() {
399 warn!(
400 message_id = message_id,
401 timeout_secs = timeout.as_secs_f64(),
402 "Message was not fully received within the timeout window. Discarding it."
403 );
404 }
405 });
406 MessageState::new(total_chunks, timeout_handle)
407 });
408
409 ensure!(
410 message_state.total_chunks == total_chunks,
411 TotalChunksMismatchSnafu {
412 message_id,
413 sequence_number,
414 original_total_chunks: message_state.total_chunks,
415 received_total_chunks: total_chunks
416 }
417 );
418
419 if message_state.is_chunk_present(sequence_number) {
420 debug!(
421 message_id = message_id,
422 sequence_number = sequence_number,
423 "Received a duplicate chunk. Ignoring it."
424 );
425 return Ok(None);
426 }
427
428 message_state.add_chunk(sequence_number, chunk);
429
430 if let Some(max_length) = self.max_length {
431 let length = message_state.current_length();
432 if length > max_length {
433 state_lock.remove(&message_id);
434 return Err(ChunkedGelfDecoderError::MaxLengthExceed {
435 message_id,
436 sequence_number,
437 length,
438 max_length,
439 });
440 }
441 }
442
443 if let Some(message) = message_state.retrieve_message() {
444 state_lock.remove(&message_id);
445 Ok(Some(message))
446 } else {
447 Ok(None)
448 }
449 }
450
451 pub fn decode_message(
455 &mut self,
456 mut src: Bytes,
457 ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
458 let message = if src.starts_with(GELF_MAGIC) {
459 trace!("Received a chunked GELF message based on the magic bytes");
460 src.advance(2);
461 self.decode_chunk(src)?
462 } else {
463 trace!(
464 "Received an unchunked GELF message. First two bytes of message: {:?}",
465 &src[0..2]
466 );
467 Some(src)
468 };
469
470 message
472 .map(|message| {
473 self.decompression_config
474 .get_decompression(&message)
475 .decompress(message)
476 .context(DecompressionSnafu)
477 })
478 .transpose()
479 }
480}
481
482impl Default for ChunkedGelfDecoder {
483 fn default() -> Self {
484 Self::new(
485 DEFAULT_TIMEOUT_SECS,
486 None,
487 None,
488 ChunkedGelfDecompressionConfig::Auto,
489 )
490 }
491}
492
493impl Decoder for ChunkedGelfDecoder {
494 type Item = Bytes;
495
496 type Error = BoxedFramingError;
497
498 fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
499 if src.is_empty() {
500 return Ok(None);
501 }
502
503 Ok(self
504 .bytes_decoder
505 .decode(src)?
506 .and_then(|frame| self.decode_message(frame).transpose())
507 .transpose()?)
508 }
509 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
510 if buf.is_empty() {
511 return Ok(None);
512 }
513
514 Ok(self
515 .bytes_decoder
516 .decode_eof(buf)?
517 .and_then(|frame| self.decode_message(frame).transpose())
518 .transpose()?)
519 }
520}
521
522#[cfg(test)]
523mod tests {
524 use std::{fmt::Write as FmtWrite, io::Write as IoWrite};
525
526 use bytes::{BufMut, BytesMut};
527 use flate2::write::{GzEncoder, ZlibEncoder};
528 use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
529 use rstest::{fixture, rstest};
530 use tracing_test::traced_test;
531
532 use super::*;
533
    /// Compression codecs used by the tests to build compressed payloads.
    pub enum Compression {
        /// Compress with flate2's `GzEncoder`.
        Gzip,
        /// Compress with flate2's `ZlibEncoder`.
        Zlib,
    }
538
539 impl Compression {
540 pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
541 self.compress_with_level(payload, flate2::Compression::default())
542 }
543
544 pub fn compress_with_level(
545 &self,
546 payload: &impl AsRef<[u8]>,
547 level: flate2::Compression,
548 ) -> Bytes {
549 match self {
550 Compression::Gzip => {
551 let mut encoder = GzEncoder::new(Vec::new(), level);
552 encoder
553 .write_all(payload.as_ref())
554 .expect("failed to write to encoder");
555 encoder.finish().expect("failed to finish encoder").into()
556 }
557 Compression::Zlib => {
558 let mut encoder = ZlibEncoder::new(Vec::new(), level);
559 encoder
560 .write_all(payload.as_ref())
561 .expect("failed to write to encoder");
562 encoder.finish().expect("failed to finish encoder").into()
563 }
564 }
565 }
566 }
567
568 fn create_chunk(
569 message_id: u64,
570 sequence_number: u8,
571 total_chunks: u8,
572 payload: &impl AsRef<[u8]>,
573 ) -> BytesMut {
574 let mut chunk = BytesMut::new();
575 chunk.put_slice(GELF_MAGIC);
576 chunk.put_u64(message_id);
577 chunk.put_u8(sequence_number);
578 chunk.put_u8(total_chunks);
579 chunk.extend_from_slice(payload.as_ref());
580 chunk
581 }
582
583 #[fixture]
584 fn unchunked_message() -> (BytesMut, String) {
585 let payload = "foo";
586 (BytesMut::from(payload), payload.to_string())
587 }
588
589 #[fixture]
590 fn two_chunks_message() -> ([BytesMut; 2], String) {
591 let message_id = 1u64;
592 let total_chunks = 2u8;
593
594 let first_sequence_number = 0u8;
595 let first_payload = "foo";
596 let first_chunk = create_chunk(
597 message_id,
598 first_sequence_number,
599 total_chunks,
600 &first_payload,
601 );
602
603 let second_sequence_number = 1u8;
604 let second_payload = "bar";
605 let second_chunk = create_chunk(
606 message_id,
607 second_sequence_number,
608 total_chunks,
609 &second_payload,
610 );
611
612 (
613 [first_chunk, second_chunk],
614 format!("{first_payload}{second_payload}"),
615 )
616 }
617
618 #[fixture]
619 fn three_chunks_message() -> ([BytesMut; 3], String) {
620 let message_id = 2u64;
621 let total_chunks = 3u8;
622
623 let first_sequence_number = 0u8;
624 let first_payload = "foo";
625 let first_chunk = create_chunk(
626 message_id,
627 first_sequence_number,
628 total_chunks,
629 &first_payload,
630 );
631
632 let second_sequence_number = 1u8;
633 let second_payload = "bar";
634 let second_chunk = create_chunk(
635 message_id,
636 second_sequence_number,
637 total_chunks,
638 &second_payload,
639 );
640
641 let third_sequence_number = 2u8;
642 let third_payload = "baz";
643 let third_chunk = create_chunk(
644 message_id,
645 third_sequence_number,
646 total_chunks,
647 &third_payload,
648 );
649
650 (
651 [first_chunk, second_chunk, third_chunk],
652 format!("{first_payload}{second_payload}{third_payload}"),
653 )
654 }
655
656 fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
657 error
658 .as_any()
659 .downcast_ref::<ChunkedGelfDecoderError>()
660 .expect("Expected ChunkedGelfDecoderError to be downcasted")
661 }
662
663 #[rstest]
664 #[tokio::test]
665 async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
666 let (mut chunks, expected_message) = two_chunks_message;
667 let mut decoder = ChunkedGelfDecoder::default();
668
669 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
670 assert!(frame.is_none());
671
672 let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
673 assert_eq!(frame, Some(Bytes::from(expected_message)));
674 }
675
676 #[rstest]
677 #[tokio::test]
678 async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
679 let (mut message, expected_message) = unchunked_message;
680 let mut decoder = ChunkedGelfDecoder::default();
681
682 let frame = decoder.decode_eof(&mut message).unwrap();
683 assert_eq!(frame, Some(Bytes::from(expected_message)));
684 }
685
686 #[rstest]
687 #[tokio::test]
688 async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
689 let (mut chunks, expected_message) = two_chunks_message;
690 let mut decoder = ChunkedGelfDecoder::default();
691
692 let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
693 assert!(frame.is_none());
694
695 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
696 assert_eq!(frame, Some(Bytes::from(expected_message)));
697 }
698
699 #[rstest]
700 #[tokio::test]
701 async fn decode_unordered_messages(
702 two_chunks_message: ([BytesMut; 2], String),
703 three_chunks_message: ([BytesMut; 3], String),
704 ) {
705 let (mut two_chunks, two_chunks_expected) = two_chunks_message;
706 let (mut three_chunks, three_chunks_expected) = three_chunks_message;
707 let mut decoder = ChunkedGelfDecoder::default();
708
709 let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
710 assert!(frame.is_none());
711
712 let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
713 assert!(frame.is_none());
714
715 let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
716 assert!(frame.is_none());
717
718 let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
719 assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));
720
721 let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
722 assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
723 }
724
725 #[rstest]
726 #[tokio::test]
727 async fn decode_mixed_chunked_and_unchunked_messages(
728 unchunked_message: (BytesMut, String),
729 two_chunks_message: ([BytesMut; 2], String),
730 ) {
731 let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
732 let (mut chunks, expected_chunked_message) = two_chunks_message;
733 let mut decoder = ChunkedGelfDecoder::default();
734
735 let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
736 assert!(frame.is_none());
737
738 let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
739 assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));
740
741 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
742 assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
743 }
744
745 #[tokio::test]
746 async fn decode_shuffled_messages() {
747 let mut rng = SmallRng::seed_from_u64(42);
748 let total_chunks = 100u8;
749 let first_message_id = 1u64;
750 let first_payload = "first payload";
751 let second_message_id = 2u64;
752 let second_payload = "second payload";
753 let first_message_chunks = (0..total_chunks).map(|sequence_number| {
754 create_chunk(
755 first_message_id,
756 sequence_number,
757 total_chunks,
758 &first_payload,
759 )
760 });
761 let second_message_chunks = (0..total_chunks).map(|sequence_number| {
762 create_chunk(
763 second_message_id,
764 sequence_number,
765 total_chunks,
766 &second_payload,
767 )
768 });
769 let expected_first_message = first_payload.repeat(total_chunks as usize);
770 let expected_second_message = second_payload.repeat(total_chunks as usize);
771 let mut merged_chunks = first_message_chunks
772 .chain(second_message_chunks)
773 .collect::<Vec<_>>();
774 merged_chunks.shuffle(&mut rng);
775 let mut decoder = ChunkedGelfDecoder::default();
776
777 let mut count = 0;
778 let first_retrieved_message = loop {
779 assert!(count < 2 * total_chunks as usize);
780 if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
781 break message;
782 } else {
783 count += 1;
784 }
785 };
786 let second_retrieved_message = loop {
787 assert!(count < 2 * total_chunks as usize);
788 if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
789 break message;
790 } else {
791 count += 1
792 }
793 };
794
795 assert_eq!(second_retrieved_message, expected_first_message);
796 assert_eq!(first_retrieved_message, expected_second_message);
797 }
798
799 #[rstest]
800 #[tokio::test(start_paused = true)]
801 #[traced_test]
802 async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
803 let (mut chunks, _) = two_chunks_message;
804 let mut decoder = ChunkedGelfDecoder::default();
805
806 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
807 assert!(frame.is_none());
808 assert!(!decoder.state.lock().unwrap().is_empty());
809
810 tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
812 assert!(decoder.state.lock().unwrap().is_empty());
813 assert!(logs_contain(
814 "Message was not fully received within the timeout window. Discarding it."
815 ));
816
817 let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
818 assert!(frame.is_none());
819
820 tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
821 assert!(decoder.state.lock().unwrap().is_empty());
822 assert!(logs_contain(
823 "Message was not fully received within the timeout window. Discarding it"
824 ));
825 }
826
827 #[tokio::test]
828 async fn decode_empty_input() {
829 let mut src = BytesMut::new();
830 let mut decoder = ChunkedGelfDecoder::default();
831
832 let frame = decoder.decode_eof(&mut src).unwrap();
833 assert!(frame.is_none());
834 }
835
836 #[tokio::test]
837 async fn decode_chunk_with_invalid_header() {
838 let mut src = BytesMut::new();
839 src.extend_from_slice(GELF_MAGIC);
840 let invalid_chunk = [0x12, 0x34];
842 src.extend_from_slice(&invalid_chunk);
843 let mut decoder = ChunkedGelfDecoder::default();
844 let frame = decoder.decode_eof(&mut src);
845
846 let error = frame.unwrap_err();
847 let downcasted_error = downcast_framing_error(&error);
848 assert!(matches!(
849 downcasted_error,
850 ChunkedGelfDecoderError::InvalidChunkHeader { .. }
851 ));
852 }
853
854 #[tokio::test]
855 async fn decode_chunk_with_invalid_total_chunks() {
856 let message_id = 1u64;
857 let sequence_number = 1u8;
858 let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
859 let payload = "foo";
860 let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
861 let mut decoder = ChunkedGelfDecoder::default();
862
863 let frame = decoder.decode_eof(&mut chunk);
864 let error = frame.unwrap_err();
865 let downcasted_error = downcast_framing_error(&error);
866 assert!(matches!(
867 downcasted_error,
868 ChunkedGelfDecoderError::InvalidTotalChunks {
869 message_id: 1,
870 sequence_number: 1,
871 total_chunks: 129,
872 }
873 ));
874 }
875
876 #[tokio::test]
877 async fn decode_chunk_with_invalid_sequence_number() {
878 let message_id = 1u64;
879 let total_chunks = 2u8;
880 let invalid_sequence_number = total_chunks + 1;
881 let payload = "foo";
882 let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
883 let mut decoder = ChunkedGelfDecoder::default();
884
885 let frame = decoder.decode_eof(&mut chunk);
886 let error = frame.unwrap_err();
887 let downcasted_error = downcast_framing_error(&error);
888 assert!(matches!(
889 downcasted_error,
890 ChunkedGelfDecoderError::InvalidSequenceNumber {
891 message_id: 1,
892 sequence_number: 3,
893 total_chunks: 2,
894 }
895 ));
896 }
897
898 #[rstest]
899 #[tokio::test]
900 async fn decode_reached_pending_messages_limit(
901 two_chunks_message: ([BytesMut; 2], String),
902 three_chunks_message: ([BytesMut; 3], String),
903 ) {
904 let (mut two_chunks, _) = two_chunks_message;
905 let (mut three_chunks, _) = three_chunks_message;
906 let mut decoder = ChunkedGelfDecoder {
907 pending_messages_limit: Some(1),
908 ..Default::default()
909 };
910
911 let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
912 assert!(frame.is_none());
913 assert!(decoder.state.lock().unwrap().len() == 1);
914
915 let frame = decoder.decode_eof(&mut three_chunks[0]);
916 let error = frame.unwrap_err();
917 let downcasted_error = downcast_framing_error(&error);
918 assert!(matches!(
919 downcasted_error,
920 ChunkedGelfDecoderError::PendingMessagesLimitReached {
921 message_id: 2u64,
922 sequence_number: 0u8,
923 pending_messages_limit: 1,
924 }
925 ));
926 assert!(decoder.state.lock().unwrap().len() == 1);
927 }
928
929 #[rstest]
930 #[tokio::test]
931 async fn decode_chunk_with_different_total_chunks() {
932 let message_id = 1u64;
933 let sequence_number = 0u8;
934 let total_chunks = 2u8;
935 let payload = "foo";
936 let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
937 let mut second_chunk =
938 create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
939 let mut decoder = ChunkedGelfDecoder::default();
940
941 let frame = decoder.decode_eof(&mut first_chunk).unwrap();
942 assert!(frame.is_none());
943
944 let frame = decoder.decode_eof(&mut second_chunk);
945 let error = frame.unwrap_err();
946 let downcasted_error = downcast_framing_error(&error);
947 assert!(matches!(
948 downcasted_error,
949 ChunkedGelfDecoderError::TotalChunksMismatch {
950 message_id: 1,
951 sequence_number: 1,
952 original_total_chunks: 2,
953 received_total_chunks: 3,
954 }
955 ));
956 }
957
958 #[rstest]
959 #[tokio::test]
960 async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
961 let (mut chunks, _) = two_chunks_message;
962 let mut decoder = ChunkedGelfDecoder {
963 max_length: Some(5),
964 ..Default::default()
965 };
966
967 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
968 assert!(frame.is_none());
969 let frame = decoder.decode_eof(&mut chunks[1]);
970 let error = frame.unwrap_err();
971 let downcasted_error = downcast_framing_error(&error);
972 assert!(matches!(
973 downcasted_error,
974 ChunkedGelfDecoderError::MaxLengthExceed {
975 message_id: 1,
976 sequence_number: 1,
977 length: 6,
978 max_length: 5,
979 }
980 ));
981 assert_eq!(decoder.state.lock().unwrap().len(), 0);
982 }
983
984 #[rstest]
985 #[tokio::test]
986 #[traced_test]
987 async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
988 let (mut chunks, _) = two_chunks_message;
989 let mut decoder = ChunkedGelfDecoder::default();
990
991 let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
992 assert!(frame.is_none());
993
994 let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
995 assert!(frame.is_none());
996 assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
997 }
998
999 #[tokio::test]
1000 #[rstest]
1001 #[case::gzip(Compression::Gzip)]
1002 #[case::zlib(Compression::Zlib)]
1003 async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
1004 let payload = (0..100).fold(String::new(), |mut payload, n| {
1005 write!(payload, "foo{n}").unwrap();
1006 payload
1007 });
1008 let compressed_payload = compression.compress(&payload);
1009 let mut decoder = ChunkedGelfDecoder::default();
1010
1011 let frame = decoder
1012 .decode_eof(&mut compressed_payload.into())
1013 .expect("decoding should not fail")
1014 .expect("decoding should return a frame");
1015
1016 assert_eq!(frame, payload);
1017 }
1018
1019 #[tokio::test]
1020 #[rstest]
1021 #[case::gzip(Compression::Gzip)]
1022 #[case::zlib(Compression::Zlib)]
1023 async fn decode_compressed_chunked_message(#[case] compression: Compression) {
1024 let message_id = 1u64;
1025 let max_chunk_size = 5;
1026 let payload = (0..100).fold(String::new(), |mut payload, n| {
1027 write!(payload, "foo{n}").unwrap();
1028 payload
1029 });
1030 let compressed_payload = compression.compress(&payload);
1031 let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
1032 assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
1033 let mut chunks = compressed_payload
1034 .chunks(max_chunk_size)
1035 .enumerate()
1036 .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
1037 .collect::<Vec<_>>();
1038 let (last_chunk, first_chunks) =
1039 chunks.split_last_mut().expect("chunks should not be empty");
1040 let mut decoder = ChunkedGelfDecoder::default();
1041
1042 for chunk in first_chunks {
1043 let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
1044 assert!(frame.is_none());
1045 }
1046 let frame = decoder
1047 .decode_eof(last_chunk)
1048 .expect("decoding should not fail")
1049 .expect("decoding should return a frame");
1050
1051 assert_eq!(frame, payload);
1052 }
1053
1054 #[tokio::test]
1055 async fn decode_malformed_gzip_message() {
1056 let mut compressed_payload = BytesMut::new();
1057 compressed_payload.extend(GZIP_MAGIC);
1058 compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
1059 let mut decoder = ChunkedGelfDecoder::default();
1060
1061 let error = decoder
1062 .decode_eof(&mut compressed_payload)
1063 .expect_err("decoding should fail");
1064
1065 let downcasted_error = downcast_framing_error(&error);
1066 assert!(matches!(
1067 downcasted_error,
1068 ChunkedGelfDecoderError::Decompression {
1069 source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1070 }
1071 ));
1072 }
1073
1074 #[tokio::test]
1075 async fn decode_malformed_zlib_message() {
1076 let mut compressed_payload = BytesMut::new();
1077 compressed_payload.extend(ZLIB_MAGIC);
1078 compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
1079 let mut decoder = ChunkedGelfDecoder::default();
1080
1081 let error = decoder
1082 .decode_eof(&mut compressed_payload)
1083 .expect_err("decoding should fail");
1084
1085 let downcasted_error = downcast_framing_error(&error);
1086 assert!(matches!(
1087 downcasted_error,
1088 ChunkedGelfDecoderError::Decompression {
1089 source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1090 }
1091 ));
1092 }
1093
1094 #[tokio::test]
1095 async fn decode_zlib_payload_with_zlib_decoder() {
1096 let payload = "foo";
1097 let compressed_payload = Compression::Zlib.compress(&payload);
1098 let mut decoder = ChunkedGelfDecoder {
1099 decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1100 ..Default::default()
1101 };
1102
1103 let frame = decoder
1104 .decode_eof(&mut compressed_payload.into())
1105 .expect("decoding should not fail")
1106 .expect("decoding should return a frame");
1107
1108 assert_eq!(frame, payload);
1109 }
1110
1111 #[tokio::test]
1112 async fn decode_gzip_payload_with_zlib_decoder() {
1113 let payload = "foo";
1114 let compressed_payload = Compression::Gzip.compress(&payload);
1115 let mut decoder = ChunkedGelfDecoder {
1116 decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1117 ..Default::default()
1118 };
1119
1120 let error = decoder
1121 .decode_eof(&mut compressed_payload.into())
1122 .expect_err("decoding should fail");
1123
1124 let downcasted_error = downcast_framing_error(&error);
1125 assert!(matches!(
1126 downcasted_error,
1127 ChunkedGelfDecoderError::Decompression {
1128 source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1129 }
1130 ));
1131 }
1132
1133 #[tokio::test]
1134 async fn decode_uncompressed_payload_with_zlib_decoder() {
1135 let payload = "foo";
1136 let mut decoder = ChunkedGelfDecoder {
1137 decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1138 ..Default::default()
1139 };
1140
1141 let error = decoder
1142 .decode_eof(&mut payload.into())
1143 .expect_err("decoding should fail");
1144
1145 let downcasted_error = downcast_framing_error(&error);
1146 assert!(matches!(
1147 downcasted_error,
1148 ChunkedGelfDecoderError::Decompression {
1149 source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1150 }
1151 ));
1152 }
1153
1154 #[tokio::test]
1155 async fn decode_gzip_payload_with_gzip_decoder() {
1156 let payload = "foo";
1157 let compressed_payload = Compression::Gzip.compress(&payload);
1158 let mut decoder = ChunkedGelfDecoder {
1159 decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1160 ..Default::default()
1161 };
1162
1163 let frame = decoder
1164 .decode_eof(&mut compressed_payload.into())
1165 .expect("decoding should not fail")
1166 .expect("decoding should return a frame");
1167
1168 assert_eq!(frame, payload);
1169 }
1170
1171 #[tokio::test]
1172 async fn decode_zlib_payload_with_gzip_decoder() {
1173 let payload = "foo";
1174 let compressed_payload = Compression::Zlib.compress(&payload);
1175 let mut decoder = ChunkedGelfDecoder {
1176 decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1177 ..Default::default()
1178 };
1179
1180 let error = decoder
1181 .decode_eof(&mut compressed_payload.into())
1182 .expect_err("decoding should fail");
1183
1184 let downcasted_error = downcast_framing_error(&error);
1185 assert!(matches!(
1186 downcasted_error,
1187 ChunkedGelfDecoderError::Decompression {
1188 source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1189 }
1190 ));
1191 }
1192
1193 #[tokio::test]
1194 async fn decode_uncompressed_payload_with_gzip_decoder() {
1195 let payload = "foo";
1196 let mut decoder = ChunkedGelfDecoder {
1197 decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1198 ..Default::default()
1199 };
1200
1201 let error = decoder
1202 .decode_eof(&mut payload.into())
1203 .expect_err("decoding should fail");
1204
1205 let downcasted_error = downcast_framing_error(&error);
1206 assert!(matches!(
1207 downcasted_error,
1208 ChunkedGelfDecoderError::Decompression {
1209 source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1210 }
1211 ));
1212 }
1213
1214 #[tokio::test]
1215 #[rstest]
1216 #[case::gzip(Compression::Gzip)]
1217 #[case::zlib(Compression::Zlib)]
1218 async fn decode_compressed_payload_with_no_decompression_decoder(
1219 #[case] compression: Compression,
1220 ) {
1221 let payload = "foo";
1222 let compressed_payload = compression.compress(&payload);
1223 let mut decoder = ChunkedGelfDecoder {
1224 decompression_config: ChunkedGelfDecompressionConfig::None,
1225 ..Default::default()
1226 };
1227
1228 let frame = decoder
1229 .decode_eof(&mut compressed_payload.clone().into())
1230 .expect("decoding should not fail")
1231 .expect("decoding should return a frame");
1232
1233 assert_eq!(frame, compressed_payload);
1234 }
1235
1236 #[test]
1237 fn detect_gzip_compression() {
1238 let payload = "foo";
1239
1240 for level in 0..=9 {
1241 let level = flate2::Compression::new(level);
1242 let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
1243 let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1244 assert_eq!(
1245 actual,
1246 ChunkedGelfDecompression::Gzip,
1247 "Failed for level {}",
1248 level.level()
1249 );
1250 }
1251 }
1252
1253 #[test]
1254 fn detect_zlib_compression() {
1255 let payload = "foo";
1256
1257 for level in 0..=9 {
1258 let level = flate2::Compression::new(level);
1259 let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
1260 let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1261 assert_eq!(
1262 actual,
1263 ChunkedGelfDecompression::Zlib,
1264 "Failed for level {}",
1265 level.level()
1266 );
1267 }
1268 }
1269
1270 #[test]
1271 fn detect_no_compression() {
1272 let payload = "foo";
1273
1274 let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());
1275
1276 assert_eq!(detected_compression, ChunkedGelfDecompression::None);
1277 }
1278}