Skip to content

Commit 398941b

Browse files
authored
feature: add skywalking plugin. (#1241)
1 parent 67a2096 commit 398941b

File tree

18 files changed

+698
-11
lines changed

18 files changed

+698
-11
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ install: default
151151
$(INSTALL) -d $(INST_LUADIR)/apisix/plugins/zipkin
152152
$(INSTALL) apisix/plugins/zipkin/*.lua $(INST_LUADIR)/apisix/plugins/zipkin/
153153

154+
$(INSTALL) -d $(INST_LUADIR)/apisix/plugins/skywalking
155+
$(INSTALL) apisix/plugins/skywalking/*.lua $(INST_LUADIR)/apisix/plugins/skywalking/
156+
154157
$(INSTALL) -d $(INST_LUADIR)/apisix/stream/plugins
155158
$(INSTALL) apisix/stream/plugins/*.lua $(INST_LUADIR)/apisix/stream/plugins/
156159

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against
9494
- [CORS](doc/plugins/cors.md)
9595

9696
- **OPS friendly**
97-
- OpenTracing: [support Apache Skywalking and Zipkin](doc/plugins/zipkin.md)
97+
- OpenTracing: support [Apache Skywalking](doc/plugins/skywalking.md) and [Zipkin](doc/plugins/zipkin.md)
9898
- Monitoring And Metrics: [Prometheus](doc/plugins/prometheus.md)
9999
- Clustering: APISIX nodes are stateless, creates clustering of the configuration center, please refer to [etcd Clustering Guide](https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/clustering.md).
100100
- High availability: support to configure multiple etcd addresses in the same cluster.

README_CN.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵
9494
- [CORS](doc/plugins/cors-cn.md)
9595

9696
- **运维友好**
97-
- OpenTracing 可观测性: [支持 Apache Skywalking 和 Zipkin](doc/plugins/zipkin-cn.md)
97+
- OpenTracing 可观测性: 支持 [Apache Skywalking](doc/plugins/skywalking-cn.md)[Zipkin](doc/plugins/zipkin-cn.md)
9898
- 监控和指标: [Prometheus](doc/plugins/prometheus-cn.md)
9999
- 集群:APISIX 节点是无状态的,创建配置中心集群请参考 [etcd Clustering Guide](https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/clustering.md)
100100
- 高可用:支持配置同一个集群内的多个 etcd 地址。

apisix/admin/routes.lua

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ function _M.put(id, conf, sub_path, args)
135135
local key = "/routes/" .. id
136136
local res, err = core.etcd.set(key, conf, args.ttl)
137137
if not res then
138-
core.log.error("failed to put route[", key, "]: ", err)
138+
core.log.error("failed to put route[", key, "] to etcd: ", err)
139139
return 500, {error_msg = err}
140140
end
141141

@@ -151,7 +151,7 @@ function _M.get(id)
151151

152152
local res, err = core.etcd.get(key)
153153
if not res then
154-
core.log.error("failed to get route[", key, "]: ", err)
154+
core.log.error("failed to get route[", key, "] from etcd: ", err)
155155
return 500, {error_msg = err}
156156
end
157157

@@ -169,7 +169,7 @@ function _M.post(id, conf, sub_path, args)
169169
-- core.log.info("key: ", key)
170170
local res, err = core.etcd.push("/routes", conf, args.ttl)
171171
if not res then
172-
core.log.error("failed to post route[", key, "]: ", err)
172+
core.log.error("failed to post route[", key, "] to etcd: ", err)
173173
return 500, {error_msg = err}
174174
end
175175

@@ -186,7 +186,7 @@ function _M.delete(id)
186186
-- core.log.info("key: ", key)
187187
local res, err = core.etcd.delete(key)
188188
if not res then
189-
core.log.error("failed to delete route[", key, "]: ", err)
189+
core.log.error("failed to delete route[", key, "] in etcd: ", err)
190190
return 500, {error_msg = err}
191191
end
192192

@@ -214,7 +214,7 @@ function _M.patch(id, conf, sub_path, args)
214214

215215
local res_old, err = core.etcd.get(key)
216216
if not res_old then
217-
core.log.error("failed to get route [", key, "]: ", err)
217+
core.log.error("failed to get route [", key, "] in etcd: ", err)
218218
return 500, {error_msg = err}
219219
end
220220

@@ -261,7 +261,7 @@ function _M.patch(id, conf, sub_path, args)
261261
-- TODO: this is not safe, we need to use compare-set
262262
local res, err = core.etcd.set(key, node_value, args.ttl)
263263
if not res then
264-
core.log.error("failed to set new route[", key, "]: ", err)
264+
core.log.error("failed to set new route[", key, "] to etcd: ", err)
265265
return 500, {error_msg = err}
266266
end
267267

apisix/plugins/skywalking.lua

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local core = require("apisix.core")
18+
local ngx = ngx
19+
local math = math
20+
21+
local sw_client = require("apisix.plugins.skywalking.client")
22+
local sw_tracer = require("apisix.plugins.skywalking.tracer")
23+
24+
local plugin_name = "skywalking"
25+
26+
27+
local schema = {
28+
type = "object",
29+
properties = {
30+
endpoint = {type = "string"},
31+
sample_ratio = {type = "number", minimum = 0.00001, maximum = 1, default = 1}
32+
},
33+
service_name = {
34+
type = "string",
35+
description = "service name for skywalking",
36+
default = "APISIX",
37+
},
38+
required = {"endpoint"}
39+
}
40+
41+
42+
local _M = {
43+
version = 0.1,
44+
priority = -1100, -- last running plugin, but before serverless post func
45+
name = plugin_name,
46+
schema = schema,
47+
}
48+
49+
50+
function _M.check_schema(conf)
51+
return core.schema.check(schema, conf)
52+
end
53+
54+
55+
function _M.rewrite(conf, ctx)
56+
core.log.debug("rewrite phase of skywalking plugin")
57+
ctx.skywalking_sample = false
58+
if conf.sample_ratio == 1 or math.random() < conf.sample_ratio then
59+
ctx.skywalking_sample = true
60+
sw_client.heartbeat(conf)
61+
-- Currently, we can not have the upstream real network address
62+
sw_tracer.start(ctx, conf.endpoint, "upstream service")
63+
end
64+
end
65+
66+
67+
function _M.body_filter(conf, ctx)
68+
if ctx.skywalking_sample and ngx.arg[2] then
69+
sw_tracer.finish(ctx)
70+
end
71+
end
72+
73+
74+
function _M.log(conf, ctx)
75+
if ctx.skywalking_sample then
76+
sw_tracer.prepareForReport(ctx, conf.endpoint)
77+
end
78+
end
79+
80+
return _M
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local core = require("apisix.core")
18+
local http = require("resty.http")
19+
local cjson = require('cjson')
20+
local ngx = ngx
21+
local ipairs = ipairs
22+
23+
local register = require("skywalking.register")
24+
25+
local _M = {}
26+
27+
local function register_service(conf)
28+
local endpoint = conf.endpoint
29+
30+
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
31+
local service_id = tracing_buffer:get(endpoint .. '_service_id')
32+
if service_id then
33+
return service_id
34+
end
35+
36+
local service_name = conf.service_name
37+
local service = register.newServiceRegister(service_name)
38+
39+
local httpc = http.new()
40+
local res, err = httpc:request_uri(endpoint .. '/v2/service/register',
41+
{
42+
method = "POST",
43+
body = core.json.encode(service),
44+
headers = {
45+
["Content-Type"] = "application/json",
46+
},
47+
})
48+
if not res then
49+
core.log.error("skywalking service register failed, request uri: ",
50+
endpoint .. '/v2/service/register', ", err: ", err)
51+
52+
elseif res.status == 200 then
53+
core.log.debug("skywalking service register response: ", res.body)
54+
local register_results = cjson.decode(res.body)
55+
56+
for _, result in ipairs(register_results) do
57+
if result.key == service_name then
58+
service_id = result.value
59+
core.log.debug("skywalking service registered, service id:"
60+
.. service_id)
61+
end
62+
end
63+
64+
else
65+
core.log.error("skywalking service register failed, request uri:",
66+
endpoint .. "/v2/service/register",
67+
", response code:", res.status)
68+
end
69+
70+
if service_id then
71+
tracing_buffer:set(endpoint .. '_service_id', service_id)
72+
end
73+
74+
return service_id
75+
end
76+
77+
local function register_service_instance(conf, service_id)
78+
local endpoint = conf.endpoint
79+
80+
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
81+
local instance_id = tracing_buffer:get(endpoint .. '_instance_id')
82+
if instance_id then
83+
return instance_id
84+
end
85+
86+
local service_instance_name = core.id.get()
87+
local service_instance = register.newServiceInstanceRegister(
88+
service_id,
89+
service_instance_name,
90+
ngx.now() * 1000)
91+
92+
local httpc = http.new()
93+
local res, err = httpc:request_uri(endpoint .. '/v2/instance/register',
94+
{
95+
method = "POST",
96+
body = core.json.encode(service_instance),
97+
headers = {
98+
["Content-Type"] = "application/json",
99+
},
100+
})
101+
102+
if not res then
103+
core.log.error("skywalking service Instance register failed",
104+
", request uri: ", conf.endpoint .. '/v2/instance/register',
105+
", err: ", err)
106+
107+
elseif res.status == 200 then
108+
core.log.debug("skywalking service instance register response: ", res.body)
109+
local register_results = cjson.decode(res.body)
110+
111+
for _, result in ipairs(register_results) do
112+
if result.key == service_instance_name then
113+
instance_id = result.value
114+
core.log.debug("skywalking service Instance registered, ",
115+
"service instance id: ", instance_id)
116+
end
117+
end
118+
119+
else
120+
core.log.error("skywalking service instance register failed, ",
121+
"response code:", res.status)
122+
end
123+
124+
if instance_id then
125+
tracing_buffer:set(endpoint .. '_instance_id', instance_id)
126+
end
127+
128+
return instance_id
129+
end
130+
131+
local function ping(endpoint)
132+
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
133+
local ping_pkg = register.newServiceInstancePingPkg(
134+
tracing_buffer:get(endpoint .. '_instance_id'),
135+
core.id.get(),
136+
ngx.now() * 1000)
137+
138+
local httpc = http.new()
139+
local _, err = httpc:request_uri(endpoint .. '/v2/instance/heartbeat', {
140+
method = "POST",
141+
body = core.json.encode(ping_pkg),
142+
headers = {
143+
["Content-Type"] = "application/json",
144+
},
145+
})
146+
147+
if err then
148+
core.log.error("skywalking agent ping failed, err: ", err)
149+
end
150+
end
151+
152+
-- report trace segments to the backend
153+
local function report_traces(endpoint)
154+
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
155+
local segment = tracing_buffer:rpop(endpoint .. '_segment')
156+
157+
local count = 0
158+
159+
local httpc = http.new()
160+
161+
while segment ~= nil do
162+
local res, err = httpc:request_uri(endpoint .. '/v2/segments', {
163+
method = "POST",
164+
body = segment,
165+
headers = {
166+
["Content-Type"] = "application/json",
167+
},
168+
})
169+
170+
if err == nil then
171+
if res.status ~= 200 then
172+
core.log.error("skywalking segment report failed, response code ", res.status)
173+
break
174+
else
175+
count = count + 1
176+
end
177+
else
178+
core.log.error("skywalking segment report failed, err: ", err)
179+
break
180+
end
181+
182+
segment = tracing_buffer:rpop('segment')
183+
end
184+
185+
if count > 0 then
186+
core.log.debug(count, " skywalking segments reported")
187+
end
188+
end
189+
190+
do
191+
local heartbeat_timer
192+
193+
function _M.heartbeat(conf)
194+
local sw_heartbeat = function()
195+
local service_id = register_service(conf)
196+
if not service_id then
197+
return
198+
end
199+
200+
local service_instance_id = register_service_instance(conf, service_id)
201+
if not service_instance_id then
202+
return
203+
end
204+
205+
report_traces(conf.endpoint)
206+
ping(conf.endpoint)
207+
end
208+
209+
local err
210+
if ngx.worker.id() == 0 and not heartbeat_timer then
211+
heartbeat_timer, err = core.timer.new("skywalking_heartbeat",
212+
sw_heartbeat,
213+
{check_interval = 3}
214+
)
215+
if not heartbeat_timer then
216+
core.log.error("failed to create skywalking_heartbeat timer: ", err)
217+
else
218+
core.log.info("succeed to create timer: skywalking heartbeat")
219+
end
220+
end
221+
end
222+
223+
end -- do
224+
225+
226+
return _M

0 commit comments

Comments
 (0)