diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index a9050b9d6080..1641c5e17bfe 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -21,7 +21,11 @@ local batch_processor = require("apisix.utils.batch-processor") local pairs = pairs local type = type local table = table +local ipairs = ipairs local plugin_name = "kafka-logger" +local stale_timer_running = false; +local timer_at = ngx.timer.at +local tostring = tostring local ngx = ngx local buffers = {} @@ -90,6 +94,22 @@ local function send_kafka_data(conf, log_message) end +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.debug("removing batch processor stale object, route id:", tostring(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + function _M.log(conf) local entry = log_util.get_full_log(ngx) @@ -100,6 +120,12 @@ function _M.log(conf) local log_buffer = buffers[entry.route_id] + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + if log_buffer then log_buffer:push(entry) return