Skip to content

Commit c9dbb56

Browse files
authored
feat(events): add metadata info to events (#4875)
1 parent 9903119 commit c9dbb56

File tree

5 files changed

+39
-10
lines changed

5 files changed

+39
-10
lines changed

crates/events/src/lib.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ pub trait Event: EventInfo {
4949

5050
/// The class/type of the event. This is used to group/categorize events together.
5151
fn class(&self) -> Self::EventType;
52+
53+
/// Metadata associated with the event
54+
fn metadata(&self) -> HashMap<String, String> {
55+
HashMap::new()
56+
}
5257
}
5358

5459
/// Hold the context information for any events
@@ -73,7 +78,8 @@ where
7378
event: E,
7479
}
7580

76-
struct RawEvent<T, A: Event<EventType = T>>(HashMap<String, Value>, A);
81+
/// A flattened event that flattens the context provided to it along with the actual event.
82+
struct FlatMapEvent<T, A: Event<EventType = T>>(HashMap<String, Value>, A);
7783

7884
impl<T, A, E, D> EventBuilder<T, A, E, D>
7985
where
@@ -109,12 +115,13 @@ where
109115
/// Emit the event.
110116
pub fn try_emit(self) -> Result<(), EventsError> {
111117
let ts = self.event.timestamp();
118+
let metadata = self.event.metadata();
112119
self.message_sink
113-
.send_message(RawEvent(self.metadata, self.event), ts)
120+
.send_message(FlatMapEvent(self.metadata, self.event), metadata, ts)
114121
}
115122
}
116123

117-
impl<T, A> Serialize for RawEvent<T, A>
124+
impl<T, A> Serialize for FlatMapEvent<T, A>
118125
where
119126
A: Event<EventType = T>,
120127
{
@@ -236,7 +243,12 @@ pub trait MessagingInterface {
236243
/// The type of the event used for categorization by the event publisher.
237244
type MessageClass;
238245
/// Send a message that follows the defined message class.
239-
fn send_message<T>(&self, data: T, timestamp: PrimitiveDateTime) -> Result<(), EventsError>
246+
fn send_message<T>(
247+
&self,
248+
data: T,
249+
metadata: HashMap<String, String>,
250+
timestamp: PrimitiveDateTime,
251+
) -> Result<(), EventsError>
240252
where
241253
T: Message<Class = Self::MessageClass> + ErasedMaskSerialize;
242254
}
@@ -252,7 +264,7 @@ pub trait Message {
252264
fn identifier(&self) -> String;
253265
}
254266

255-
impl<T, A> Message for RawEvent<T, A>
267+
impl<T, A> Message for FlatMapEvent<T, A>
256268
where
257269
A: Event<EventType = T>,
258270
{

crates/router/src/events.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use error_stack::ResultExt;
24
use events::{EventsError, Message, MessagingInterface};
35
use hyperswitch_domain_models::errors::{StorageError, StorageResult};
@@ -94,14 +96,15 @@ impl MessagingInterface for EventsHandler {
9496
fn send_message<T>(
9597
&self,
9698
data: T,
99+
metadata: HashMap<String, String>,
97100
timestamp: PrimitiveDateTime,
98101
) -> error_stack::Result<(), EventsError>
99102
where
100103
T: Message<Class = Self::MessageClass> + ErasedMaskSerialize,
101104
{
102105
match self {
103-
Self::Kafka(a) => a.send_message(data, timestamp),
104-
Self::Logs(a) => a.send_message(data, timestamp),
106+
Self::Kafka(a) => a.send_message(data, metadata, timestamp),
107+
Self::Logs(a) => a.send_message(data, metadata, timestamp),
105108
}
106109
}
107110
}

crates/router/src/events/event_logger.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use events::{EventsError, Message, MessagingInterface};
24
use masking::ErasedMaskSerialize;
35
use time::PrimitiveDateTime;
@@ -21,12 +23,13 @@ impl MessagingInterface for EventLogger {
2123
fn send_message<T>(
2224
&self,
2325
data: T,
26+
metadata: HashMap<String, String>,
2427
_timestamp: PrimitiveDateTime,
2528
) -> error_stack::Result<(), EventsError>
2629
where
2730
T: Message<Class = Self::MessageClass> + ErasedMaskSerialize,
2831
{
29-
logger::info!(event =? data.masked_serialize().unwrap_or_else(|e| serde_json::json!({"error": e.to_string()})), event_type =? data.get_message_class(), event_id =? data.identifier(), log_type =? "event");
32+
logger::info!(event =? data.masked_serialize().unwrap_or_else(|e| serde_json::json!({"error": e.to_string()})), event_type =? data.get_message_class(), event_id =? data.identifier(), log_type =? "event", metadata = ?metadata);
3033
Ok(())
3134
}
3235
}

crates/router/src/middleware.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ where
157157
payment_method = Empty,
158158
status_code = Empty,
159159
flow = "UNKNOWN",
160-
golden_log_line = Empty
160+
golden_log_line = Empty,
161+
tenant_id = "ta"
161162
)
162163
.or_current(),
163164
),

crates/router/src/services/kafka.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use std::sync::Arc;
1+
use std::{collections::HashMap, sync::Arc};
22

33
use bigdecimal::ToPrimitive;
44
use common_utils::errors::CustomResult;
55
use error_stack::{report, ResultExt};
66
use events::{EventsError, Message, MessagingInterface};
77
use rdkafka::{
88
config::FromClientConfig,
9+
message::{Header, OwnedHeaders},
910
producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer},
1011
};
1112
#[cfg(feature = "payouts")]
@@ -528,6 +529,7 @@ impl MessagingInterface for KafkaProducer {
528529
fn send_message<T>(
529530
&self,
530531
data: T,
532+
metadata: HashMap<String, String>,
531533
timestamp: PrimitiveDateTime,
532534
) -> error_stack::Result<(), EventsError>
533535
where
@@ -538,12 +540,20 @@ impl MessagingInterface for KafkaProducer {
538540
.masked_serialize()
539541
.and_then(|i| serde_json::to_vec(&i))
540542
.change_context(EventsError::SerializationError)?;
543+
let mut headers = OwnedHeaders::new();
544+
for (k, v) in metadata.iter() {
545+
headers = headers.insert(Header {
546+
key: k.as_str(),
547+
value: Some(v),
548+
});
549+
}
541550
self.producer
542551
.0
543552
.send(
544553
BaseRecord::to(topic)
545554
.key(&data.identifier())
546555
.payload(&json_data)
556+
.headers(headers)
547557
.timestamp(
548558
(timestamp.assume_utc().unix_timestamp_nanos() / 1_000_000)
549559
.to_i64()

0 commit comments

Comments
 (0)