Skip to content

Commit 98bb593

Browse files
authored
feature: implement ewma balancer for upstream node (#2001)
ewma is a different balancing implementation that will generate a weight for every backend IP based on the last server response time, basically it tries to dispatch more requests to the backends that reply faster, supposing that they are less loaded. fix #1996
1 parent a3f865a commit 98bb593

File tree

10 files changed

+443
-4
lines changed

10 files changed

+443
-4
lines changed

LICENSE

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,14 @@
199199
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
200200
See the License for the specific language governing permissions and
201201
limitations under the License.
202+
203+
=======================================================================
204+
Apache ApiSix Subcomponents:
205+
206+
The Apache ApiSix project contains subcomponents with separate copyright
207+
notices and license terms. Your use of the source code for the these
208+
subcomponents is subject to the terms and conditions of the following
209+
licenses.
210+
211+
ewma.lua file from kubernetes/ingress-nginx: https://github.com/kubernetes/ingress-nginx Apache 2.0
212+

apisix/balancer.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ local module_name = "balancer"
3030
local pickers = {
3131
roundrobin = require("apisix.balancer.roundrobin"),
3232
chash = require("apisix.balancer.chash"),
33+
ewma = require("apisix.balancer.ewma")
3334
}
3435

35-
3636
local lrucache_server_picker = core.lrucache.new({
3737
ttl = 300, count = 256
3838
})
@@ -245,7 +245,7 @@ local function pick_server(route, ctx)
245245
core.log.error("failed to parse server addr: ", server, " err: ", err)
246246
return core.response.exit(502)
247247
end
248-
248+
ctx.server_picker = server_picker
249249
return res
250250
end
251251

apisix/balancer/ewma.lua

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
19+
local core = require("apisix.core")
20+
local ngx = ngx
21+
local ngx_shared = ngx.shared
22+
local ngx_now = ngx.now
23+
local math = math
24+
local pairs = pairs
25+
local next = next
26+
local tonumber = tonumber
27+
28+
local _M = {}
29+
local DECAY_TIME = 10 -- this value is in seconds
30+
31+
local shm_ewma = ngx_shared.balancer_ewma
32+
local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
33+
34+
local lrucache_addr = core.lrucache.new({
35+
ttl = 300, count = 1024
36+
})
37+
local lrucache_trans_format = core.lrucache.new({
38+
ttl = 300, count = 256
39+
})
40+
41+
42+
local function decay_ewma(ewma, last_touched_at, rtt, now)
43+
local td = now - last_touched_at
44+
td = math.max(td, 0)
45+
local weight = math.exp(-td / DECAY_TIME)
46+
47+
ewma = ewma * weight + rtt * (1.0 - weight)
48+
return ewma
49+
end
50+
51+
52+
local function store_stats(upstream, ewma, now)
53+
local success, err, forcible = shm_last_touched_at:set(upstream, now)
54+
if not success then
55+
core.log.error("balancer_ewma_last_touched_at:set failed ", err)
56+
end
57+
if forcible then
58+
core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
59+
end
60+
61+
success, err, forcible = shm_ewma:set(upstream, ewma)
62+
if not success then
63+
core.log.error("balancer_ewma:set failed ", err)
64+
end
65+
if forcible then
66+
core.log.warn("balancer_ewma:set valid items forcibly overwritten")
67+
end
68+
end
69+
70+
71+
local function get_or_update_ewma(upstream, rtt, update)
72+
local ewma = shm_ewma:get(upstream) or 0
73+
local now = ngx_now()
74+
local last_touched_at = shm_last_touched_at:get(upstream) or 0
75+
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
76+
77+
if not update then
78+
return ewma
79+
end
80+
81+
store_stats(upstream, ewma, now)
82+
83+
return ewma
84+
end
85+
86+
87+
local function score(upstream)
88+
-- Original implementation used names
89+
-- Endpoints don't have names, so passing in host:Port as key instead
90+
local upstream_name = upstream.host .. ":" .. upstream.port
91+
return get_or_update_ewma(upstream_name, 0, false)
92+
end
93+
94+
95+
local function pick_and_score(peers)
96+
local lowest_score_index = 1
97+
local lowest_score = score(peers[lowest_score_index])
98+
for i = 2, #peers do
99+
local new_score = score(peers[i])
100+
if new_score < lowest_score then
101+
lowest_score_index, lowest_score = i, new_score
102+
end
103+
end
104+
105+
return peers[lowest_score_index], lowest_score
106+
end
107+
108+
109+
local function parse_addr(addr)
110+
local host, port, err = core.utils.parse_addr(addr)
111+
return {host = host, port = port}, err
112+
end
113+
114+
115+
local function _trans_format(up_nodes)
116+
-- trans
117+
--{"1.2.3.4:80":100,"5.6.7.8:8080":100}
118+
-- into
119+
-- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}]
120+
local peers = {}
121+
local res, err
122+
123+
for addr, _ in pairs(up_nodes) do
124+
res, err = lrucache_addr(addr, nil, parse_addr, addr)
125+
if not err then
126+
core.table.insert(peers, res)
127+
else
128+
core.log.error('parse_addr error: ', addr, err)
129+
end
130+
end
131+
132+
return next(peers) and peers or nil
133+
end
134+
135+
136+
local function _ewma_find(ctx, up_nodes)
137+
local peers
138+
local endpoint
139+
140+
if not up_nodes
141+
or core.table.nkeys(up_nodes) == 0 then
142+
return nil, 'up_nodes empty'
143+
end
144+
145+
peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version,
146+
_trans_format, up_nodes)
147+
if not peers then
148+
return nil, 'up_nodes trans error'
149+
end
150+
151+
if #peers > 1 then
152+
endpoint = pick_and_score(peers)
153+
else
154+
endpoint = peers[1]
155+
end
156+
157+
return endpoint.host .. ":" .. endpoint.port
158+
end
159+
160+
161+
local function _ewma_after_balance(ctx)
162+
local response_time = tonumber(ctx.var.upstream_response_time) or 0
163+
local connect_time = tonumber(ctx.var.upstream_connect_time) or 0
164+
local rtt = connect_time + response_time
165+
local upstream = ctx.var.upstream_addr
166+
167+
if not upstream then
168+
return nil, "no upstream addr found"
169+
end
170+
171+
return get_or_update_ewma(upstream, rtt, true)
172+
end
173+
174+
175+
function _M.new(up_nodes, upstream)
176+
if not shm_ewma
177+
or not shm_last_touched_at then
178+
return nil, "dictionary not find"
179+
end
180+
181+
return {
182+
upstream = upstream,
183+
get = function (ctx)
184+
return _ewma_find(ctx, up_nodes)
185+
end,
186+
after_balance = _ewma_after_balance
187+
}
188+
end
189+
190+
191+
return _M

apisix/init.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,10 @@ function _M.http_log_phase()
651651
local api_ctx = common_phase("log")
652652
healcheck_passive(api_ctx)
653653

654+
if api_ctx.server_picker and api_ctx.server_picker.after_balance then
655+
api_ctx.server_picker.after_balance(api_ctx)
656+
end
657+
654658
if api_ctx.uri_parse_param then
655659
core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
656660
end

apisix/schema_def.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ local upstream_schema = {
311311
type = {
312312
description = "algorithms of load balancing",
313313
type = "string",
314-
enum = {"chash", "roundrobin"}
314+
enum = {"chash", "roundrobin", "ewma"}
315315
},
316316
checks = health_checker,
317317
hash_on = {

bin/apisix

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ http {
181181
lua_shared_dict worker-events 10m;
182182
lua_shared_dict lrucache-lock 10m;
183183
lua_shared_dict skywalking-tracing-buffer 100m;
184+
lua_shared_dict balancer_ewma 10m;
185+
lua_shared_dict balancer_ewma_locks 10m;
186+
lua_shared_dict balancer_ewma_last_touched_at 10m;
184187
185188
# for openid-connect plugin
186189
lua_shared_dict discovery 1m; # cache for discovery metadata documents

doc/admin-api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ In addition to the basic complex equalization algorithm selection, APISIX's Upst
493493

494494
|Name |Optional|Description|
495495
|------- |-----|------|
496-
|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash, pick one of them.|
496+
|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash,`ewma` minimum latency ,pick one of them.see https://en.wikipedia.org/wiki/EWMA_chart for details|
497497
|nodes |required if `k8s_deployment_info` not configured|Hash table, the key of the internal element is the upstream machine address list, the format is `Address + Port`, where the address part can be IP or domain name, such as `192.168.1.100:80`, `foo.com:80`, etc. Value is the weight of the node. In particular, when the weight value is `0`, it has a special meaning, which usually means that the upstream node is invalid and never wants to be selected.|
498498
|k8s_deployment_info|required if `nodes` not configured|fields: `namespace``deploy_name``service_name``port``backend_type`, `port` is number, `backend_type` is `pod` or `service`, others is string. |
499499
|hash_on |optional|This option is only valid if the `type` is `chash`. Supported types `vars`(Nginx variables), `header`(custom header), `cookie`, `consumer`, the default value is `vars`.|

t/APISIX.pm

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ _EOC_
220220
lua_shared_dict worker-events 10m;
221221
lua_shared_dict lrucache-lock 10m;
222222
lua_shared_dict skywalking-tracing-buffer 100m;
223+
lua_shared_dict balancer_ewma 1m;
224+
lua_shared_dict balancer_ewma_locks 1m;
225+
lua_shared_dict balancer_ewma_last_touched_at 1m;
223226
224227
resolver $dns_addrs_str;
225228
resolver_timeout 5;

t/lib/server.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ function _M.sleep1()
6868
ngx.say("ok")
6969
end
7070

71+
function _M.ewma()
72+
if ngx.var.server_port == "1981"
73+
or ngx.var.server_port == "1982" then
74+
ngx.sleep(0.2)
75+
else
76+
ngx.sleep(0.1)
77+
end
78+
ngx.print(ngx.var.server_port)
79+
end
80+
7181
function _M.uri()
7282
-- ngx.sleep(1)
7383
ngx.say("uri: ", ngx.var.uri)

0 commit comments

Comments
 (0)