1use std::{
2 collections::HashMap,
3 convert::Infallible,
4 io::Read,
5 net::{Ipv4Addr, SocketAddr},
6 sync::Arc,
7 time::Duration,
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{Server, service::make_service_fn};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::{
18 Deserializer, Value as JsonValue,
19 de::{Read as JsonRead, StrRead},
20};
21use snafu::Snafu;
22use tokio::net::TcpStream;
23use tokio_util::codec::Decoder as _;
24use tower::ServiceBuilder;
25use tracing::Span;
26use vector_lib::{
27 EstimatedJsonEncodedSizeOf,
28 codecs::{
29 Decoder, StreamDecodingError,
30 decoding::{DeserializerConfig, FramingConfig},
31 },
32 config::{LegacyKey, LogNamespace},
33 configurable::configurable_component,
34 event::{BatchNotifier, BatchStatusReceiver, EventMetadata},
35 internal_event::{
36 ComponentEventsDropped, CountByteSize, InternalEventHandle as _, Registered, UNINTENTIONAL,
37 },
38 lookup::{
39 self, OwnedValuePath, event_path, lookup_v2::OptionalValuePath, metadata_path,
40 owned_value_path,
41 },
42 schema::meaning,
43 sensitive_string::SensitiveString,
44 source_sender::SendError,
45 tls::MaybeTlsIncomingStream,
46};
47use vrl::{
48 path::{OwnedTargetPath, PathPrefix, ValuePath as _},
49 value::{Kind, kind::Collection},
50};
51use warp::{
52 Filter, Reply,
53 filters::BoxedFilter,
54 http::header::{CONTENT_TYPE, HeaderValue},
55 path,
56 reject::Rejection,
57 reply::Response,
58};
59
60use self::{
61 acknowledgements::{
62 HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
63 IndexerAcknowledgement,
64 },
65 splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
66};
67use crate::{
68 SourceSender,
69 codecs::DecodingConfig,
70 config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
71 event::{Event, LogEvent, Value},
72 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
73 internal_events::{
74 EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
75 },
76 serde::bool_or_struct,
77 tls::{MaybeTlsSettings, TlsEnableableConfig},
78};
79
80mod acknowledgements;
81
/// Legacy-namespace event field that receives the Splunk channel.
pub const CHANNEL: &str = "splunk_channel";
/// Legacy-namespace event field that receives the Splunk `index`.
pub const INDEX: &str = "splunk_index";
/// Legacy-namespace event field that receives the Splunk `source`.
pub const SOURCE: &str = "splunk_source";
/// Legacy-namespace event field that receives the Splunk `sourcetype`.
pub const SOURCETYPE: &str = "splunk_sourcetype";

/// Request header carrying the Splunk channel; the `channel` query parameter is the fallback.
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
89
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
/// Configuration for the `splunk_hec` source.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    #[serde(default = "default_socket_address")]
    /// The socket address to listen for connections on.
    pub address: SocketAddr,

    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    /// Optional authorization token.
    ///
    /// If supplied, it is accepted in addition to any `valid_tokens`, and requests must send it
    /// in the `Authorization` header as `Splunk <token>`.
    token: Option<SensitiveString>,

    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    /// A list of valid authorization tokens.
    ///
    /// If supplied, requests must send one of these tokens in the `Authorization` header as
    /// `Splunk <token>`. If neither this nor `token` is set, requests are not authenticated.
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether to forward the request's HEC token with its events.
    ///
    /// If enabled, the token from the `Authorization` header (with any `Splunk ` prefix removed)
    /// is stored in each event's metadata.
    store_hec_token: bool,

    // TLS settings for the listener; the description is derived from the type.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Indexer acknowledgement settings; accepts either a bool or a full struct.
    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    #[configurable(metadata(docs::hidden))]
    /// Per-source log namespace setting; merged with the global log namespace setting.
    #[serde(default)]
    log_namespace: Option<bool>,

    // HTTP keepalive / maximum connection age settings.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    #[configurable(derived)]
    /// Decoding settings for the event endpoint (`/services/collector`,
    /// `/services/collector/event` and `/services/collector/event/1.0`).
    ///
    /// When `decoding` is set, the `event` field of every JSON envelope is decoded with it;
    /// otherwise the built-in envelope handling is used.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub event: CodecConfig,

    #[configurable(derived)]
    /// Decoding settings for the raw endpoint (`/services/collector/raw` and
    /// `/services/collector/raw/1.0`).
    ///
    /// When `decoding` is set, the request body is decoded with it; otherwise the whole body
    /// becomes a single event.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub raw: CodecConfig,
}
167
#[configurable_component]
/// Optional framing/decoding settings for one HEC endpoint.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub struct CodecConfig {
    #[configurable(derived)]
    /// Framing applied to payloads before deserialization.
    ///
    /// Only used together with `decoding`. When unset, the deserializer's default
    /// message-based framing is used.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub framing: Option<FramingConfig>,

    #[configurable(derived)]
    /// Deserializer used to decode payloads.
    ///
    /// When unset, no decoder is built and the endpoint's built-in handling is used.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub decoding: Option<DeserializerConfig>,
}
193
194impl CodecConfig {
195 fn build_decoder(&self, log_namespace: LogNamespace) -> crate::Result<Option<Decoder>> {
196 match &self.decoding {
197 Some(decoding) => {
198 let framing = self
199 .framing
200 .clone()
201 .unwrap_or_else(|| decoding.default_message_based_framing());
202 Ok(Some(
203 DecodingConfig::new(framing, decoding.clone(), log_namespace).build()?,
204 ))
205 }
206 None => Ok(None),
207 }
208 }
209}
210
211impl_generate_config_from_default!(SplunkConfig);
212
213impl Default for SplunkConfig {
214 fn default() -> Self {
215 SplunkConfig {
216 address: default_socket_address(),
217 token: None,
218 valid_tokens: None,
219 tls: None,
220 acknowledgements: Default::default(),
221 store_hec_token: false,
222 log_namespace: None,
223 keepalive: Default::default(),
224 event: CodecConfig::default(),
225 raw: CodecConfig::default(),
226 }
227 }
228}
229
/// Default listening address: every IPv4 interface, port 8088.
fn default_socket_address() -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8088))
}
233
#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    /// Builds the HTTP server future for this source.
    ///
    /// All endpoints live under `/services/collector/`: the event, raw, health and ack services
    /// plus `OPTIONS` handlers. Rejections are converted into responses by `finish_err`.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let shutdown = cx.shutdown.clone();
        let out = cx.out.clone();
        let log_namespace = cx.log_namespace(self.log_namespace);
        let event_decoder = self.event.build_decoder(log_namespace)?;
        let raw_decoder = self.raw.build_decoder(log_namespace)?;
        let source = SplunkSource::new(
            self,
            tls.http_protocol_name(),
            event_decoder,
            raw_decoder,
            cx,
        );

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out);
        let health_service = source.health_service();
        let ack_service = source.ack_service();
        let options = SplunkSource::options();

        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(ack_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        Ok(Box::pin(async move {
            let span = Span::current();
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    // Only installed when a maximum connection age is configured; the jitter
                    // factor and peer address are passed through to the layer.
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(services.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Describes the single log output of this source and its schema.
    ///
    /// The schema merges the event and raw endpoints' base definitions (the decoder's schema
    /// when a decoder is configured, the built-in schema otherwise) and then adds the Splunk
    /// source metadata fields.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        // Built-in schema used by an endpoint that has no decoder configured.
        let legacy_base = || match log_namespace {
            LogNamespace::Legacy => {
                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
                    .with_event_field(
                        &owned_value_path!("line"),
                        Kind::object(Collection::empty())
                            .or_array(Collection::empty())
                            .or_undefined(),
                        None,
                    );

                if let Some(message_key) = log_schema().message_key() {
                    definition.with_event_field(
                        message_key,
                        Kind::bytes().or_undefined(),
                        Some(meaning::MESSAGE),
                    )
                } else {
                    definition
                }
            }
            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
                Kind::bytes().or_object(Collection::empty()),
                [log_namespace],
            )
            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
        };

        let endpoint_base = |decoding: &Option<DeserializerConfig>| match decoding {
            Some(decoding) => decoding.schema_definition(log_namespace),
            None => legacy_base(),
        };

        // Mirrors runtime behavior: with a decoder, Splunk fields are only inserted where the
        // decoded event has no value; without one they overwrite.
        let splunk_legacy_key = |path: OwnedValuePath, has_decoder: bool| {
            if has_decoder {
                LegacyKey::InsertIfEmpty(path)
            } else {
                LegacyKey::Overwrite(path)
            }
        };

        let add_common_metadata = |definition: vector_lib::schema::Definition| {
            definition
                .with_standard_vector_source_metadata()
                .with_source_metadata(
                    SplunkConfig::NAME,
                    log_schema()
                        .host_key()
                        .cloned()
                        .map(LegacyKey::InsertIfEmpty),
                    &owned_value_path!("host"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
        };

        let add_channel_metadata = |definition: vector_lib::schema::Definition,
                                    has_decoder: bool| {
            definition.with_source_metadata(
                SplunkConfig::NAME,
                Some(splunk_legacy_key(owned_value_path!(CHANNEL), has_decoder)),
                &owned_value_path!("channel"),
                Kind::bytes(),
                None,
            )
        };

        let event_has_decoder = self.event.decoding.is_some();
        let raw_has_decoder = self.raw.decoding.is_some();

        // Both endpoints feed the same output, so their base schemas are merged.
        let merged_base = add_common_metadata(
            endpoint_base(&self.event.decoding).merge(endpoint_base(&self.raw.decoding)),
        );

        // The channel can come from either endpoint, so it is declared insert-if-empty only
        // when both endpoints use a decoder.
        let channel_has_decoder = event_has_decoder && raw_has_decoder;
        // index/source/sourcetype keys follow the event endpoint's decoder setting.
        let schema_definition = add_channel_metadata(
            merged_base
                .with_source_metadata(
                    SplunkConfig::NAME,
                    Some(splunk_legacy_key(
                        owned_value_path!(INDEX),
                        event_has_decoder,
                    )),
                    &owned_value_path!("index"),
                    Kind::bytes(),
                    None,
                )
                .with_source_metadata(
                    SplunkConfig::NAME,
                    Some(splunk_legacy_key(
                        owned_value_path!(SOURCE),
                        event_has_decoder,
                    )),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    SplunkConfig::NAME,
                    Some(splunk_legacy_key(
                        owned_value_path!(SOURCETYPE),
                        event_has_decoder,
                    )),
                    &owned_value_path!("sourcetype"),
                    Kind::bytes(),
                    None,
                ),
            channel_has_decoder,
        );

        // An endpoint without a decoder always produces logs. With decoders, the output types
        // of the configured deserializers are combined.
        let output_type = match (&self.event.decoding, &self.raw.decoding) {
            (None, None) => DataType::Log,
            (Some(d), None) | (None, Some(d)) => d.output_type() | DataType::Log,
            (Some(de), Some(dr)) => de.output_type() | dr.output_type(),
        };
        vec![SourceOutput::new_maybe_logs(output_type, schema_definition)]
    }

    /// The TCP listening address is the only resource claimed.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}
448
/// Shared state used to build the HTTP endpoints of the `splunk_hec` source.
struct SplunkSource {
    /// Accepted `Authorization` header values, each `Splunk <token>`; empty disables the check.
    valid_credentials: Vec<String>,
    /// Protocol name from the TLS settings, reported in bytes-received telemetry.
    protocol: &'static str,
    /// Indexer acknowledgement tracker; `None` when acknowledgements are disabled.
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    /// Whether the request's HEC token is stored in event metadata.
    store_hec_token: bool,
    /// Effective log namespace for built events.
    log_namespace: LogNamespace,
    /// Handle for events-received telemetry.
    events_received: Registered<EventsReceived>,
    /// Decoder for the event endpoint, if configured.
    event_decoder: Option<Decoder>,
    /// Decoder for the raw endpoint, if configured.
    raw_decoder: Option<Decoder>,
}
460
461impl SplunkSource {
462 fn new(
463 config: &SplunkConfig,
464 protocol: &'static str,
465 event_decoder: Option<Decoder>,
466 raw_decoder: Option<Decoder>,
467 cx: SourceContext,
468 ) -> Self {
469 let log_namespace = cx.log_namespace(config.log_namespace);
470 let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
471 let shutdown = cx.shutdown;
472 let valid_tokens = config
473 .valid_tokens
474 .iter()
475 .flatten()
476 .chain(config.token.iter());
477
478 let idx_ack = acknowledgements.then(|| {
479 Arc::new(IndexerAcknowledgement::new(
480 config.acknowledgements.clone(),
481 shutdown,
482 ))
483 });
484
485 SplunkSource {
486 valid_credentials: valid_tokens
487 .map(|token| format!("Splunk {}", token.inner()))
488 .collect(),
489 protocol,
490 idx_ack,
491 store_hec_token: config.store_hec_token,
492 log_namespace,
493 events_received: register!(EventsReceived),
494 event_decoder,
495 raw_decoder,
496 }
497 }
498
499 fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
500 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
501 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
502 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
503
504 let splunk_channel = splunk_channel_header
505 .and(splunk_channel_query_param)
506 .map(|header: Option<String>, query_param| header.or(query_param));
507
508 let protocol = self.protocol;
509 let idx_ack = self.idx_ack.clone();
510 let store_hec_token = self.store_hec_token;
511 let log_namespace = self.log_namespace;
512 let events_received = self.events_received.clone();
513 let decoder = self.event_decoder.clone();
514
515 warp::post()
516 .and(
517 path!("event")
518 .or(path!("event" / "1.0"))
519 .or(warp::path::end()),
520 )
521 .and(self.authorization())
522 .and(splunk_channel)
523 .and(warp::addr::remote())
524 .and(warp::header::optional::<String>("X-Forwarded-For"))
525 .and(self.gzip())
526 .and(warp::body::bytes())
527 .and(warp::path::full())
528 .and_then(
529 move |_,
530 token: Option<String>,
531 channel: Option<String>,
532 remote: Option<SocketAddr>,
533 remote_addr: Option<String>,
534 gzip: bool,
535 body: Bytes,
536 path: warp::path::FullPath| {
537 let mut out = out.clone();
538 let idx_ack = idx_ack.clone();
539 let events_received = events_received.clone();
540 let decoder = decoder.clone();
541
542 async move {
543 if idx_ack.is_some() && channel.is_none() {
544 return Err(Rejection::from(ApiError::MissingChannel));
545 }
546
547 let mut data = Vec::new();
548 let (byte_size, body) = if gzip {
549 MultiGzDecoder::new(body.reader())
550 .read_to_end(&mut data)
551 .map_err(|_| Rejection::from(ApiError::BadRequest))?;
552 (data.len(), String::from_utf8_lossy(data.as_slice()))
553 } else {
554 (body.len(), String::from_utf8_lossy(body.as_ref()))
555 };
556 emit!(HttpBytesReceived {
557 byte_size,
558 http_path: path.as_str(),
559 protocol,
560 });
561
562 let (batch, mut receiver) =
563 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
564 let decoder_in_use = decoder.is_some();
565
566 let mut maybe_ack_id = None;
571 if !decoder_in_use {
572 maybe_ack_id =
573 register_ack(idx_ack.clone(), receiver.take(), channel.clone())
574 .await?;
575 }
576
577 let mut error = None;
578 let mut events = Vec::new();
579 let mut had_decode_errors = false;
580
581 let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
582 deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
583 channel: channel.clone(),
584 remote,
585 remote_addr,
586 batch,
587 token: token.filter(|_| store_hec_token).map(Into::into),
588 log_namespace,
589 events_received,
590 decoder,
591 }
592 .into();
593
594 for result in iter {
595 match result {
596 Ok((chunk, errored)) => {
597 events.extend(chunk);
598 had_decode_errors |= errored;
599 }
600 Err(err) => {
601 error = Some(err);
602 break;
603 }
604 }
605 }
606
607 if decoder_in_use {
614 maybe_ack_id =
615 if events.is_empty() || had_decode_errors || error.is_some() {
616 drop(receiver);
617 None
618 } else {
619 register_ack(idx_ack, receiver, channel).await?
620 };
621 }
622
623 if !events.is_empty() {
624 match out.send_batch(events).await {
625 Ok(()) => (),
626 Err(SendError::Closed) => {
627 return Err(Rejection::from(ApiError::ServerShutdown));
628 }
629 Err(SendError::Timeout) => {
630 unreachable!("No timeout is configured for this source.")
631 }
632 }
633 }
634
635 if let Some(error) = error {
636 Err(error)
637 } else {
638 Ok(maybe_ack_id)
639 }
640 }
641 },
642 )
643 .map(finish_ok)
644 .boxed()
645 }
646
647 fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
648 let protocol = self.protocol;
649 let idx_ack = self.idx_ack.clone();
650 let store_hec_token = self.store_hec_token;
651 let events_received = self.events_received.clone();
652 let log_namespace = self.log_namespace;
653 let decoder = self.raw_decoder.clone();
654
655 warp::post()
656 .and(path!("raw" / "1.0").or(path!("raw")))
657 .and(self.authorization())
658 .and(SplunkSource::required_channel())
659 .and(warp::addr::remote())
660 .and(warp::header::optional::<String>("X-Forwarded-For"))
661 .and(self.gzip())
662 .and(warp::body::bytes())
663 .and(warp::path::full())
664 .and_then(
665 move |_,
666 token: Option<String>,
667 channel_id: String,
668 remote: Option<SocketAddr>,
669 xff: Option<String>,
670 gzip: bool,
671 body: Bytes,
672 path: warp::path::FullPath| {
673 let mut out = out.clone();
674 let idx_ack = idx_ack.clone();
675 let events_received = events_received.clone();
676 let decoder = decoder.clone();
677 emit!(HttpBytesReceived {
678 byte_size: body.len(),
679 http_path: path.as_str(),
680 protocol,
681 });
682
683 async move {
684 let (batch, receiver) =
685 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
686
687 let Some(decoder) = decoder else {
692 let maybe_ack_id =
693 register_ack(idx_ack, receiver, Some(channel_id.clone())).await?;
694 let (mut events, _) = raw_event(
695 body,
696 gzip,
697 channel_id,
698 remote,
699 xff,
700 batch,
701 log_namespace,
702 &events_received,
703 None,
704 None,
705 )?;
706 let mut event = events.pop().expect(
709 "raw_event always produces a single event when no decoder is set",
710 );
711 if let Some(token) = token.filter(|_| store_hec_token) {
712 event.metadata_mut().set_splunk_hec_token(token.into());
713 }
714 let res = out.send_event(event).await;
715 return res
716 .map(|_| maybe_ack_id)
717 .map_err(|_| Rejection::from(ApiError::ServerShutdown));
718 };
719
720 let token: Option<Arc<str>> =
724 token.filter(|_| store_hec_token).map(Arc::from);
725 let (events, had_decode_errors) = raw_event(
726 body,
727 gzip,
728 channel_id.clone(),
729 remote,
730 xff,
731 batch,
732 log_namespace,
733 &events_received,
734 Some(decoder),
735 token,
736 )?;
737
738 if events.is_empty() || had_decode_errors {
739 drop(receiver);
744 if events.is_empty() {
745 return Ok(None);
746 }
747 let res = out.send_batch(events).await;
750 return res
751 .map(|_| None)
752 .map_err(|_| Rejection::from(ApiError::ServerShutdown));
753 }
754
755 let maybe_ack_id =
756 register_ack(idx_ack, receiver, Some(channel_id)).await?;
757
758 let res = out.send_batch(events).await;
759 res.map(|_| maybe_ack_id)
760 .map_err(|_| Rejection::from(ApiError::ServerShutdown))
761 }
762 },
763 )
764 .map(finish_ok)
765 .boxed()
766 }
767
768 fn health_service(&self) -> BoxedFilter<(Response,)> {
769 warp::get()
776 .and(path!("health" / "1.0").or(path!("health")))
777 .map(move |_| {
778 http::Response::builder()
779 .header(http::header::CONTENT_TYPE, "application/json")
780 .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
781 .expect("static response")
782 })
783 .boxed()
784 }
785
786 fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
787 where
788 T: Send + DeserializeOwned + 'static,
789 {
790 warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
791 .and(warp::body::bytes())
792 .and_then(
793 |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
794 let ok = ctype
795 .as_ref()
796 .and_then(|v| v.to_str().ok())
797 .map(|h| h.to_ascii_lowercase().contains("application/json"))
798 .unwrap_or(true);
799
800 if !ok {
801 return Err(warp::reject::custom(ApiError::UnsupportedContentType));
802 }
803
804 let value = serde_json::from_slice::<T>(&body)
805 .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
806
807 Ok(value)
808 },
809 )
810 }
811
812 fn ack_service(&self) -> BoxedFilter<(Response,)> {
813 let idx_ack = self.idx_ack.clone();
814
815 warp::post()
816 .and(warp::path!("ack"))
817 .and(self.authorization())
818 .and(SplunkSource::required_channel())
819 .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
820 .and_then(move |_, channel: String, req: HecAckStatusRequest| {
821 let idx_ack = idx_ack.clone();
822 async move {
823 if let Some(idx_ack) = idx_ack {
824 let acks = idx_ack
825 .get_acks_status_from_channel(channel, &req.acks)
826 .await?;
827 Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
828 } else {
829 Err(warp::reject::custom(ApiError::AckIsDisabled))
830 }
831 }
832 })
833 .boxed()
834 }
835
836 fn options() -> BoxedFilter<(Response,)> {
837 let post = warp::options()
838 .and(
839 path!("event")
840 .or(path!("event" / "1.0"))
841 .or(path!("raw" / "1.0"))
842 .or(path!("raw")),
843 )
844 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
845
846 let get = warp::options()
847 .and(path!("health").or(path!("health" / "1.0")))
848 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
849
850 post.or(get).unify().boxed()
851 }
852
853 fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
855 let valid_credentials = self.valid_credentials.clone();
856 warp::header::optional("Authorization")
857 .and_then(move |token: Option<String>| {
858 let valid_credentials = valid_credentials.clone();
859 async move {
860 match (token, valid_credentials.is_empty()) {
861 (token, true) => {
864 Ok(token
865 .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
866 }
867 (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
868 token
869 .strip_prefix("Splunk ")
870 .map(Into::into)
871 .unwrap_or(token),
872 )),
873 (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
874 (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
875 }
876 }
877 })
878 .boxed()
879 }
880
881 fn gzip(&self) -> BoxedFilter<(bool,)> {
883 warp::header::optional::<String>("Content-Encoding")
884 .and_then(|encoding: Option<String>| async move {
885 match encoding {
886 Some(s) if s.as_bytes() == b"gzip" => Ok(true),
887 Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
888 None => Ok(false),
889 }
890 })
891 .boxed()
892 }
893
894 fn required_channel() -> BoxedFilter<(String,)> {
895 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
896 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
897 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
898
899 splunk_channel_header
900 .and(splunk_channel_query_param)
901 .and_then(|header: Option<String>, query_param| async move {
902 header
903 .or(query_param)
904 .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
905 })
906 .boxed()
907 }
908}
/// Iterator over the JSON envelopes of an event-endpoint request body.
///
/// Carries per-request state across envelopes (current timestamp, extractor values).
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Stream of JSON values parsed from the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Number of envelopes consumed so far; `envelopes_processed - 1` indexes error reports.
    envelopes_processed: usize,
    /// Request-level channel, used when an envelope carries no `channel` of its own.
    channel: Option<Value>,
    /// Timestamp applied to events; replaced whenever an envelope supplies a valid `time`.
    time: Time,
    /// Extractors for the `host`, `index`, `source` and `sourcetype` envelope fields.
    extractors: [DefaultExtractor; 4],
    /// Batch notifier attached to every event when acknowledgements are in use.
    batch: Option<BatchNotifier>,
    /// HEC token stored in event metadata; only set when `store_hec_token` is enabled.
    token: Option<Arc<str>>,
    /// Namespace events are built in.
    log_namespace: LogNamespace,
    /// Handle for events-received telemetry.
    events_received: Registered<EventsReceived>,
    /// Optional decoder applied to each envelope's `event` field.
    decoder: Option<Decoder>,
}
937
/// Request-level inputs used to construct an [`EventIterator`] via `From`.
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    /// Stream of JSON envelopes parsed from the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Channel taken from the request header or query parameter.
    channel: Option<String>,
    /// Batch notifier for acknowledgements, if enabled.
    batch: Option<BatchNotifier>,
    /// HEC token to store on events, if `store_hec_token` is enabled.
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
    /// Peer address; host fallback when no `X-Forwarded-For` value is present.
    remote: Option<SocketAddr>,
    /// `X-Forwarded-For` header value; preferred host default.
    remote_addr: Option<String>,
    /// Decoder for envelope `event` fields, if configured.
    decoder: Option<Decoder>,
}
950
951impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
952 fn from(f: EventIteratorGenerator<'de, R>) -> Self {
953 let extractor_strategy = if f.decoder.is_some() {
959 LegacyKeyStrategy::InsertIfEmpty
960 } else {
961 LegacyKeyStrategy::Overwrite
962 };
963 Self {
964 deserializer: f.deserializer,
965 envelopes_processed: 0,
966 channel: f.channel.map(Value::from),
967 time: Time::Now(Utc::now()),
968 extractors: [
969 DefaultExtractor::new_with(
974 "host",
975 log_schema().host_key().cloned().into(),
976 f.remote_addr
977 .or_else(|| f.remote.map(|addr| addr.to_string()))
978 .map(Value::from),
979 f.log_namespace,
980 )
981 .with_legacy_key_strategy(extractor_strategy),
982 DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace)
983 .with_legacy_key_strategy(extractor_strategy),
984 DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace)
985 .with_legacy_key_strategy(extractor_strategy),
986 DefaultExtractor::new(
987 "sourcetype",
988 OptionalValuePath::new(SOURCETYPE),
989 f.log_namespace,
990 )
991 .with_legacy_key_strategy(extractor_strategy),
992 ],
993 batch: f.batch,
994 token: f.token,
995 log_namespace: f.log_namespace,
996 events_received: f.events_received,
997 decoder: f.decoder,
998 }
999 }
1000}
1001
1002impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
1003 fn process_time(&mut self, json: &mut JsonValue) -> Result<(), Rejection> {
1006 let parsed_time = match json.get_mut("time").map(JsonValue::take) {
1007 Some(JsonValue::Number(time)) => Some(Some(time)),
1008 Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
1009 _ => None,
1010 };
1011
1012 match parsed_time {
1013 None => Ok(()),
1014 Some(Some(t)) => {
1015 if let Some(t) = t.as_u64() {
1016 let time = parse_timestamp(t as i64).ok_or(ApiError::InvalidDataFormat {
1017 event: self.envelopes_processed.saturating_sub(1),
1018 })?;
1019 self.time = Time::Provided(time);
1020 Ok(())
1021 } else if let Some(t) = t.as_f64() {
1022 self.time = Time::Provided(
1023 Utc.timestamp_opt(
1024 t.floor() as i64,
1025 (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
1026 )
1027 .single()
1028 .expect("invalid timestamp"),
1029 );
1030 Ok(())
1031 } else {
1032 Err(ApiError::InvalidDataFormat {
1033 event: self.envelopes_processed.saturating_sub(1),
1034 }
1035 .into())
1036 }
1037 }
1038 Some(None) => Err(ApiError::InvalidDataFormat {
1039 event: self.envelopes_processed.saturating_sub(1),
1040 }
1041 .into()),
1042 }
1043 }
1044
1045 fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
1046 self.envelopes_processed += 1;
1047 let mut log = match self.log_namespace {
1049 LogNamespace::Vector => self.build_log_vector(&mut json)?,
1050 LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
1051 };
1052
1053 self.log_namespace.insert_vector_metadata(
1055 &mut log,
1056 log_schema().source_type_key(),
1057 &owned_value_path!("source_type"),
1058 SplunkConfig::NAME,
1059 );
1060
1061 let channel_path = owned_value_path!(CHANNEL);
1063 if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
1064 self.log_namespace.insert_source_metadata(
1065 SplunkConfig::NAME,
1066 &mut log,
1067 Some(LegacyKey::Overwrite(&channel_path)),
1068 lookup::path!(CHANNEL),
1069 guid,
1070 );
1071 } else if let Some(guid) = self.channel.as_ref() {
1072 self.log_namespace.insert_source_metadata(
1073 SplunkConfig::NAME,
1074 &mut log,
1075 Some(LegacyKey::Overwrite(&channel_path)),
1076 lookup::path!(CHANNEL),
1077 guid.clone(),
1078 );
1079 }
1080
1081 if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
1083 for (key, value) in object {
1084 self.log_namespace.insert_source_metadata(
1085 SplunkConfig::NAME,
1086 &mut log,
1087 Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
1088 lookup::path!(key.as_str()),
1089 value,
1090 );
1091 }
1092 }
1093
1094 self.process_time(&mut json)?;
1095
1096 let timestamp = match self.time.clone() {
1098 Time::Provided(time) => time,
1099 Time::Now(time) => time,
1100 };
1101
1102 self.log_namespace.insert_source_metadata(
1103 SplunkConfig::NAME,
1104 &mut log,
1105 log_schema().timestamp_key().map(LegacyKey::Overwrite),
1106 lookup::path!("timestamp"),
1107 timestamp,
1108 );
1109
1110 for de in self.extractors.iter_mut() {
1112 de.extract(&mut log, &mut json);
1113 }
1114
1115 if let Some(token) = &self.token {
1117 log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
1118 }
1119
1120 if let Some(batch) = self.batch.clone() {
1121 log = log.with_batch_notifier(&batch);
1122 }
1123
1124 Ok(log.into())
1125 }
1126
1127 fn build_vrl_metadata(&self, json: &JsonValue) -> EventMetadata {
1135 let mut metadata = EventMetadata::default();
1136
1137 if let Some(token) = &self.token {
1139 metadata.set_splunk_hec_token(Arc::clone(token));
1140 }
1141
1142 let fields: &[(&str, &str)] = &[
1145 ("host", "splunk_hec.host"),
1146 ("source", "splunk_hec.source"),
1147 ("sourcetype", "splunk_hec.sourcetype"),
1148 ("index", "splunk_hec.index"),
1149 ];
1150 for (json_key, meta_path) in fields {
1151 let val = json
1152 .get(json_key)
1153 .and_then(|v| v.as_str())
1154 .map(|s| Value::from(s.to_string()))
1155 .or_else(|| {
1156 self.extractors
1157 .iter()
1158 .find(|e| e.field == *json_key)
1159 .and_then(|e| e.value.clone())
1160 });
1161 if let Some(v) = val {
1162 metadata.value_mut().insert(*meta_path, v);
1163 }
1164 }
1165
1166 let channel = json
1168 .get("channel")
1169 .and_then(|v| v.as_str())
1170 .map(|s| Value::from(s.to_string()))
1171 .or_else(|| self.channel.clone());
1172 if let Some(ch) = channel {
1173 metadata.value_mut().insert("splunk_hec.channel", ch);
1174 }
1175
1176 metadata
1177 }
1178
    /// Decodes one envelope's `event` field with the configured decoder.
    ///
    /// A string `event` is decoded from its raw bytes; any other JSON value is re-serialized
    /// to JSON first. Each decoded log then receives the channel, the
    /// `host`/`index`/`source`/`sourcetype` extractor values and the envelope `fields`. In the
    /// Legacy namespace these use `InsertIfEmpty`; in the Vector namespace they use
    /// `try_insert`.
    ///
    /// Returns the decoded events and whether any decode (or re-serialization) error occurred.
    ///
    /// # Errors
    ///
    /// Rejects when the `event` field is missing, null or an empty string, or when `time` is
    /// invalid.
    fn build_events_decoded(
        &mut self,
        mut json: JsonValue,
        decoder: Decoder,
    ) -> Result<(Vec<Event>, bool), Rejection> {
        self.envelopes_processed += 1;
        let event = self.validate_event_field(&json)?;
        let payload = if let Some(s) = event.as_str() {
            // String events are decoded from their contents, without JSON quoting.
            s.as_bytes().to_vec()
        } else {
            // Objects, arrays, numbers, ... are re-encoded as JSON bytes for the decoder.
            match serde_json::to_vec(event) {
                Ok(bytes) => bytes,
                Err(error) => {
                    let error: vector_lib::Error = Box::new(error);
                    emit!(
                        vector_lib::codecs::internal_events::DecoderDeserializeError {
                            error: &error
                        }
                    );
                    emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                        count: 1,
                        reason: "Failed to serialize event field to bytes.",
                    });
                    return Ok((vec![], true));
                }
            }
        };

        self.process_time(&mut json)?;

        // Current (provided or request-time) timestamp, passed to `decode_payload` as the
        // fallback time.
        let fallback_time = match self.time {
            Time::Provided(t) | Time::Now(t) => t,
        };

        // Seed decoded events' metadata with envelope/request values. This runs before
        // `channel`/`fields` are taken out of `json` below.
        let decoder = decoder.with_metadata_template(self.build_vrl_metadata(&json));

        let (decoded, had_decode_errors) = decode_payload(
            decoder,
            &payload,
            Some(fallback_time),
            // NOTE(review): flag consumed by `decode_payload`; its meaning is defined there.
            true,
            DecodePayloadContext {
                batch: &self.batch,
                log_namespace: self.log_namespace,
                events_received: &self.events_received,
                splunk_hec_token: self.token.as_ref(),
            },
        );

        let envelope_channel: Option<Value> = match json.get_mut("channel").map(JsonValue::take) {
            Some(JsonValue::String(guid)) => Some(guid.into()),
            _ => None,
        };
        let envelope_fields: Option<serde_json::Map<String, JsonValue>> =
            match json.get_mut("fields").map(JsonValue::take) {
                Some(JsonValue::Object(object)) => Some(object),
                _ => None,
            };
        let channel_path = owned_value_path!(CHANNEL);

        let mut out = Vec::with_capacity(decoded.len());
        for mut event in decoded {
            if let Event::Log(log) = &mut event {
                // Envelope channel first, then the request-level channel.
                if let Some(channel_val) = envelope_channel.clone().or_else(|| self.channel.clone())
                {
                    match self.log_namespace {
                        LogNamespace::Legacy => {
                            self.log_namespace.insert_source_metadata(
                                SplunkConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(&channel_path)),
                                lookup::path!(CHANNEL),
                                channel_val,
                            );
                        }
                        LogNamespace::Vector => {
                            log.try_insert(
                                metadata_path!(SplunkConfig::NAME, CHANNEL),
                                channel_val,
                            );
                        }
                    }
                }

                // The extractors were built with the insert-if-empty strategy because a decoder
                // is configured (see the `From<EventIteratorGenerator>` impl).
                for de in self.extractors.iter_mut() {
                    de.extract(log, &mut json);
                }

                // Envelope `fields` become source metadata without overwriting decoded values.
                if let Some(ref fields) = envelope_fields {
                    for (key, value) in fields {
                        match self.log_namespace {
                            LogNamespace::Legacy => {
                                self.log_namespace.insert_source_metadata(
                                    SplunkConfig::NAME,
                                    log,
                                    Some(LegacyKey::InsertIfEmpty(&owned_value_path!(
                                        key.as_str()
                                    ))),
                                    lookup::path!(key.as_str()),
                                    value.clone(),
                                );
                            }
                            LogNamespace::Vector => {
                                log.try_insert(
                                    metadata_path!(SplunkConfig::NAME, key.as_str()),
                                    value.clone(),
                                );
                            }
                        }
                    }
                }
            }
            // Non-log events are forwarded without the Splunk metadata above.
            out.push(event);
        }

        Ok((out, had_decode_errors))
    }
1327
1328 fn validate_event_field<'a>(&self, json: &'a JsonValue) -> Result<&'a JsonValue, Rejection> {
1333 let event_idx = self.envelopes_processed.saturating_sub(1);
1334 match json.get("event") {
1335 None | Some(JsonValue::Null) => {
1336 Err(ApiError::MissingEventField { event: event_idx }.into())
1337 }
1338 Some(JsonValue::String(s)) if s.is_empty() => {
1339 Err(ApiError::EmptyEventField { event: event_idx }.into())
1340 }
1341 Some(event) => Ok(event),
1342 }
1343 }
1344
1345 fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
1349 let event: Value = self.validate_event_field(json)?.into();
1350 let mut log = LogEvent::from(event);
1351
1352 self.events_received
1354 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1355
1356 self.log_namespace.insert_vector_metadata(
1358 &mut log,
1359 log_schema().timestamp_key(),
1360 lookup::path!("ingest_timestamp"),
1361 chrono::Utc::now(),
1362 );
1363
1364 Ok(log)
1365 }
1366
1367 fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
1372 self.validate_event_field(json)?;
1374 let mut log = LogEvent::default();
1375 match json["event"].take() {
1376 JsonValue::String(string) => {
1377 log.maybe_insert(log_schema().message_key_target_path(), string);
1378 }
1379 JsonValue::Object(mut object) => {
1380 if object.is_empty() {
1381 return Err(ApiError::EmptyEventField {
1382 event: self.envelopes_processed.saturating_sub(1),
1383 }
1384 .into());
1385 }
1386
1387 if let Some(line) = object.remove("line") {
1389 match line {
1390 JsonValue::Array(_) | JsonValue::Object(_) => {
1392 log.insert(event_path!("line"), line);
1393 }
1394 _ => {
1395 log.maybe_insert(log_schema().message_key_target_path(), line);
1396 }
1397 }
1398 }
1399
1400 for (key, value) in object {
1401 log.insert(event_path!(key.as_str()), value);
1402 }
1403 }
1404 _ => {
1405 return Err(ApiError::InvalidDataFormat {
1406 event: self.envelopes_processed.saturating_sub(1),
1407 }
1408 .into());
1409 }
1410 }
1411
1412 self.events_received
1414 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1415
1416 Ok(log)
1417 }
1418}
1419
1420impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
1421 type Item = Result<(Vec<Event>, bool), Rejection>;
1425
1426 fn next(&mut self) -> Option<Self::Item> {
1427 match self.deserializer.next() {
1428 Some(Ok(json)) => {
1429 let result = if let Some(decoder) = self.decoder.clone() {
1430 self.build_events_decoded(json, decoder)
1431 } else {
1432 self.build_event(json).map(|event| (vec![event], false))
1433 };
1434 Some(result)
1435 }
1436 None => {
1437 if self.envelopes_processed == 0 {
1438 Some(Err(ApiError::NoData.into()))
1439 } else {
1440 None
1441 }
1442 }
1443 Some(Err(error)) => {
1444 emit!(SplunkHecRequestBodyInvalidError {
1445 error: error.into()
1446 });
1447 Some(Err(ApiError::InvalidDataFormat {
1451 event: self.envelopes_processed,
1452 }
1453 .into()))
1454 }
1455 }
1456 }
1457}
1458
1459struct DecodePayloadContext<'a> {
1460 batch: &'a Option<BatchNotifier>,
1461 log_namespace: LogNamespace,
1462 events_received: &'a Registered<EventsReceived>,
1463 splunk_hec_token: Option<&'a Arc<str>>,
1464}
1465
1466fn decode_payload(
1480 mut decoder: Decoder,
1481 payload: &[u8],
1482 fallback_timestamp: Option<DateTime<Utc>>,
1483 set_source_timestamp: bool,
1484 ctx: DecodePayloadContext<'_>,
1485) -> (Vec<Event>, bool) {
1486 let DecodePayloadContext {
1487 batch,
1488 log_namespace,
1489 events_received,
1490 splunk_hec_token,
1491 } = ctx;
1492 let mut buffer = BytesMut::with_capacity(payload.len());
1493 buffer.extend_from_slice(payload);
1494 let now = Utc::now();
1495 let mut events: Vec<Event> = Vec::new();
1496 let mut had_errors = false;
1497
1498 loop {
1499 match decoder.decode_eof(&mut buffer) {
1500 Ok(Some((decoded, _))) => {
1501 for mut event in decoded {
1502 if let Event::Log(log) = &mut event {
1503 log_namespace.insert_vector_metadata(
1504 log,
1505 log_schema().source_type_key(),
1506 lookup::path!("source_type"),
1507 Bytes::from_static(SplunkConfig::NAME.as_bytes()),
1508 );
1509 match log_namespace {
1510 LogNamespace::Vector => {
1511 if set_source_timestamp && let Some(timestamp) = fallback_timestamp
1516 {
1517 log.try_insert(
1518 metadata_path!(SplunkConfig::NAME, "timestamp"),
1519 timestamp,
1520 );
1521 }
1522 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
1523 }
1524 LogNamespace::Legacy => {
1525 if let Some(timestamp) = fallback_timestamp
1526 && let Some(timestamp_key) = log_schema().timestamp_key()
1527 {
1528 log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
1529 }
1530 }
1531 }
1532 }
1533 if let Some(token) = splunk_hec_token {
1534 event.metadata_mut().set_splunk_hec_token(Arc::clone(token));
1535 }
1536 events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
1537 events.push(event.with_batch_notifier_option(batch));
1538 }
1539 }
1540 Ok(None) => break,
1541 Err(error) => {
1542 had_errors = true;
1545 if !error.can_continue() {
1546 break;
1547 }
1548 }
1549 }
1550 }
1551
1552 (events, had_errors)
1553}
1554
1555fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
1566 const SEC_CUTOFF: i64 = 13569465600;
1568 const MILLISEC_CUTOFF: i64 = 253402300800000;
1570
1571 if t < 0 {
1573 return None;
1574 }
1575
1576 let ts = if t < SEC_CUTOFF {
1577 Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
1578 } else if t < MILLISEC_CUTOFF {
1579 Utc.timestamp_millis_opt(t)
1580 .single()
1581 .expect("invalid timestamp")
1582 } else {
1583 Utc.timestamp_nanos(t)
1584 };
1585
1586 Some(ts)
1587}
1588
/// How `DefaultExtractor` writes its value under the legacy (event-level) key.
#[derive(Clone, Copy)]
enum LegacyKeyStrategy {
    /// Write only when nothing is there yet (`LegacyKey::InsertIfEmpty`). In the Vector
    /// namespace, the metadata value is also only filled when absent.
    InsertIfEmpty,
    /// Always replace whatever is there (`LegacyKey::Overwrite`).
    Overwrite,
}
1595
1596struct DefaultExtractor {
1598 field: &'static str,
1599 to_field: OptionalValuePath,
1600 value: Option<Value>,
1601 log_namespace: LogNamespace,
1602 legacy_key_strategy: LegacyKeyStrategy,
1603}
1604
1605impl DefaultExtractor {
1606 const fn new(
1607 field: &'static str,
1608 to_field: OptionalValuePath,
1609 log_namespace: LogNamespace,
1610 ) -> Self {
1611 DefaultExtractor {
1612 field,
1613 to_field,
1614 value: None,
1615 log_namespace,
1616 legacy_key_strategy: LegacyKeyStrategy::Overwrite,
1617 }
1618 }
1619
1620 fn new_with(
1621 field: &'static str,
1622 to_field: OptionalValuePath,
1623 value: impl Into<Option<Value>>,
1624 log_namespace: LogNamespace,
1625 ) -> Self {
1626 DefaultExtractor {
1627 field,
1628 to_field,
1629 value: value.into(),
1630 log_namespace,
1631 legacy_key_strategy: LegacyKeyStrategy::Overwrite,
1632 }
1633 }
1634
1635 const fn with_legacy_key_strategy(mut self, strategy: LegacyKeyStrategy) -> Self {
1639 self.legacy_key_strategy = strategy;
1640 self
1641 }
1642
1643 fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1644 if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1646 self.value = Some(new_value.into());
1647 }
1648
1649 if let Some(index) = self.value.as_ref()
1651 && let Some(metadata_key) = self.to_field.path.as_ref()
1652 {
1653 if matches!(self.log_namespace, LogNamespace::Vector)
1658 && matches!(self.legacy_key_strategy, LegacyKeyStrategy::InsertIfEmpty)
1659 {
1660 log.try_insert(
1661 (
1662 PathPrefix::Metadata,
1663 lookup::path!(SplunkConfig::NAME).concat(metadata_key),
1664 ),
1665 index.clone(),
1666 );
1667 } else {
1668 let legacy_key = match self.legacy_key_strategy {
1669 LegacyKeyStrategy::Overwrite => LegacyKey::Overwrite(metadata_key),
1670 LegacyKeyStrategy::InsertIfEmpty => LegacyKey::InsertIfEmpty(metadata_key),
1671 };
1672 self.log_namespace.insert_source_metadata(
1673 SplunkConfig::NAME,
1674 log,
1675 Some(legacy_key),
1676 &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1677 index.clone(),
1678 );
1679 }
1680 }
1681 }
1682}
1683
1684#[derive(Clone, Debug)]
1686enum Time {
1687 Now(DateTime<Utc>),
1689 Provided(DateTime<Utc>),
1691}
1692
1693#[allow(clippy::too_many_arguments)]
1701fn raw_event(
1702 bytes: Bytes,
1703 gzip: bool,
1704 channel: String,
1705 remote: Option<SocketAddr>,
1706 xff: Option<String>,
1707 batch: Option<BatchNotifier>,
1708 log_namespace: LogNamespace,
1709 events_received: &Registered<EventsReceived>,
1710 decoder: Option<Decoder>,
1711 splunk_hec_token: Option<Arc<str>>,
1712) -> Result<(Vec<Event>, bool), Rejection> {
1713 let body_bytes: Bytes = if gzip {
1715 let mut data = Vec::new();
1716 match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1717 Ok(0) => return Err(ApiError::NoData.into()),
1718 Ok(_) => Bytes::from(data),
1719 Err(error) => {
1720 emit!(SplunkHecRequestBodyInvalidError { error });
1721 return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1722 }
1723 }
1724 } else {
1725 bytes
1726 };
1727
1728 let host = if let Some(remote_address) = xff {
1732 Some(remote_address)
1733 } else {
1734 remote.map(|remote| remote.to_string())
1735 };
1736
1737 let decoder_in_use = decoder.is_some();
1738 let (mut events, had_decode_errors): (Vec<Event>, bool) = if let Some(decoder) = decoder {
1739 let decoder = {
1743 let mut meta = EventMetadata::default();
1744 if let Some(token) = splunk_hec_token.as_ref() {
1745 meta.set_splunk_hec_token(Arc::clone(token));
1746 }
1747 if let Some(ref h) = host {
1748 meta.value_mut().insert("splunk_hec.host", h.clone());
1749 }
1750 meta.value_mut()
1751 .insert("splunk_hec.channel", channel.clone());
1752 decoder.with_metadata_template(meta)
1753 };
1754
1755 decode_payload(
1760 decoder,
1761 &body_bytes,
1762 Some(Utc::now()),
1763 false, DecodePayloadContext {
1765 batch: &batch,
1766 log_namespace,
1767 events_received,
1768 splunk_hec_token: splunk_hec_token.as_ref(),
1769 },
1770 )
1771 } else {
1772 let message: Value = body_bytes.into();
1773 let mut log = match log_namespace {
1774 LogNamespace::Vector => LogEvent::from(message),
1775 LogNamespace::Legacy => {
1776 let mut log = LogEvent::default();
1777 log.maybe_insert(log_schema().message_key_target_path(), message);
1778 log
1779 }
1780 };
1781 events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1783
1784 log_namespace.insert_standard_vector_source_metadata(
1785 &mut log,
1786 SplunkConfig::NAME,
1787 Utc::now(),
1788 );
1789
1790 if let Some(batch) = batch.clone() {
1791 log = log.with_batch_notifier(&batch);
1792 }
1793 (vec![Event::from(log)], false)
1794 };
1795
1796 let channel_path = owned_value_path!(CHANNEL);
1797 for event in &mut events {
1798 if let Event::Log(log) = event {
1799 if decoder_in_use && matches!(log_namespace, LogNamespace::Vector) {
1805 log.try_insert(metadata_path!(SplunkConfig::NAME, CHANNEL), channel.clone());
1806 if let Some(ref h) = host {
1807 log.try_insert(metadata_path!(SplunkConfig::NAME, "host"), h.clone());
1808 }
1809 } else {
1810 let channel_legacy_key = if decoder_in_use {
1811 LegacyKey::InsertIfEmpty(&channel_path)
1812 } else {
1813 LegacyKey::Overwrite(&channel_path)
1814 };
1815 log_namespace.insert_source_metadata(
1816 SplunkConfig::NAME,
1817 log,
1818 Some(channel_legacy_key),
1819 lookup::path!(CHANNEL),
1820 channel.clone(),
1821 );
1822 if let Some(ref host) = host {
1823 log_namespace.insert_source_metadata(
1824 SplunkConfig::NAME,
1825 log,
1826 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1827 lookup::path!("host"),
1828 host.clone(),
1829 );
1830 }
1831 }
1832 }
1833 }
1834
1835 Ok((events, had_decode_errors))
1836}
1837
1838#[derive(Clone, Copy, Debug, Snafu)]
1839pub(crate) enum ApiError {
1840 MissingAuthorization,
1841 InvalidAuthorization,
1842 UnsupportedEncoding,
1843 UnsupportedContentType,
1844 MissingChannel,
1845 NoData,
1846 InvalidDataFormat { event: usize },
1847 ServerShutdown,
1848 EmptyEventField { event: usize },
1849 MissingEventField { event: usize },
1850 BadRequest,
1851 ServiceUnavailable,
1852 AckIsDisabled,
1853}
1854
1855impl warp::reject::Reject for ApiError {}
1856
1857mod splunk_response {
1859 use serde::Serialize;
1860
1861 pub enum HecStatusCode {
1863 Success = 0,
1864 TokenIsRequired = 2,
1865 InvalidAuthorization = 3,
1866 NoData = 5,
1867 InvalidDataFormat = 6,
1868 ServerIsBusy = 9,
1869 DataChannelIsMissing = 10,
1870 EventFieldIsRequired = 12,
1871 EventFieldCannotBeBlank = 13,
1872 AckIsDisabled = 14,
1873 }
1874
1875 #[derive(Serialize)]
1876 pub enum HecResponseMetadata {
1877 #[serde(rename = "ackId")]
1878 AckId(u64),
1879 #[serde(rename = "invalid-event-number")]
1880 InvalidEventNumber(usize),
1881 }
1882
1883 #[derive(Serialize)]
1884 pub struct HecResponse {
1885 text: &'static str,
1886 code: u8,
1887 #[serde(skip_serializing_if = "Option::is_none", flatten)]
1888 pub metadata: Option<HecResponseMetadata>,
1889 }
1890
1891 impl HecResponse {
1892 pub const fn new(code: HecStatusCode) -> Self {
1893 let text = match code {
1894 HecStatusCode::Success => "Success",
1895 HecStatusCode::TokenIsRequired => "Token is required",
1896 HecStatusCode::InvalidAuthorization => "Invalid authorization",
1897 HecStatusCode::NoData => "No data",
1898 HecStatusCode::InvalidDataFormat => "Invalid data format",
1899 HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1900 HecStatusCode::EventFieldIsRequired => "Event field is required",
1901 HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1902 HecStatusCode::ServerIsBusy => "Server is busy",
1903 HecStatusCode::AckIsDisabled => "Ack is disabled",
1904 };
1905
1906 Self {
1907 text,
1908 code: code as u8,
1909 metadata: None,
1910 }
1911 }
1912
1913 pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1914 self.metadata = Some(metadata);
1915 self
1916 }
1917 }
1918
1919 pub const INVALID_AUTHORIZATION: HecResponse =
1920 HecResponse::new(HecStatusCode::InvalidAuthorization);
1921 pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1922 pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1923 pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1924 pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1925 pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1926 pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1927}
1928
1929async fn register_ack(
1930 idx_ack: Option<Arc<IndexerAcknowledgement>>,
1931 receiver: Option<BatchStatusReceiver>,
1932 channel: Option<String>,
1933) -> Result<Option<u64>, Rejection> {
1934 match (idx_ack, receiver, channel) {
1935 (Some(ack), Some(rx), Some(ch)) => Ok(Some(ack.get_ack_id_from_channel(ch, rx).await?)),
1936 _ => Ok(None),
1937 }
1938}
1939
1940fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1941 let body = if let Some(ack_id) = maybe_ack_id {
1942 HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1943 } else {
1944 splunk_response::SUCCESS
1945 };
1946 response_json(StatusCode::OK, body)
1947}
1948
1949fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1950 warp::reply::with_status(
1951 warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1952 code,
1953 )
1954 .into_response()
1955}
1956
1957async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1958 if let Some(&error) = rejection.find::<ApiError>() {
1959 emit!(SplunkHecRequestError { error });
1960 Ok((match error {
1961 ApiError::MissingAuthorization => {
1962 response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1963 }
1964 ApiError::InvalidAuthorization => response_json(
1965 StatusCode::UNAUTHORIZED,
1966 splunk_response::INVALID_AUTHORIZATION,
1967 ),
1968 ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1969 ApiError::UnsupportedContentType => response_plain(
1970 StatusCode::UNSUPPORTED_MEDIA_TYPE,
1971 "The request's content-type is not supported",
1972 ),
1973 ApiError::MissingChannel => {
1974 response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1975 }
1976 ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1977 ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1978 ApiError::InvalidDataFormat { event } => response_json(
1979 StatusCode::BAD_REQUEST,
1980 HecResponse::new(HecStatusCode::InvalidDataFormat)
1981 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1982 ),
1983 ApiError::EmptyEventField { event } => response_json(
1984 StatusCode::BAD_REQUEST,
1985 HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1986 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1987 ),
1988 ApiError::MissingEventField { event } => response_json(
1989 StatusCode::BAD_REQUEST,
1990 HecResponse::new(HecStatusCode::EventFieldIsRequired)
1991 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1992 ),
1993 ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1994 ApiError::ServiceUnavailable => response_json(
1995 StatusCode::SERVICE_UNAVAILABLE,
1996 splunk_response::SERVER_IS_BUSY,
1997 ),
1998 ApiError::AckIsDisabled => {
1999 response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
2000 }
2001 },))
2002 } else {
2003 Err(rejection)
2004 }
2005}
2006
2007fn empty_response(code: StatusCode) -> Response {
2009 let mut res = Response::default();
2010 *res.status_mut() = code;
2011 res
2012}
2013
2014fn response_json(code: StatusCode, body: impl Serialize) -> Response {
2016 warp::reply::with_status(warp::reply::json(&body), code).into_response()
2017}
2018
2019#[cfg(feature = "sinks-splunk_hec")]
2020#[cfg(test)]
2021mod tests {
2022 use std::{net::SocketAddr, num::NonZeroU64};
2023
2024 use chrono::{TimeZone, Utc};
2025 use futures_util::Stream;
2026 use http::Uri;
2027 use reqwest::{RequestBuilder, Response};
2028 use serde::Deserialize;
2029 use vector_lib::{
2030 codecs::{
2031 BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
2032 decoding::{
2033 DeserializerConfig,
2034 format::{VrlDeserializerConfig, VrlDeserializerOptions},
2035 },
2036 },
2037 event::EventStatus,
2038 schema::Definition,
2039 sensitive_string::SensitiveString,
2040 };
2041 use vrl::path::PathPrefix;
2042
2043 use super::*;
2044 use crate::{
2045 SourceSender,
2046 codecs::{DecodingConfig, EncodingConfig},
2047 components::validation::prelude::*,
2048 config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
2049 event::{Event, LogEvent},
2050 sinks::{
2051 Healthcheck, VectorSink,
2052 splunk_hec::logs::config::HecLogsSinkConfig,
2053 util::{BatchConfig, Compression, TowerRequestConfig},
2054 },
2055 sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
2056 test_util::{
2057 addr::{PortGuard, next_addr},
2058 collect_n,
2059 components::{
2060 COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
2061 assert_source_error,
2062 },
2063 wait_for_tcp,
2064 },
2065 };
2066
2067 #[test]
2068 fn generate_config() {
2069 crate::test_util::test_generate_config::<SplunkConfig>();
2070 }
2071
2072 const TOKEN: &str = "token";
2074 const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
2075
2076 async fn source(
2077 acknowledgements: Option<HecAcknowledgementsConfig>,
2078 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
2079 source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
2080 }
2081
2082 async fn source_with(
2083 token: Option<SensitiveString>,
2084 valid_tokens: Option<&[&str]>,
2085 acknowledgements: Option<HecAcknowledgementsConfig>,
2086 store_hec_token: bool,
2087 ) -> (
2088 impl Stream<Item = Event> + Unpin + use<>,
2089 SocketAddr,
2090 PortGuard,
2091 ) {
2092 let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
2093 let (_guard, address) = next_addr();
2094 let valid_tokens =
2095 valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
2096 let cx = SourceContext::new_test(sender, None);
2097 tokio::spawn(async move {
2098 SplunkConfig {
2099 address,
2100 token,
2101 valid_tokens,
2102 tls: None,
2103 acknowledgements: acknowledgements.unwrap_or_default(),
2104 store_hec_token,
2105 log_namespace: None,
2106 keepalive: Default::default(),
2107 event: CodecConfig::default(),
2108 raw: CodecConfig::default(),
2109 }
2110 .build(cx)
2111 .await
2112 .unwrap()
2113 .await
2114 .unwrap()
2115 });
2116 wait_for_tcp(address).await;
2117 (recv, address, _guard)
2118 }
2119
2120 async fn sink(
2121 address: SocketAddr,
2122 encoding: EncodingConfig,
2123 compression: Compression,
2124 ) -> (VectorSink, Healthcheck) {
2125 HecLogsSinkConfig {
2126 default_token: TOKEN.to_owned().into(),
2127 endpoint: format!("http://{address}"),
2128 host_key: None,
2129 indexed_fields: vec![],
2130 index: None,
2131 sourcetype: None,
2132 source: None,
2133 encoding,
2134 compression,
2135 batch: BatchConfig::default(),
2136 request: TowerRequestConfig::default(),
2137 tls: None,
2138 acknowledgements: Default::default(),
2139 timestamp_nanos_key: None,
2140 timestamp_key: None,
2141 auto_extract_timestamp: None,
2142 endpoint_target: Default::default(),
2143 }
2144 .build(SinkContext::default())
2145 .await
2146 .unwrap()
2147 }
2148
2149 async fn start(
2150 encoding: EncodingConfig,
2151 compression: Compression,
2152 acknowledgements: Option<HecAcknowledgementsConfig>,
2153 ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
2154 let (source, address, _guard) = source(acknowledgements).await;
2155 let (sink, health) = sink(address, encoding, compression).await;
2156 assert!(health.await.is_ok());
2157 (sink, source)
2158 }
2159
2160 async fn channel_n(
2161 messages: Vec<impl Into<String> + Send + 'static>,
2162 sink: VectorSink,
2163 source: impl Stream<Item = Event> + Unpin,
2164 ) -> Vec<Event> {
2165 let n = messages.len();
2166
2167 tokio::spawn(async move {
2168 sink.run_events(
2169 messages
2170 .into_iter()
2171 .map(|s| Event::Log(LogEvent::from(s.into()))),
2172 )
2173 .await
2174 .unwrap();
2175 });
2176
2177 let events = collect_n(source, n).await;
2178 assert_eq!(n, events.len());
2179
2180 events
2181 }
2182
2183 #[derive(Clone, Copy, Debug)]
2184 enum Channel<'a> {
2185 Header(&'a str),
2186 QueryParam(&'a str),
2187 }
2188
2189 #[derive(Default)]
2190 struct SendWithOpts<'a> {
2191 channel: Option<Channel<'a>>,
2192 forwarded_for: Option<String>,
2193 }
2194
2195 async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
2196 let channel = Channel::Header("channel");
2197 let options = SendWithOpts {
2198 channel: Some(channel),
2199 forwarded_for: None,
2200 };
2201 send_with(address, api, message, TOKEN, &options).await
2202 }
2203
2204 fn build_request(
2205 address: SocketAddr,
2206 api: &str,
2207 message: &str,
2208 token: &str,
2209 opts: &SendWithOpts<'_>,
2210 ) -> RequestBuilder {
2211 let mut b = reqwest::Client::new()
2212 .post(format!("http://{address}/{api}"))
2213 .header("Authorization", format!("Splunk {token}"));
2214
2215 b = match opts.channel {
2216 Some(c) => match c {
2217 Channel::Header(v) => b.header("x-splunk-request-channel", v),
2218 Channel::QueryParam(v) => b.query(&[("channel", v)]),
2219 },
2220 None => b,
2221 };
2222
2223 b = match &opts.forwarded_for {
2224 Some(f) => b.header("X-Forwarded-For", f),
2225 None => b,
2226 };
2227
2228 b.body(message.to_owned())
2229 }
2230
2231 async fn send_with(
2232 address: SocketAddr,
2233 api: &str,
2234 message: &str,
2235 token: &str,
2236 opts: &SendWithOpts<'_>,
2237 ) -> u16 {
2238 let b = build_request(address, api, message, token, opts);
2239 b.send().await.unwrap().status().as_u16()
2240 }
2241
2242 async fn send_with_response(
2243 address: SocketAddr,
2244 api: &str,
2245 message: &str,
2246 token: &str,
2247 opts: &SendWithOpts<'_>,
2248 ) -> Response {
2249 let b = build_request(address, api, message, token, opts);
2250 b.send().await.unwrap()
2251 }
2252
2253 #[tokio::test]
2254 async fn no_compression_text_event() {
2255 let message = "gzip_text_event";
2256 let (sink, source) = start(
2257 TextSerializerConfig::default().into(),
2258 Compression::None,
2259 None,
2260 )
2261 .await;
2262
2263 let event = channel_n(vec![message], sink, source).await.remove(0);
2264
2265 assert_eq!(
2266 event.as_log()[log_schema().message_key().unwrap().to_string()],
2267 message.into()
2268 );
2269 assert!(event.as_log().get_timestamp().is_some());
2270 assert_eq!(
2271 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2272 "splunk_hec".into()
2273 );
2274 assert!(event.metadata().splunk_hec_token().is_none());
2275 }
2276
2277 #[tokio::test]
2278 async fn one_simple_text_event() {
2279 let message = "one_simple_text_event";
2280 let (sink, source) = start(
2281 TextSerializerConfig::default().into(),
2282 Compression::gzip_default(),
2283 None,
2284 )
2285 .await;
2286
2287 let event = channel_n(vec![message], sink, source).await.remove(0);
2288
2289 assert_eq!(
2290 event.as_log()[log_schema().message_key().unwrap().to_string()],
2291 message.into()
2292 );
2293 assert!(event.as_log().get_timestamp().is_some());
2294 assert_eq!(
2295 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2296 "splunk_hec".into()
2297 );
2298 assert!(event.metadata().splunk_hec_token().is_none());
2299 }
2300
2301 #[tokio::test]
2302 async fn multiple_simple_text_event() {
2303 let n = 200;
2304 let (sink, source) = start(
2305 TextSerializerConfig::default().into(),
2306 Compression::None,
2307 None,
2308 )
2309 .await;
2310
2311 let messages = (0..n)
2312 .map(|i| format!("multiple_simple_text_event_{i}"))
2313 .collect::<Vec<_>>();
2314 let events = channel_n(messages.clone(), sink, source).await;
2315
2316 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
2317 assert_eq!(
2318 event.as_log()[log_schema().message_key().unwrap().to_string()],
2319 msg.into()
2320 );
2321 assert!(event.as_log().get_timestamp().is_some());
2322 assert_eq!(
2323 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2324 "splunk_hec".into()
2325 );
2326 assert!(event.metadata().splunk_hec_token().is_none());
2327 }
2328 }
2329
2330 #[tokio::test]
2331 async fn one_simple_json_event() {
2332 let message = "one_simple_json_event";
2333 let (sink, source) = start(
2334 JsonSerializerConfig::default().into(),
2335 Compression::gzip_default(),
2336 None,
2337 )
2338 .await;
2339
2340 let event = channel_n(vec![message], sink, source).await.remove(0);
2341
2342 assert_eq!(
2343 event.as_log()[log_schema().message_key().unwrap().to_string()],
2344 message.into()
2345 );
2346 assert!(event.as_log().get_timestamp().is_some());
2347 assert_eq!(
2348 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2349 "splunk_hec".into()
2350 );
2351 assert!(event.metadata().splunk_hec_token().is_none());
2352 }
2353
2354 #[tokio::test]
2355 async fn multiple_simple_json_event() {
2356 let n = 200;
2357 let (sink, source) = start(
2358 JsonSerializerConfig::default().into(),
2359 Compression::gzip_default(),
2360 None,
2361 )
2362 .await;
2363
2364 let messages = (0..n)
2365 .map(|i| format!("multiple_simple_json_event{i}"))
2366 .collect::<Vec<_>>();
2367 let events = channel_n(messages.clone(), sink, source).await;
2368
2369 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
2370 assert_eq!(
2371 event.as_log()[log_schema().message_key().unwrap().to_string()],
2372 msg.into()
2373 );
2374 assert!(event.as_log().get_timestamp().is_some());
2375 assert_eq!(
2376 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2377 "splunk_hec".into()
2378 );
2379 assert!(event.metadata().splunk_hec_token().is_none());
2380 }
2381 }
2382
2383 #[tokio::test]
2384 async fn json_event() {
2385 let (sink, source) = start(
2386 JsonSerializerConfig::default().into(),
2387 Compression::gzip_default(),
2388 None,
2389 )
2390 .await;
2391
2392 let mut log = LogEvent::default();
2393 log.insert("greeting", "hello");
2394 log.insert("name", "bob");
2395 sink.run_events(vec![log.into()]).await.unwrap();
2396
2397 let event = collect_n(source, 1).await.remove(0).into_log();
2398 assert_eq!(event["greeting"], "hello".into());
2399 assert_eq!(event["name"], "bob".into());
2400 assert!(event.get_timestamp().is_some());
2401 assert_eq!(
2402 event[log_schema().source_type_key().unwrap().to_string()],
2403 "splunk_hec".into()
2404 );
2405 assert!(event.metadata().splunk_hec_token().is_none());
2406 }
2407
2408 #[tokio::test]
2409 async fn json_invalid_path_event() {
2410 let (sink, source) = start(
2411 JsonSerializerConfig::default().into(),
2412 Compression::gzip_default(),
2413 None,
2414 )
2415 .await;
2416
2417 let mut log = LogEvent::default();
2418 log.insert(event_path!("(greeting | thing"), "hello");
2421 sink.run_events(vec![log.into()]).await.unwrap();
2422
2423 let event = collect_n(source, 1).await.remove(0).into_log();
2424 assert_eq!(
2425 event.get(event_path!("(greeting | thing")),
2426 Some(&Value::from("hello"))
2427 );
2428 }
2429
2430 #[tokio::test]
2431 async fn line_to_message() {
2432 let (sink, source) = start(
2433 JsonSerializerConfig::default().into(),
2434 Compression::gzip_default(),
2435 None,
2436 )
2437 .await;
2438
2439 let mut event = LogEvent::default();
2440 event.insert("line", "hello");
2441 sink.run_events(vec![event.into()]).await.unwrap();
2442
2443 let event = collect_n(source, 1).await.remove(0);
2444 assert_eq!(
2445 event.as_log()[log_schema().message_key().unwrap().to_string()],
2446 "hello".into()
2447 );
2448 assert!(event.metadata().splunk_hec_token().is_none());
2449 }
2450
2451 #[tokio::test]
2452 async fn raw() {
2453 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2454 let message = "raw";
2455 let (source, address, _guard) = source(None).await;
2456
2457 assert_eq!(200, post(address, "services/collector/raw", message).await);
2458
2459 let event = collect_n(source, 1).await.remove(0);
2460 assert_eq!(
2461 event.as_log()[log_schema().message_key().unwrap().to_string()],
2462 message.into()
2463 );
2464 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2465 assert!(event.as_log().get_timestamp().is_some());
2466 assert_eq!(
2467 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2468 "splunk_hec".into()
2469 );
2470 assert!(event.metadata().splunk_hec_token().is_none());
2471 })
2472 .await;
2473 }
2474
2475 #[tokio::test]
2476 async fn root() {
2477 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2478 let message = r#"{ "event": { "message": "root"} }"#;
2479 let (source, address, _guard) = source(None).await;
2480
2481 assert_eq!(200, post(address, "services/collector", message).await);
2482
2483 let event = collect_n(source, 1).await.remove(0);
2484 assert_eq!(
2485 event.as_log()[log_schema().message_key().unwrap().to_string()],
2486 "root".into()
2487 );
2488 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2489 assert!(event.as_log().get_timestamp().is_some());
2490 assert_eq!(
2491 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2492 "splunk_hec".into()
2493 );
2494 assert!(event.metadata().splunk_hec_token().is_none());
2495 })
2496 .await;
2497 }
2498
2499 #[tokio::test]
2500 async fn channel_header() {
2501 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2502 let message = "raw";
2503 let (source, address, _guard) = source(None).await;
2504
2505 let opts = SendWithOpts {
2506 channel: Some(Channel::Header("guid")),
2507 forwarded_for: None,
2508 };
2509
2510 assert_eq!(
2511 200,
2512 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2513 );
2514
2515 let event = collect_n(source, 1).await.remove(0);
2516 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
2517 })
2518 .await;
2519 }
2520
2521 #[tokio::test]
2522 async fn xff_header_raw() {
2523 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2524 let message = "raw";
2525 let (source, address, _guard) = source(None).await;
2526
2527 let opts = SendWithOpts {
2528 channel: Some(Channel::Header("guid")),
2529 forwarded_for: Some(String::from("10.0.0.1")),
2530 };
2531
2532 assert_eq!(
2533 200,
2534 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2535 );
2536
2537 let event = collect_n(source, 1).await.remove(0);
2538 assert_eq!(
2539 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2540 "10.0.0.1".into()
2541 );
2542 })
2543 .await;
2544 }
2545
2546 #[tokio::test]
2548 async fn xff_header_event_with_host_field() {
2549 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2550 let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
2551 let (source, address, _guard) = source(None).await;
2552
2553 let opts = SendWithOpts {
2554 channel: Some(Channel::Header("guid")),
2555 forwarded_for: Some(String::from("10.0.0.1")),
2556 };
2557
2558 assert_eq!(
2559 200,
2560 send_with(address, "services/collector/event", message, TOKEN, &opts).await
2561 );
2562
2563 let event = collect_n(source, 1).await.remove(0);
2564 assert_eq!(
2565 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2566 "10.1.0.2".into()
2567 );
2568 })
2569 .await;
2570 }
2571
2572 #[tokio::test]
2574 async fn xff_header_event_without_host_field() {
2575 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2576 let message = r#"{"event":"first", "color": "blue"}"#;
2577 let (source, address, _guard) = source(None).await;
2578
2579 let opts = SendWithOpts {
2580 channel: Some(Channel::Header("guid")),
2581 forwarded_for: Some(String::from("10.0.0.1")),
2582 };
2583
2584 assert_eq!(
2585 200,
2586 send_with(address, "services/collector/event", message, TOKEN, &opts).await
2587 );
2588
2589 let event = collect_n(source, 1).await.remove(0);
2590 assert_eq!(
2591 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2592 "10.0.0.1".into()
2593 );
2594 })
2595 .await;
2596 }
2597
2598 #[tokio::test]
2599 async fn channel_query_param() {
2600 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2601 let message = "raw";
2602 let (source, address, _guard) = source(None).await;
2603
2604 let opts = SendWithOpts {
2605 channel: Some(Channel::QueryParam("guid")),
2606 forwarded_for: None,
2607 };
2608
2609 assert_eq!(
2610 200,
2611 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2612 );
2613
2614 let event = collect_n(source, 1).await.remove(0);
2615 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
2616 })
2617 .await;
2618 }
2619
2620 #[tokio::test]
2621 async fn no_data() {
2622 let (_source, address, _guard) = source(None).await;
2623
2624 assert_eq!(400, post(address, "services/collector/event", "").await);
2625 }
2626
2627 #[tokio::test]
2628 async fn invalid_token() {
2629 assert_source_error(&COMPONENT_ERROR_TAGS, async {
2630 let (_source, address, _guard) = source(None).await;
2631 let opts = SendWithOpts {
2632 channel: Some(Channel::Header("channel")),
2633 forwarded_for: None,
2634 };
2635
2636 assert_eq!(
2637 401,
2638 send_with(address, "services/collector/event", "", "nope", &opts).await
2639 );
2640 })
2641 .await;
2642 }
2643
2644 #[tokio::test]
2645 async fn health_ignores_token() {
2646 let (_source, address, _guard) = source(None).await;
2647
2648 let res = reqwest::Client::new()
2649 .get(format!("http://{address}/services/collector/health"))
2650 .header("Authorization", format!("Splunk {}", "invalid token"))
2651 .send()
2652 .await
2653 .unwrap();
2654
2655 assert_eq!(200, res.status().as_u16());
2656 }
2657
2658 #[tokio::test]
2659 async fn health() {
2660 let (_source, address, _guard) = source(None).await;
2661
2662 let res = reqwest::Client::new()
2663 .get(format!("http://{address}/services/collector/health"))
2664 .send()
2665 .await
2666 .unwrap();
2667
2668 assert_eq!(200, res.status().as_u16());
2669 }
2670
2671 #[tokio::test]
2672 async fn secondary_token() {
2673 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2674 let message = r#"{"event":"first", "color": "blue"}"#;
2675 let (_source, address, _guard) =
2676 source_with(None, Some(VALID_TOKENS), None, false).await;
2677 let options = SendWithOpts {
2678 channel: None,
2679 forwarded_for: None,
2680 };
2681
2682 assert_eq!(
2683 200,
2684 send_with(
2685 address,
2686 "services/collector/event",
2687 message,
2688 VALID_TOKENS.get(1).unwrap(),
2689 &options
2690 )
2691 .await
2692 );
2693 })
2694 .await;
2695 }
2696
2697 #[tokio::test]
2698 async fn event_service_token_passthrough_enabled() {
2699 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2700 let message = "passthrough_token_enabled";
2701 let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2702 let (sink, health) = sink(
2703 address,
2704 TextSerializerConfig::default().into(),
2705 Compression::gzip_default(),
2706 )
2707 .await;
2708 assert!(health.await.is_ok());
2709
2710 let event = channel_n(vec![message], sink, source).await.remove(0);
2711
2712 assert_eq!(
2713 event.as_log()[log_schema().message_key().unwrap().to_string()],
2714 message.into()
2715 );
2716 assert_eq!(
2717 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2718 TOKEN
2719 );
2720 })
2721 .await;
2722 }
2723
2724 #[tokio::test]
2725 async fn raw_service_token_passthrough_enabled() {
2726 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2727 let message = "raw";
2728 let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2729
2730 assert_eq!(200, post(address, "services/collector/raw", message).await);
2731
2732 let event = collect_n(source, 1).await.remove(0);
2733 assert_eq!(
2734 event.as_log()[log_schema().message_key().unwrap().to_string()],
2735 message.into()
2736 );
2737 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2738 assert!(event.as_log().get_timestamp().is_some());
2739 assert_eq!(
2740 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2741 "splunk_hec".into()
2742 );
2743 assert_eq!(
2744 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2745 TOKEN
2746 );
2747 })
2748 .await;
2749 }
2750
2751 #[tokio::test]
2752 async fn no_authorization() {
2753 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2754 let message = "no_authorization";
2755 let (source, address, _guard) = source_with(None, None, None, false).await;
2756 let (sink, health) = sink(
2757 address,
2758 TextSerializerConfig::default().into(),
2759 Compression::gzip_default(),
2760 )
2761 .await;
2762 assert!(health.await.is_ok());
2763
2764 let event = channel_n(vec![message], sink, source).await.remove(0);
2765
2766 assert_eq!(
2767 event.as_log()[log_schema().message_key().unwrap().to_string()],
2768 message.into()
2769 );
2770 assert!(event.metadata().splunk_hec_token().is_none());
2771 })
2772 .await;
2773 }
2774
2775 #[tokio::test]
2776 async fn no_authorization_token_passthrough_enabled() {
2777 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2778 let message = "no_authorization";
2779 let (source, address, _guard) = source_with(None, None, None, true).await;
2780 let (sink, health) = sink(
2781 address,
2782 TextSerializerConfig::default().into(),
2783 Compression::gzip_default(),
2784 )
2785 .await;
2786 assert!(health.await.is_ok());
2787
2788 let event = channel_n(vec![message], sink, source).await.remove(0);
2789
2790 assert_eq!(
2791 event.as_log()[log_schema().message_key().unwrap().to_string()],
2792 message.into()
2793 );
2794 assert_eq!(
2795 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2796 TOKEN
2797 );
2798 })
2799 .await;
2800 }
2801
2802 #[tokio::test]
2803 async fn partial() {
2804 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2805 let message = r#"{"event":"first"}{"event":"second""#;
2806 let (source, address, _guard) = source(None).await;
2807
2808 assert_eq!(
2809 400,
2810 post(address, "services/collector/event", message).await
2811 );
2812
2813 let event = collect_n(source, 1).await.remove(0);
2814 assert_eq!(
2815 event.as_log()[log_schema().message_key().unwrap().to_string()],
2816 "first".into()
2817 );
2818 assert!(event.as_log().get_timestamp().is_some());
2819 assert_eq!(
2820 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2821 "splunk_hec".into()
2822 );
2823 })
2824 .await;
2825 }
2826
2827 #[tokio::test]
2828 async fn handles_newlines() {
2829 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2830 let message = r#"
2831{"event":"first"}
2832 "#;
2833 let (source, address, _guard) = source(None).await;
2834
2835 assert_eq!(
2836 200,
2837 post(address, "services/collector/event", message).await
2838 );
2839
2840 let event = collect_n(source, 1).await.remove(0);
2841 assert_eq!(
2842 event.as_log()[log_schema().message_key().unwrap().to_string()],
2843 "first".into()
2844 );
2845 assert!(event.as_log().get_timestamp().is_some());
2846 assert_eq!(
2847 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2848 "splunk_hec".into()
2849 );
2850 })
2851 .await;
2852 }
2853
2854 #[tokio::test]
2855 async fn handles_spaces() {
2856 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2857 let message = r#" {"event":"first"} "#;
2858 let (source, address, _guard) = source(None).await;
2859
2860 assert_eq!(
2861 200,
2862 post(address, "services/collector/event", message).await
2863 );
2864
2865 let event = collect_n(source, 1).await.remove(0);
2866 assert_eq!(
2867 event.as_log()[log_schema().message_key().unwrap().to_string()],
2868 "first".into()
2869 );
2870 assert!(event.as_log().get_timestamp().is_some());
2871 assert_eq!(
2872 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2873 "splunk_hec".into()
2874 );
2875 })
2876 .await;
2877 }
2878
2879 #[tokio::test]
2880 async fn handles_non_utf8() {
2881 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2882 let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2883 let (source, address, _guard) = source(None).await;
2884
2885 let b = reqwest::Client::new()
2886 .post(format!(
2887 "http://{}/{}",
2888 address, "services/collector/event"
2889 ))
2890 .header("Authorization", format!("Splunk {TOKEN}"))
2891 .body::<&[u8]>(message);
2892
2893 assert_eq!(200, b.send().await.unwrap().status().as_u16());
2894
2895 let event = collect_n(source, 1).await.remove(0);
2896 assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2897 assert_eq!(event.as_log()["number"], 2.into());
2898 assert_eq!(event.as_log()["bool"], true.into());
2899 assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2900 assert_eq!(
2901 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2902 "splunk_hec".into()
2903 );
2904 }).await;
2905 }
2906
2907 #[tokio::test]
2908 async fn default() {
2909 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2910 let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2911 let (source, address, _guard) = source(None).await;
2912
2913 assert_eq!(
2914 200,
2915 post(address, "services/collector/event", message).await
2916 );
2917
2918 let events = collect_n(source, 3).await;
2919
2920 assert_eq!(
2921 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2922 "first".into()
2923 );
2924 assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2925
2926 assert_eq!(
2927 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2928 "second".into()
2929 );
2930 assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2931
2932 assert_eq!(
2933 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2934 "third".into()
2935 );
2936 assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2937 }).await;
2938 }
2939
/// `parse_timestamp` accepts epoch values in seconds, milliseconds or nanoseconds and
/// round-trips each at that precision. A negative value yields `None`.
#[test]
fn parse_timestamps() {
    // A fixed-size array is enough here; no heap `Vec` is needed (clippy::useless_vec).
    let cases = [
        Utc::now(),
        Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
            .single()
            .expect("invalid timestamp"),
        Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
            .single()
            .expect("invalid timestamp"),
        Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
            .single()
            .expect("invalid timestamp"),
    ];

    for case in cases {
        // Compute each representation once and use it as both the input and the expected value.
        let sec = case.timestamp();
        let millis = case.timestamp_millis();
        let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");

        assert_eq!(parse_timestamp(sec).unwrap().timestamp(), sec);
        assert_eq!(parse_timestamp(millis).unwrap().timestamp_millis(), millis);
        assert_eq!(
            parse_timestamp(nano)
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap(),
            nano
        );
    }

    assert!(parse_timestamp(-1).is_none());
}
2976
2977 #[tokio::test]
2984 async fn host_test() {
2985 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2986 let message = "for the host";
2987 let (sink, source) = start(
2988 TextSerializerConfig::default().into(),
2989 Compression::gzip_default(),
2990 None,
2991 )
2992 .await;
2993
2994 let event = channel_n(vec![message], sink, source).await.remove(0);
2995
2996 assert_eq!(
2997 event.as_log()[log_schema().message_key().unwrap().to_string()],
2998 message.into()
2999 );
3000 assert!(
3001 event
3002 .as_log()
3003 .get((PathPrefix::Event, log_schema().host_key().unwrap()))
3004 .is_none()
3005 );
3006 })
3007 .await;
3008 }
3009
3010 #[derive(Deserialize)]
3011 struct HecAckEventResponse {
3012 text: String,
3013 code: u8,
3014 #[serde(rename = "ackId")]
3015 ack_id: u64,
3016 }
3017
3018 #[tokio::test]
3019 async fn ack_json_event() {
3020 let ack_config = HecAcknowledgementsConfig {
3021 enabled: Some(true),
3022 ..Default::default()
3023 };
3024 let (source, address, _guard) = source(Some(ack_config)).await;
3025 let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
3026 let opts = SendWithOpts {
3027 channel: Some(Channel::Header("guid")),
3028 forwarded_for: None,
3029 };
3030 let event_res = send_with_response(
3031 address,
3032 "services/collector/event",
3033 event_message,
3034 TOKEN,
3035 &opts,
3036 )
3037 .await
3038 .json::<HecAckEventResponse>()
3039 .await
3040 .unwrap();
3041 assert_eq!("Success", event_res.text.as_str());
3042 assert_eq!(0, event_res.code);
3043 _ = collect_n(source, 1).await;
3044
3045 let ack_message = serde_json::to_string(&HecAckStatusRequest {
3046 acks: vec![event_res.ack_id],
3047 })
3048 .unwrap();
3049 let ack_res = send_with_response(
3050 address,
3051 "services/collector/ack",
3052 ack_message.as_str(),
3053 TOKEN,
3054 &opts,
3055 )
3056 .await
3057 .json::<HecAckStatusResponse>()
3058 .await
3059 .unwrap();
3060 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3061 }
3062
3063 #[tokio::test]
3064 async fn ack_raw_event() {
3065 let ack_config = HecAcknowledgementsConfig {
3066 enabled: Some(true),
3067 ..Default::default()
3068 };
3069 let (source, address, _guard) = source(Some(ack_config)).await;
3070 let event_message = "raw event message";
3071 let opts = SendWithOpts {
3072 channel: Some(Channel::Header("guid")),
3073 forwarded_for: None,
3074 };
3075 let event_res = send_with_response(
3076 address,
3077 "services/collector/raw",
3078 event_message,
3079 TOKEN,
3080 &opts,
3081 )
3082 .await
3083 .json::<HecAckEventResponse>()
3084 .await
3085 .unwrap();
3086 assert_eq!("Success", event_res.text.as_str());
3087 assert_eq!(0, event_res.code);
3088 _ = collect_n(source, 1).await;
3089
3090 let ack_message = serde_json::to_string(&HecAckStatusRequest {
3091 acks: vec![event_res.ack_id],
3092 })
3093 .unwrap();
3094 let ack_res = send_with_response(
3095 address,
3096 "services/collector/ack",
3097 ack_message.as_str(),
3098 TOKEN,
3099 &opts,
3100 )
3101 .await
3102 .json::<HecAckStatusResponse>()
3103 .await
3104 .unwrap();
3105 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3106 }
3107
3108 #[tokio::test]
3109 async fn ack_repeat_ack_query() {
3110 let ack_config = HecAcknowledgementsConfig {
3111 enabled: Some(true),
3112 ..Default::default()
3113 };
3114 let (source, address, _guard) = source(Some(ack_config)).await;
3115 let event_message = "raw event message";
3116 let opts = SendWithOpts {
3117 channel: Some(Channel::Header("guid")),
3118 forwarded_for: None,
3119 };
3120 let event_res = send_with_response(
3121 address,
3122 "services/collector/raw",
3123 event_message,
3124 TOKEN,
3125 &opts,
3126 )
3127 .await
3128 .json::<HecAckEventResponse>()
3129 .await
3130 .unwrap();
3131 _ = collect_n(source, 1).await;
3132
3133 let ack_message = serde_json::to_string(&HecAckStatusRequest {
3134 acks: vec![event_res.ack_id],
3135 })
3136 .unwrap();
3137 let ack_res = send_with_response(
3138 address,
3139 "services/collector/ack",
3140 ack_message.as_str(),
3141 TOKEN,
3142 &opts,
3143 )
3144 .await
3145 .json::<HecAckStatusResponse>()
3146 .await
3147 .unwrap();
3148 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3149
3150 let ack_res = send_with_response(
3151 address,
3152 "services/collector/ack",
3153 ack_message.as_str(),
3154 TOKEN,
3155 &opts,
3156 )
3157 .await
3158 .json::<HecAckStatusResponse>()
3159 .await
3160 .unwrap();
3161 assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
3162 }
3163
3164 #[tokio::test]
3165 async fn ack_exceed_max_number_of_ack_channels() {
3166 let ack_config = HecAcknowledgementsConfig {
3167 enabled: Some(true),
3168 max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
3169 ..Default::default()
3170 };
3171
3172 let (_source, address, _guard) = source(Some(ack_config)).await;
3173 let mut opts = SendWithOpts {
3174 channel: Some(Channel::Header("guid")),
3175 forwarded_for: None,
3176 };
3177 assert_eq!(
3178 200,
3179 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
3180 );
3181
3182 opts.channel = Some(Channel::Header("other-guid"));
3183 assert_eq!(
3184 503,
3185 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
3186 );
3187 assert_eq!(
3188 503,
3189 send_with(
3190 address,
3191 "services/collector/event",
3192 r#"{"event":"first"}"#,
3193 TOKEN,
3194 &opts
3195 )
3196 .await
3197 );
3198 }
3199
3200 #[tokio::test]
3201 async fn ack_exceed_max_pending_acks_per_channel() {
3202 let ack_config = HecAcknowledgementsConfig {
3203 enabled: Some(true),
3204 max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
3205 ..Default::default()
3206 };
3207
3208 let (source, address, _guard) = source(Some(ack_config)).await;
3209 let opts = SendWithOpts {
3210 channel: Some(Channel::Header("guid")),
3211 forwarded_for: None,
3212 };
3213 for _ in 0..5 {
3214 send_with(
3215 address,
3216 "services/collector/event",
3217 r#"{"event":"first"}"#,
3218 TOKEN,
3219 &opts,
3220 )
3221 .await;
3222 }
3223 for _ in 0..5 {
3224 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
3225 }
3226 let event_res = send_with_response(
3227 address,
3228 "services/collector/event",
3229 r#"{"event":"this will be acked"}"#,
3230 TOKEN,
3231 &opts,
3232 )
3233 .await
3234 .json::<HecAckEventResponse>()
3235 .await
3236 .unwrap();
3237 _ = collect_n(source, 11).await;
3238
3239 let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
3240 acks: (0..10).collect::<Vec<u64>>(),
3241 })
3242 .unwrap();
3243 let ack_res = send_with_response(
3244 address,
3245 "services/collector/ack",
3246 ack_message_dropped.as_str(),
3247 TOKEN,
3248 &opts,
3249 )
3250 .await
3251 .json::<HecAckStatusResponse>()
3252 .await
3253 .unwrap();
3254 assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
3255
3256 let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
3257 acks: vec![event_res.ack_id],
3258 })
3259 .unwrap();
3260 let ack_res = send_with_response(
3261 address,
3262 "services/collector/ack",
3263 ack_message_acked.as_str(),
3264 TOKEN,
3265 &opts,
3266 )
3267 .await
3268 .json::<HecAckStatusResponse>()
3269 .await
3270 .unwrap();
3271 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3272 }
3273
3274 #[tokio::test]
3275 async fn ack_service_accepts_parameterized_content_type() {
3276 let ack_config = HecAcknowledgementsConfig {
3277 enabled: Some(true),
3278 ..Default::default()
3279 };
3280 let (source, address, _guard) = source(Some(ack_config)).await;
3281 let opts = SendWithOpts {
3282 channel: Some(Channel::Header("guid")),
3283 forwarded_for: None,
3284 };
3285
3286 let event_res = send_with_response(
3287 address,
3288 "services/collector/event",
3289 r#"{"event":"param-test"}"#,
3290 TOKEN,
3291 &opts,
3292 )
3293 .await
3294 .json::<HecAckEventResponse>()
3295 .await
3296 .unwrap();
3297 let _ = collect_n(source, 1).await;
3298
3299 let body = serde_json::to_string(&HecAckStatusRequest {
3300 acks: vec![event_res.ack_id],
3301 })
3302 .unwrap();
3303
3304 let res = reqwest::Client::new()
3305 .post(format!("http://{address}/services/collector/ack"))
3306 .header("Authorization", format!("Splunk {TOKEN}"))
3307 .header("x-splunk-request-channel", "guid")
3308 .header("Content-Type", "application/json; some-random-text; hello")
3309 .body(body)
3310 .send()
3311 .await
3312 .unwrap();
3313
3314 assert_eq!(200, res.status().as_u16());
3315
3316 let _parsed: HecAckStatusResponse = res.json().await.unwrap();
3317 }
3318
3319 #[tokio::test]
3320 async fn event_service_acknowledgements_enabled_channel_required() {
3321 let message = r#"{"event":"first", "color": "blue"}"#;
3322 let ack_config = HecAcknowledgementsConfig {
3323 enabled: Some(true),
3324 ..Default::default()
3325 };
3326 let (_, address, _guard) = source(Some(ack_config)).await;
3327
3328 let opts = SendWithOpts {
3329 channel: None,
3330 forwarded_for: None,
3331 };
3332
3333 assert_eq!(
3334 400,
3335 send_with(address, "services/collector/event", message, TOKEN, &opts).await
3336 );
3337 }
3338
3339 #[tokio::test]
3340 async fn ack_service_acknowledgements_disabled() {
3341 let message = r#" {"acks":[0]} "#;
3342 let (_, address, _guard) = source(None).await;
3343
3344 let opts = SendWithOpts {
3345 channel: Some(Channel::Header("guid")),
3346 forwarded_for: None,
3347 };
3348
3349 assert_eq!(
3350 400,
3351 send_with(address, "services/collector/ack", message, TOKEN, &opts).await
3352 );
3353 }
3354
3355 async fn source_with_codec(
3356 event: CodecConfig,
3357 raw: CodecConfig,
3358 ) -> (
3359 impl Stream<Item = Event> + Unpin + use<>,
3360 SocketAddr,
3361 PortGuard,
3362 ) {
3363 let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3364 let (_guard, address) = next_addr();
3365 let cx = SourceContext::new_test(sender, None);
3366 tokio::spawn(async move {
3367 SplunkConfig {
3368 address,
3369 token: Some(TOKEN.to_owned().into()),
3370 valid_tokens: None,
3371 tls: None,
3372 acknowledgements: Default::default(),
3373 store_hec_token: false,
3374 log_namespace: None,
3375 keepalive: Default::default(),
3376 event,
3377 raw,
3378 }
3379 .build(cx)
3380 .await
3381 .unwrap()
3382 .await
3383 .unwrap()
3384 });
3385 wait_for_tcp(address).await;
3386 (recv, address, _guard)
3387 }
3388
3389 fn codec_decoding(decoding: DeserializerConfig) -> CodecConfig {
3391 CodecConfig {
3392 framing: None,
3393 decoding: Some(decoding),
3394 }
3395 }
3396
3397 fn codec_full(
3399 framing: Option<FramingConfig>,
3400 decoding: Option<DeserializerConfig>,
3401 ) -> CodecConfig {
3402 CodecConfig { framing, decoding }
3403 }
3404
3405 #[tokio::test]
3406 async fn decoder_event_endpoint_json_string() {
3407 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3408 let (source, address, _guard) = source_with_codec(
3409 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3410 CodecConfig::default(),
3411 )
3412 .await;
3413 let envelope =
3414 r#"{"event":"{\"foo\":\"bar\",\"n\":42}","host":"client-host","sourcetype":"my-app"}"#;
3415 assert_eq!(
3416 200,
3417 post(address, "services/collector/event", envelope).await
3418 );
3419
3420 let event = collect_n(source, 1).await.remove(0);
3421 let log = event.as_log();
3422 assert_eq!(log["foo"], "bar".into());
3423 assert_eq!(log["n"], 42.into());
3424 assert_eq!(
3425 log[log_schema().host_key().unwrap().to_string().as_str()],
3426 "client-host".into()
3427 );
3428 assert_eq!(log[&super::SOURCETYPE], "my-app".into());
3429 })
3430 .await;
3431 }
3432
3433 #[tokio::test]
3434 async fn decoder_event_endpoint_json_object_round_trip() {
3435 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3436 let (source, address, _guard) = source_with_codec(
3437 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3438 CodecConfig::default(),
3439 )
3440 .await;
3441 let envelope = r#"{"event":{"foo":"bar","nested":{"k":1}},"host":"h"}"#;
3442 assert_eq!(
3443 200,
3444 post(address, "services/collector/event", envelope).await
3445 );
3446
3447 let event = collect_n(source, 1).await.remove(0);
3448 let log = event.as_log();
3449 assert_eq!(log["foo"], "bar".into());
3450 assert_eq!(*log.get("nested.k").unwrap(), 1.into());
3451 assert_eq!(
3452 log[log_schema().host_key().unwrap().to_string().as_str()],
3453 "h".into()
3454 );
3455 })
3456 .await;
3457 }
3458
3459 #[tokio::test]
3460 async fn decoder_event_endpoint_all_envelope_fields_yield_to_decoder() {
3461 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3466 let (source, address, _guard) = source_with_codec(
3467 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3468 CodecConfig::default(),
3469 )
3470 .await;
3471 let envelope = r#"{
3475 "event":"{\"splunk_channel\":\"decoder-channel\",\"splunk_index\":\"decoder-index\",\"splunk_source\":\"decoder-source\",\"splunk_sourcetype\":\"decoder-sourcetype\"}",
3476 "index":"envelope-index",
3477 "source":"envelope-source",
3478 "sourcetype":"envelope-sourcetype"
3479 }"#;
3480 assert_eq!(
3481 200,
3482 post(address, "services/collector/event", envelope).await
3483 );
3484
3485 let event = collect_n(source, 1).await.remove(0);
3486 let log = event.as_log();
3487 assert_eq!(log[&super::CHANNEL], "decoder-channel".into());
3488 assert_eq!(log[&super::INDEX], "decoder-index".into());
3489 assert_eq!(log[&super::SOURCE], "decoder-source".into());
3490 assert_eq!(log[&super::SOURCETYPE], "decoder-sourcetype".into());
3491 })
3492 .await;
3493 }
3494
3495 #[tokio::test]
3496 async fn decoder_event_endpoint_decoder_field_wins_over_envelope() {
3497 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3498 let (source, address, _guard) = source_with_codec(
3499 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3500 CodecConfig::default(),
3501 )
3502 .await;
3503 let envelope = r#"{"event":"{\"host\":\"decoder-host\"}","host":"envelope-host"}"#;
3506 assert_eq!(
3507 200,
3508 post(address, "services/collector/event", envelope).await
3509 );
3510
3511 let event = collect_n(source, 1).await.remove(0);
3512 let log = event.as_log();
3513 assert_eq!(
3514 log[log_schema().host_key().unwrap().to_string().as_str()],
3515 "decoder-host".into()
3516 );
3517 })
3518 .await;
3519 }
3520
3521 #[tokio::test]
3522 async fn decoder_event_endpoint_decode_failure_returns_200() {
3523 let (_source, address, _guard) = source_with_codec(
3527 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3528 CodecConfig::default(),
3529 )
3530 .await;
3531 let envelope = r#"{"event":"not valid json {","host":"h"}"#;
3532 assert_eq!(
3533 200,
3534 post(address, "services/collector/event", envelope).await
3535 );
3536 }
3537
3538 #[tokio::test]
3539 async fn decoder_raw_endpoint_newline_delimited() {
3540 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3541 let (source, address, _guard) = source_with_codec(
3542 CodecConfig::default(),
3543 codec_full(
3544 Some(FramingConfig::NewlineDelimited(Default::default())),
3545 Some(DeserializerConfig::Bytes),
3546 ),
3547 )
3548 .await;
3549 let body = "line1\nline2\nline3";
3550 assert_eq!(200, post(address, "services/collector/raw", body).await);
3551
3552 let events = collect_n(source, 3).await;
3553 assert_eq!(events.len(), 3);
3554 let messages: Vec<String> = events
3555 .iter()
3556 .map(|e| {
3557 e.as_log()[log_schema().message_key().unwrap().to_string()]
3558 .to_string_lossy()
3559 .into_owned()
3560 })
3561 .collect();
3562 assert!(messages.contains(&"line1".to_string()));
3563 assert!(messages.contains(&"line2".to_string()));
3564 assert!(messages.contains(&"line3".to_string()));
3565
3566 for event in &events {
3568 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
3569 }
3570 })
3571 .await;
3572 }
3573
3574 #[tokio::test]
3575 async fn decoder_event_endpoint_envelope_without_time_has_fallback_timestamp() {
3576 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3579 let (source, address, _guard) = source_with_codec(
3580 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3581 CodecConfig::default(),
3582 )
3583 .await;
3584 let envelope = r#"{"event":"{\"foo\":\"bar\"}"}"#;
3585 assert_eq!(
3586 200,
3587 post(address, "services/collector/event", envelope).await
3588 );
3589
3590 let event = collect_n(source, 1).await.remove(0);
3591 assert!(
3592 event.as_log().get_timestamp().is_some(),
3593 "decoded event from envelope without `time` field is missing a timestamp"
3594 );
3595 })
3596 .await;
3597 }
3598
3599 #[tokio::test]
3600 async fn decoder_independent_per_endpoint_codecs() {
3601 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3606 let (source, address, _guard) = source_with_codec(
3607 codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3608 codec_full(
3609 Some(FramingConfig::NewlineDelimited(Default::default())),
3610 Some(DeserializerConfig::Bytes),
3611 ),
3612 )
3613 .await;
3614
3615 assert_eq!(
3617 200,
3618 post(
3619 address,
3620 "services/collector/event",
3621 r#"{"event":"{\"foo\":\"bar\"}"}"#
3622 )
3623 .await
3624 );
3625 assert_eq!(
3627 200,
3628 post(address, "services/collector/raw", "a\nb\nc").await
3629 );
3630
3631 let events = collect_n(source, 4).await;
3632 assert_eq!(events.len(), 4);
3633
3634 let event_log = events
3636 .iter()
3637 .find(|e| e.as_log().contains("foo"))
3638 .expect("expected /event request to produce a log with `foo` set");
3639 assert_eq!(event_log.as_log()["foo"], "bar".into());
3640
3641 let raw_messages: Vec<String> = events
3643 .iter()
3644 .filter(|e| !e.as_log().contains("foo"))
3645 .map(|e| {
3646 e.as_log()[log_schema().message_key().unwrap().to_string()]
3647 .to_string_lossy()
3648 .into_owned()
3649 })
3650 .collect();
3651 assert_eq!(raw_messages.len(), 3);
3652 assert!(raw_messages.contains(&"a".to_string()));
3653 assert!(raw_messages.contains(&"b".to_string()));
3654 assert!(raw_messages.contains(&"c".to_string()));
3655 })
3656 .await;
3657 }
3658
3659 #[tokio::test]
3665 async fn decoder_vrl_reads_envelope_metadata() {
3666 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3667 let vrl_source = r#"
3668 # Read envelope metadata injected before this VRL program runs.
3669 .envelope_host = string!(%splunk_hec.host)
3670 .envelope_sourcetype = string!(%splunk_hec.sourcetype)
3671
3672 # Decode the inner JSON payload (the bytes of the `event` string).
3673 . = merge!(parse_json!(string!(.message)), .)
3674 "#;
3675
3676 let event_codec = codec_decoding(
3677 DeserializerConfig::Vrl(VrlDeserializerConfig {
3678 vrl: VrlDeserializerOptions {
3679 source: vrl_source.into(),
3680 timezone: None,
3681 },
3682 }),
3683 );
3684
3685 let (source, address, _guard) =
3686 source_with_codec(event_codec, CodecConfig::default()).await;
3687
3688 let payload = r#"{"event":"{\"level\":\"info\",\"msg\":\"hello\"}","host":"splunk-host","sourcetype":"my-app"}"#;
3691 assert_eq!(
3692 200,
3693 post(address, "services/collector/event", payload).await
3694 );
3695
3696 let event = collect_n(source, 1).await.remove(0);
3697 let log = event.as_log();
3698
3699 assert_eq!(log["level"], "info".into());
3701 assert_eq!(log["msg"], "hello".into());
3702
3703 assert_eq!(log["envelope_host"], "splunk-host".into());
3705 assert_eq!(log["envelope_sourcetype"], "my-app".into());
3706
3707 assert_eq!(
3709 log[log_schema().host_key().unwrap().to_string().as_str()],
3710 "splunk-host".into()
3711 );
3712 assert_eq!(log[&super::SOURCETYPE], "my-app".into());
3713 })
3714 .await;
3715 }
3716
3717 #[tokio::test]
3718 async fn decoder_raw_endpoint_event_has_fallback_timestamp() {
3719 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3722 let (source, address, _guard) = source_with_codec(
3723 CodecConfig::default(),
3724 codec_full(None, Some(DeserializerConfig::Bytes)),
3725 )
3726 .await;
3727 let body = "hello";
3728 assert_eq!(200, post(address, "services/collector/raw", body).await);
3729
3730 let event = collect_n(source, 1).await.remove(0);
3731 assert!(
3732 event.as_log().get_timestamp().is_some(),
3733 "decoded /raw event is missing a timestamp"
3734 );
3735 })
3736 .await;
3737 }
3738
3739 #[tokio::test]
3740 async fn decoder_raw_endpoint_empty_decode_does_not_ack() {
3741 let ack_config = HecAcknowledgementsConfig {
3746 enabled: Some(true),
3747 ..Default::default()
3748 };
3749 let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3750 let (_guard, address) = next_addr();
3751 let cx = SourceContext::new_test(sender, None);
3752 tokio::spawn(async move {
3753 SplunkConfig {
3754 address,
3755 token: Some(TOKEN.to_owned().into()),
3756 valid_tokens: None,
3757 tls: None,
3758 acknowledgements: ack_config,
3759 store_hec_token: false,
3760 log_namespace: None,
3761 keepalive: Default::default(),
3762 event: CodecConfig::default(),
3763 raw: codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3764 }
3765 .build(cx)
3766 .await
3767 .unwrap()
3768 .await
3769 .unwrap()
3770 });
3771 wait_for_tcp(address).await;
3772
3773 let opts = SendWithOpts {
3774 channel: Some(Channel::Header("guid")),
3775 forwarded_for: None,
3776 };
3777 let body = "not json {";
3779 let response = send_with_response(address, "services/collector/raw", body, TOKEN, &opts)
3780 .await
3781 .json::<serde_json::Value>()
3782 .await
3783 .unwrap();
3784
3785 assert_eq!(response["code"].as_u64(), Some(0), "response: {response:?}");
3786 assert!(
3787 response.get("ackId").is_none(),
3788 "expected no ackId in response when decoder produced zero events, got: {response:?}"
3789 );
3790 }
3791
3792 #[tokio::test]
3793 async fn decoder_raw_endpoint_partial_decode_does_not_ack() {
3794 let ack_config = HecAcknowledgementsConfig {
3799 enabled: Some(true),
3800 ..Default::default()
3801 };
3802 let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3803 let (_guard, address) = next_addr();
3804 let cx = SourceContext::new_test(sender, None);
3805 tokio::spawn(async move {
3806 SplunkConfig {
3807 address,
3808 token: Some(TOKEN.to_owned().into()),
3809 valid_tokens: None,
3810 tls: None,
3811 acknowledgements: ack_config,
3812 store_hec_token: false,
3813 log_namespace: None,
3814 keepalive: Default::default(),
3815 event: CodecConfig::default(),
3816 raw: codec_full(
3817 Some(FramingConfig::NewlineDelimited(Default::default())),
3818 Some(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3819 ),
3820 }
3821 .build(cx)
3822 .await
3823 .unwrap()
3824 .await
3825 .unwrap()
3826 });
3827 wait_for_tcp(address).await;
3828
3829 let opts = SendWithOpts {
3830 channel: Some(Channel::Header("guid")),
3831 forwarded_for: None,
3832 };
3833 let body = "{\"valid\":1}\nnot json\n{\"valid\":2}";
3835 let response = send_with_response(address, "services/collector/raw", body, TOKEN, &opts)
3836 .await
3837 .json::<serde_json::Value>()
3838 .await
3839 .unwrap();
3840
3841 assert_eq!(response["code"].as_u64(), Some(0), "response: {response:?}");
3842 assert!(
3843 response.get("ackId").is_none(),
3844 "expected no ackId when the decoder dropped a frame mid-request, got: {response:?}"
3845 );
3846 }
3847
3848 #[tokio::test]
3849 async fn decoder_event_endpoint_error_index_matches_envelope_not_fanout() {
3850 let (source, address, _guard) = source_with_codec(
3854 codec_full(
3855 Some(FramingConfig::NewlineDelimited(Default::default())),
3856 Some(DeserializerConfig::Bytes),
3857 ),
3858 CodecConfig::default(),
3859 )
3860 .await;
3861 let body = "{\"event\":\"a\\nb\\nc\"}{\"foo\":\"bar\"}";
3865
3866 let opts = SendWithOpts {
3867 channel: Some(Channel::Header("guid")),
3868 forwarded_for: None,
3869 };
3870 let response =
3871 send_with_response(address, "services/collector/event", body, TOKEN, &opts).await;
3872 let status = response.status();
3873 let body: serde_json::Value = response.json().await.unwrap();
3874
3875 assert_eq!(status.as_u16(), 400, "body: {body:?}");
3876 assert_eq!(
3877 body["invalid-event-number"].as_u64(),
3878 Some(1),
3879 "expected envelope index 1 (the failing envelope), not a fan-out event index. body: {body:?}"
3880 );
3881 let _ = collect_n(source, 3).await;
3883 }
3884
/// With a JSON decoder on `/event` and the Vector namespace, the event root may
/// be any JSON value. The Vector and `splunk_hec` fields are exposed as metadata.
#[test]
fn output_schema_definition_with_decoder_vector_namespace() {
    let config = SplunkConfig {
        log_namespace: Some(true),
        event: codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
        ..Default::default()
    };
    let mut outputs = config.outputs(LogNamespace::Vector);
    let definition = outputs.remove(0).schema_definition(true);

    // (metadata path, kind, semantic meaning) in the order they are applied.
    let metadata_fields = [
        (owned_value_path!("vector", "source_type"), Kind::bytes(), None),
        (owned_value_path!("vector", "ingest_timestamp"), Kind::timestamp(), None),
        (owned_value_path!("splunk_hec", "host"), Kind::bytes(), Some("host")),
        (owned_value_path!("splunk_hec", "index"), Kind::bytes(), None),
        (owned_value_path!("splunk_hec", "source"), Kind::bytes(), Some("service")),
        (owned_value_path!("splunk_hec", "channel"), Kind::bytes(), None),
        (owned_value_path!("splunk_hec", "sourcetype"), Kind::bytes(), None),
    ];
    let base = Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
    let expected_definition = metadata_fields
        .into_iter()
        .fold(base, |acc, (path, kind, semantic)| {
            acc.with_metadata_field(&path, kind, semantic)
        });

    assert_eq!(definition, Some(expected_definition));
}
3942
/// Without a decoder and with the Vector namespace, the event root is an object
/// or raw bytes. The Vector and `splunk_hec` fields are exposed as metadata.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = SplunkConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let definition = outputs.remove(0).schema_definition(true);

    // (metadata path, kind, semantic meaning) in the order they are applied.
    let metadata_fields = [
        (owned_value_path!("vector", "source_type"), Kind::bytes(), None),
        (owned_value_path!("vector", "ingest_timestamp"), Kind::timestamp(), None),
        (owned_value_path!("splunk_hec", "host"), Kind::bytes(), Some("host")),
        (owned_value_path!("splunk_hec", "index"), Kind::bytes(), None),
        (owned_value_path!("splunk_hec", "source"), Kind::bytes(), Some("service")),
        (owned_value_path!("splunk_hec", "channel"), Kind::bytes(), None),
        (owned_value_path!("splunk_hec", "sourcetype"), Kind::bytes(), None),
    ];
    let root_kind = Kind::object(Collection::empty()).or_bytes();
    let base = Definition::new_with_default_metadata(root_kind, [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
    let expected_definition = metadata_fields
        .into_iter()
        .fold(base, |acc, (path, kind, semantic)| {
            acc.with_metadata_field(&path, kind, semantic)
        });

    assert_eq!(definition, Some(expected_definition));
}
3998
/// With the default config (Legacy namespace), the source's fields are written
/// directly into the event object rather than into metadata.
#[test]
fn output_schema_definition_legacy_namespace() {
    let config = SplunkConfig::default();
    let mut outputs = config.outputs(LogNamespace::Legacy);
    let definitions = outputs.remove(0).schema_definition(true);

    // (event path, kind, semantic meaning) in the order they are applied.
    let event_fields = [
        (owned_value_path!("host"), Kind::bytes(), Some("host")),
        (
            owned_value_path!("message"),
            Kind::bytes().or_undefined(),
            Some("message"),
        ),
        (
            owned_value_path!("line"),
            Kind::array(Collection::empty())
                .or_object(Collection::empty())
                .or_undefined(),
            None,
        ),
        (owned_value_path!("source_type"), Kind::bytes(), None),
        (owned_value_path!("splunk_channel"), Kind::bytes(), None),
        (owned_value_path!("splunk_index"), Kind::bytes(), None),
        (owned_value_path!("splunk_source"), Kind::bytes(), Some("service")),
        (owned_value_path!("splunk_sourcetype"), Kind::bytes(), None),
        (owned_value_path!("timestamp"), Kind::timestamp(), None),
    ];
    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected_definition = event_fields
        .into_iter()
        .fold(base, |acc, (path, kind, semantic)| {
            acc.with_event_field(&path, kind, semantic)
        });

    assert_eq!(definitions, Some(expected_definition));
}
4037
4038 impl ValidatableComponent for SplunkConfig {
4039 fn validation_configuration() -> ValidationConfiguration {
4040 let config = Self {
4041 address: default_socket_address(),
4042 ..Default::default()
4043 };
4044
4045 let listen_addr_http = format!("http://{}/services/collector/event", config.address);
4046 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
4047
4048 let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
4049 let framing = BytesDecoderConfig::new().into();
4050 let decoding = DeserializerConfig::Json(Default::default());
4051
4052 let external_resource = ExternalResource::new(
4053 ResourceDirection::Push,
4054 HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
4055 X_SPLUNK_REQUEST_CHANNEL.to_string(),
4056 "channel".to_string(),
4057 )])),
4058 DecodingConfig::new(framing, decoding, false.into()),
4059 );
4060
4061 ValidationConfiguration::from_source(
4062 Self::NAME,
4063 log_namespace,
4064 vec![ComponentTestCaseConfig::from_source(
4065 config,
4066 None,
4067 Some(external_resource),
4068 )],
4069 )
4070 }
4071 }
4072
// Register `SplunkConfig` with the component validation framework, which uses
// its `ValidatableComponent::validation_configuration` implementation.
register_validatable_component!(SplunkConfig);
4074}