File tree Expand file tree Collapse file tree 1 file changed +26
-0
lines changed Expand file tree Collapse file tree 1 file changed +26
-0
lines changed Original file line number Diff line number Diff line change @@ -21,7 +21,11 @@ local batch_processor = require("apisix.utils.batch-processor")
21
21
local pairs = pairs
22
22
local type = type
23
23
local table = table
24
+ local ipairs = ipairs
24
25
local plugin_name = " kafka-logger"
26
+ local stale_timer_running = false ;
27
+ local timer_at = ngx .timer .at
28
+ local tostring = tostring
25
29
local ngx = ngx
26
30
local buffers = {}
27
31
@@ -90,6 +94,22 @@ local function send_kafka_data(conf, log_message)
90
94
end
91
95
92
96
97
+ local function remove_stale_objects (premature )
98
+ if premature then
99
+ return
100
+ end
101
+
102
+ for key , batch in ipairs (buffers ) do
103
+ if # batch .entry_buffer .entries == 0 and # batch .batch_to_process == 0 then
104
+ core .log .debug (" removing batch processor stale object, route id:" , tostring (key ))
105
+ buffers [key ] = nil
106
+ end
107
+ end
108
+
109
+ stale_timer_running = false
110
+ end
111
+
112
+
93
113
function _M .log (conf )
94
114
local entry = log_util .get_full_log (ngx )
95
115
@@ -100,6 +120,12 @@ function _M.log(conf)
100
120
101
121
local log_buffer = buffers [entry .route_id ]
102
122
123
+ if not stale_timer_running then
124
+ -- run the timer every 30 mins if any log is present
125
+ timer_at (1800 , remove_stale_objects )
126
+ stale_timer_running = true
127
+ end
128
+
103
129
if log_buffer then
104
130
log_buffer :push (entry )
105
131
return
You can’t perform that action at this time.
0 commit comments