Skip to content

Commit fa59f20

Browse files
author
Ayeshmantha Perera
authored
optimize: use buffer for plugin syslog. (#1551)
1 parent a328fcc commit fa59f20

File tree

1 file changed

+71
-13
lines changed

1 file changed

+71
-13
lines changed

apisix/plugins/syslog.lua

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,28 @@
1616
--
1717
local core = require("apisix.core")
1818
local log_util = require("apisix.utils.log-util")
19+
local batch_processor = require("apisix.utils.batch-processor")
1920
local logger_socket = require("resty.logger.socket")
2021
local plugin_name = "syslog"
2122
local ngx = ngx
23+
local buffers = {}
2224

2325
local schema = {
2426
type = "object",
2527
properties = {
2628
host = {type = "string"},
2729
port = {type = "integer"},
30+
name = {type = "string", default = "sys logger"},
2831
flush_limit = {type = "integer", minimum = 1, default = 4096},
2932
drop_limit = {type = "integer", default = 1048576},
3033
timeout = {type = "integer", minimum = 1, default = 3},
3134
sock_type = {type = "string", default = "tcp"},
32-
max_retry_times = {type = "integer", minimum = 1, default = 3},
33-
retry_interval = {type = "integer", minimum = 10, default = 100},
35+
max_retry_times = {type = "integer", minimum = 1, default = 1},
36+
retry_interval = {type = "integer", minimum = 0, default = 1},
3437
pool_size = {type = "integer", minimum = 5, default = 5},
3538
tls = {type = "boolean", default = false},
39+
batch_max_size = {type = "integer", minimum = 1, default = 1000},
40+
buffer_duration = {type = "integer", minimum = 1, default = 60},
3641
},
3742
required = {"host", "port"}
3843
}
@@ -59,14 +64,9 @@ function _M.flush_syslog(logger)
5964
end
6065
end
6166

62-
-- log phase in APISIX
63-
function _M.log(conf)
64-
local entry = log_util.get_full_log(ngx)
65-
66-
if not entry.route_id then
67-
core.log.error("failed to obtain the route id for sys logger")
68-
return
69-
end
67+
local function send_syslog_data(conf, log_message)
68+
local err_msg
69+
local res = true
7070

7171
-- fetch api_ctx
7272
local api_ctx = ngx.ctx.api_ctx
@@ -91,14 +91,72 @@ function _M.log(conf)
9191
})
9292

9393
if not logger then
94-
core.log.error("failed when initiating the sys logger processor", err)
94+
res = false
95+
err_msg = "failed when initiating the sys logger processor".. err
9596
end
9697

9798
-- reuse the logger object
98-
local ok, err = logger:log(core.json.encode(entry))
99+
local ok, err = logger:log(core.json.encode(log_message))
99100
if not ok then
100-
core.log.error("failed to log message", err)
101+
res = false
102+
err_msg = "failed to log message" .. err
103+
end
104+
105+
return res, err_msg
106+
end
107+
108+
-- log phase in APISIX
109+
function _M.log(conf)
110+
local entry = log_util.get_full_log(ngx)
111+
112+
if not entry.route_id then
113+
core.log.error("failed to obtain the route id for sys logger")
114+
return
101115
end
116+
117+
local log_buffer = buffers[entry.route_id]
118+
119+
if log_buffer then
120+
log_buffer:push(entry)
121+
return
122+
end
123+
124+
-- Generate a function to be executed by the batch processor
125+
local func = function(entries, batch_max_size)
126+
local data, err
127+
if batch_max_size == 1 then
128+
data, err = core.json.encode(entries[1]) -- encode as single {}
129+
else
130+
data, err = core.json.encode(entries) -- encode as array [{}]
131+
end
132+
133+
if not data then
134+
return false, 'error occurred while encoding the data: ' .. err
135+
end
136+
137+
return send_syslog_data(conf, data)
138+
end
139+
140+
local config = {
141+
name = conf.name,
142+
retry_delay = conf.retry_interval,
143+
batch_max_size = conf.batch_max_size,
144+
max_retry_count = conf.max_retry_times,
145+
buffer_duration = conf.buffer_duration,
146+
inactive_timeout = conf.timeout,
147+
}
148+
149+
local err
150+
log_buffer, err = batch_processor:new(func, config)
151+
152+
if not log_buffer then
153+
core.log.error("error when creating the batch processor: ", err)
154+
return
155+
end
156+
157+
buffers[entry.route_id] = log_buffer
158+
log_buffer:push(entry)
159+
102160
end
103161

104162
return _M

0 commit comments

Comments
 (0)