Skip to content

Commit c982af2

Browse files
committed
collector: use collector interface for tasks
Signed-off-by: Aaron Delaney <[email protected]>
1 parent dd0a9f3 commit c982af2

File tree

6 files changed

+148
-200
lines changed

6 files changed

+148
-200
lines changed

collector/cluster_settings_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package collector
1515

1616
import (
17-
"context"
1817
"io"
1918
"net/http"
2019
"net/http/httptest"
@@ -24,21 +23,9 @@ import (
2423
"testing"
2524

2625
"github.com/go-kit/log"
27-
"github.com/prometheus/client_golang/prometheus"
2826
"github.com/prometheus/client_golang/prometheus/testutil"
2927
)
3028

31-
type wrapCollector struct {
32-
c Collector
33-
}
34-
35-
func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) {
36-
}
37-
38-
func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
39-
w.c.Update(context.Background(), ch)
40-
}
41-
4229
func TestClusterSettingsStats(t *testing.T) {
4330
// Testcases created using:
4431
// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine

collector/collector_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
19+
"github.com/prometheus/client_golang/prometheus"
20+
)
21+
22+
// wrapCollector is a util to let you test your Collector implementation.
23+
//
24+
// Use this with prometheus/client_golang/prometheus/testutil to test metric output, for example:
25+
//
26+
// testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want))
27+
type wrapCollector struct {
28+
c Collector
29+
}
30+
31+
func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) {
32+
}
33+
34+
func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
35+
w.c.Update(context.Background(), ch)
36+
}

collector/tasks.go

Lines changed: 73 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -14,97 +14,80 @@
1414
package collector
1515

1616
import (
17+
"context"
1718
"encoding/json"
1819
"fmt"
1920
"io"
2021
"net/http"
2122
"net/url"
22-
"path"
2323

24+
"github.com/alecthomas/kingpin/v2"
2425
"github.com/go-kit/log"
2526
"github.com/go-kit/log/level"
2627
"github.com/prometheus/client_golang/prometheus"
2728
)
2829

29-
type taskByAction struct {
30-
Type prometheus.ValueType
31-
Desc *prometheus.Desc
32-
Value func(action string, count int64) float64
33-
Labels func(action string, count int64) []string
34-
}
35-
36-
var (
37-
taskLabels = []string{"cluster", "action"}
38-
)
30+
// actionFilter is a package-level global because the collector interface doesn't expose any way to take
31+
// constructor args.
32+
var actionFilter string
3933

40-
// Task Information Struct
41-
type Task struct {
42-
logger log.Logger
43-
client *http.Client
44-
url *url.URL
45-
actions string
34+
var taskActionDesc = prometheus.NewDesc(
35+
prometheus.BuildFQName(namespace, "task_stats", "action_total"),
36+
"Number of tasks of a certain action",
37+
[]string{"action"}, nil)
4638

47-
up prometheus.Gauge
48-
totalScrapes, jsonParseFailures prometheus.Counter
39+
func init() {
40+
kingpin.Flag("tasks.actions",
41+
"Filter on task actions. Used in same way as Task API actions param").
42+
Default("indices:*").StringVar(&actionFilter)
43+
registerCollector("tasks", defaultDisabled, NewTaskCollector)
44+
}
4945

50-
byActionMetrics []*taskByAction
46+
// TaskCollector queries the Elasticsearch _tasks endpoint and reports the number of tasks per action.
47+
type TaskCollector struct {
48+
logger log.Logger
49+
hc *http.Client
50+
u *url.URL
5151
}
5252

53-
// NewTask defines Task Prometheus metrics
54-
func NewTask(logger log.Logger, client *http.Client, url *url.URL, actions string) *Task {
55-
return &Task{
56-
logger: logger,
57-
client: client,
58-
url: url,
59-
actions: actions,
60-
61-
up: prometheus.NewGauge(prometheus.GaugeOpts{
62-
Name: prometheus.BuildFQName(namespace, "task_stats", "up"),
63-
Help: "Was the last scrape of the ElasticSearch Task endpoint successful.",
64-
}),
65-
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
66-
Name: prometheus.BuildFQName(namespace, "task_stats", "total_scrapes"),
67-
Help: "Current total Elasticsearch snapshots scrapes.",
68-
}),
69-
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
70-
Name: prometheus.BuildFQName(namespace, "task_stats", "json_parse_failures"),
71-
Help: "Number of errors while parsing JSON.",
72-
}),
73-
byActionMetrics: []*taskByAction{
74-
{
75-
Type: prometheus.GaugeValue,
76-
Desc: prometheus.NewDesc(
77-
prometheus.BuildFQName(namespace, "task_stats", "action_total"),
78-
"Number of tasks of a certain action",
79-
[]string{"action"}, nil,
80-
),
81-
Value: func(action string, count int64) float64 {
82-
return float64(count)
83-
},
84-
Labels: func(action string, count int64) []string {
85-
return []string{action}
86-
},
87-
},
88-
},
89-
}
53+
// NewTaskCollector returns a Collector that reports Task Prometheus metrics.
54+
func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
55+
level.Info(logger).Log("msg", "task collector created",
56+
"actionFilter", actionFilter,
57+
)
58+
59+
return &TaskCollector{
60+
logger: logger,
61+
hc: hc,
62+
u: u,
63+
}, nil
9064
}
9165

92-
// Describe adds Task metrics descriptions
93-
func (t *Task) Describe(ch chan<- *prometheus.Desc) {
94-
for _, metric := range t.byActionMetrics {
95-
ch <- metric.Desc
66+
func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
67+
stats, err := t.fetchAndDecodeAndAggregateTaskStats()
68+
if err != nil {
69+
err = fmt.Errorf("failed to fetch and decode task stats: %w", err)
70+
return err
9671
}
97-
98-
ch <- t.up.Desc()
99-
ch <- t.totalScrapes.Desc()
100-
ch <- t.jsonParseFailures.Desc()
72+
for action, count := range stats.CountByAction {
73+
ch <- prometheus.MustNewConstMetric(
74+
taskActionDesc,
75+
prometheus.GaugeValue,
76+
float64(count),
77+
action,
78+
)
79+
}
80+
return nil
10181
}
10282

103-
func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
104-
u := *t.url
105-
u.Path = path.Join(u.Path, "/_tasks")
106-
u.RawQuery = "group_by=none&actions=" + t.actions
107-
res, err := t.client.Get(u.String())
83+
func (t *TaskCollector) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
84+
u := t.u.ResolveReference(&url.URL{Path: "_tasks"})
85+
q := u.Query()
86+
q.Set("group_by", "none")
87+
q.Set("actions", actionFilter)
88+
u.RawQuery = q.Encode()
89+
90+
res, err := t.hc.Get(u.String())
10891
if err != nil {
10992
return nil, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
11093
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
@@ -126,49 +109,39 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro
126109

127110
bts, err := io.ReadAll(res.Body)
128111
if err != nil {
129-
t.jsonParseFailures.Inc()
130112
return nil, err
131113
}
132114

133115
var tr TasksResponse
134116
if err := json.Unmarshal(bts, &tr); err != nil {
135-
t.jsonParseFailures.Inc()
136117
return nil, err
137118
}
138119

139120
stats := AggregateTasks(tr)
140121
return stats, nil
141122
}
142123

143-
// Collect gets Task metric values
144-
func (ds *Task) Collect(ch chan<- prometheus.Metric) {
145-
ds.totalScrapes.Inc()
146-
defer func() {
147-
ch <- ds.up
148-
ch <- ds.totalScrapes
149-
ch <- ds.jsonParseFailures
150-
}()
124+
// TasksResponse is a representation of the Task management API response.
125+
type TasksResponse struct {
126+
Tasks []TaskResponse `json:"tasks"`
127+
}
151128

152-
stats, err := ds.fetchAndDecodeAndAggregateTaskStats()
153-
if err != nil {
154-
ds.up.Set(0)
155-
level.Warn(ds.logger).Log(
156-
"msg", "failed to fetch and decode task stats",
157-
"err", err,
158-
)
159-
return
160-
}
129+
// TaskResponse is a representation of the individual task item returned by task API endpoint.
130+
//
131+
// We only parse a very limited amount of this API for use in aggregation.
132+
type TaskResponse struct {
133+
Action string `json:"action"`
134+
}
161135

162-
for action, count := range stats.CountByAction {
163-
for _, metric := range ds.byActionMetrics {
164-
ch <- prometheus.MustNewConstMetric(
165-
metric.Desc,
166-
metric.Type,
167-
metric.Value(action, count),
168-
metric.Labels(action, count)...,
169-
)
170-
}
171-
}
136+
type AggregatedTaskStats struct {
137+
CountByAction map[string]int64
138+
}
172139

173-
ds.up.Set(1)
140+
func AggregateTasks(t TasksResponse) *AggregatedTaskStats {
141+
actions := map[string]int64{}
142+
for _, task := range t.Tasks {
143+
actions[task.Action] += 1
144+
}
145+
agg := &AggregatedTaskStats{CountByAction: actions}
146+
return agg
174147
}

collector/tasks_response.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)