File tree Expand file tree Collapse file tree 1 file changed +9
-5
lines changed
crates/router/src/services Expand file tree Collapse file tree 1 file changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -319,11 +319,15 @@ impl KafkaProducer {
319
319
BaseRecord :: to ( topic)
320
320
. key ( & event. key ( ) )
321
321
. payload ( & event. value ( ) ?)
322
- . timestamp (
323
- event
324
- . creation_timestamp ( )
325
- . unwrap_or_else ( || OffsetDateTime :: now_utc ( ) . unix_timestamp ( ) * 1_000 ) ,
326
- ) ,
322
+ . timestamp ( event. creation_timestamp ( ) . unwrap_or_else ( || {
323
+ ( OffsetDateTime :: now_utc ( ) . unix_timestamp_nanos ( ) / 1_000_000 )
324
+ . try_into ( )
325
+ . unwrap_or_else ( |_| {
326
+ // kafka producer accepts milliseconds
327
+ // try converting nanos to millis if that fails convert seconds to millis
328
+ OffsetDateTime :: now_utc ( ) . unix_timestamp ( ) * 1_000
329
+ } )
330
+ } ) ) ,
327
331
)
328
332
. map_err ( |( error, record) | report ! ( error) . attach_printable ( format ! ( "{record:?}" ) ) )
329
333
. change_context ( KafkaError :: GenericError )
You can’t perform that action at this time.
0 commit comments