diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index bbd53158b34b..270b719b1823 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -16,23 +16,28 @@ -- local core = require("apisix.core") local log_util = require("apisix.utils.log-util") +local batch_processor = require("apisix.utils.batch-processor") local logger_socket = require("resty.logger.socket") local plugin_name = "syslog" local ngx = ngx +local buffers = {} local schema = { type = "object", properties = { host = {type = "string"}, port = {type = "integer"}, + name = {type = "string", default = "sys logger"}, flush_limit = {type = "integer", minimum = 1, default = 4096}, drop_limit = {type = "integer", default = 1048576}, timeout = {type = "integer", minimum = 1, default = 3}, sock_type = {type = "string", default = "tcp"}, - max_retry_times = {type = "integer", minimum = 1, default = 3}, - retry_interval = {type = "integer", minimum = 10, default = 100}, + max_retry_times = {type = "integer", minimum = 1, default = 1}, + retry_interval = {type = "integer", minimum = 0, default = 1}, pool_size = {type = "integer", minimum = 5, default = 5}, tls = {type = "boolean", default = false}, + batch_max_size = {type = "integer", minimum = 1, default = 1000}, + buffer_duration = {type = "integer", minimum = 1, default = 60}, }, required = {"host", "port"} } @@ -59,14 +64,9 @@ function _M.flush_syslog(logger) end end --- log phase in APISIX -function _M.log(conf) - local entry = log_util.get_full_log(ngx) - - if not entry.route_id then - core.log.error("failed to obtain the route id for sys logger") - return - end +local function send_syslog_data(conf, log_message) + local err_msg + local res = true -- fetch api_ctx local api_ctx = ngx.ctx.api_ctx @@ -91,14 +91,72 @@ function _M.log(conf) }) if not logger then - core.log.error("failed when initiating the sys logger processor", err) + res = false + err_msg = "failed when initiating the sys logger processor".. err end -- reuse the logger object - local ok, err = logger:log(core.json.encode(entry)) + local ok, err = logger:log(core.json.encode(log_message)) if not ok then - core.log.error("failed to log message", err) + res = false + err_msg = "failed to log message" .. err + end + + return res, err_msg +end + +-- log phase in APISIX +function _M.log(conf) + local entry = log_util.get_full_log(ngx) + + if not entry.route_id then + core.log.error("failed to obtain the route id for sys logger") + return end + + local log_buffer = buffers[entry.route_id] + + if log_buffer then + log_buffer:push(entry) + return + end + + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data, err = core.json.encode(entries[1]) -- encode as single {} + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + return send_syslog_data(conf, data) + end + + local config = { + name = conf.name, + retry_delay = conf.retry_interval, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_times, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.timeout, + } + + local err + log_buffer, err = batch_processor:new(func, config) + + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return + end + + buffers[entry.route_id] = log_buffer + log_buffer:push(entry) + end return _M