Skip to content

Commit 56387a0

Browse files
committed
Add metrics collection for data stream statistics
Signed-off-by: Mike Eves <[email protected]>
1 parent b536294 commit 56387a0

File tree

5 files changed

+294
-0
lines changed

5 files changed

+294
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ elasticsearch_exporter --help
5858
| es.shards | 1.0.3rc1 | If true, query stats for all indices in the cluster, including shard-level stats (implies `es.indices=true`). | false |
5959
| es.snapshots | 1.0.4rc1 | If true, query stats for the cluster snapshots. | false |
6060
| es.slm | | If true, query stats for SLM. | false |
61+
| es.data_stream | | If true, query stats for Data Streams. | false |
6162
| es.timeout | 1.0.2 | Timeout for trying to get stats from Elasticsearch. (ex: 20s) | 5s |
6263
| es.ca | 1.0.2 | Path to PEM file that contains trusted Certificate Authorities for the Elasticsearch connection. | |
6364
| es.client-private-key | 1.0.2 | Path to PEM file that contains the private key for client auth when connecting to Elasticsearch. | |
@@ -89,6 +90,7 @@ es.indices_settings | `indices` `monitor` (per index or `*`) |
8990
es.shards | not sure if `indices` or `cluster` `monitor` or both |
9091
es.snapshots | `cluster:admin/snapshot/status` and `cluster:admin/repository/get` | [ES Forum Post](https://discuss.elastic.co/t/permissions-for-backup-user-with-x-pack/88057)
9192
es.slm | `read_slm`
93+
es.data_stream | `monitor` or `manage` (per index or `*`) |
9294

9395
Further Information
9496
- [Build in Users](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/built-in-users.html)
@@ -240,6 +242,11 @@ Further Information
240242
| elasticsearch_slm_stats_snapshots_deleted_total | counter | 1 | Snapshots deleted by policy
241243
| elasticsearch_slm_stats_snapshot_deletion_failures_total | counter | 1 | Snapshot deletion failures by policy
242244
| elasticsearch_slm_stats_operation_mode | gauge | 1 | SLM operation mode (Running, stopping, stopped)
245+
| elasticsearch_data_stream_stats_up | gauge | 0 | Up metric for Data Stream collection
246+
| elasticsearch_data_stream_stats_total_scrapes | counter | 0 | Total scrapes for Data Stream stats
247+
| elasticsearch_data_stream_stats_json_parse_failures | counter | 0 | Number of parsing failures for Data Stream stats
248+
| elasticsearch_data_stream_backing_indices_total | gauge | 1 | Number of backing indices for Data Stream
249+
| elasticsearch_data_stream_store_size_bytes | gauge | 1 | Current size of data stream backing indices in bytes
243250

244251

245252
### Alerts & Recording Rules

collector/data_stream.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2022 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"encoding/json"
18+
"fmt"
19+
"io/ioutil"
20+
"net/http"
21+
"net/url"
22+
"path"
23+
24+
"github.com/go-kit/log"
25+
"github.com/go-kit/log/level"
26+
"github.com/prometheus/client_golang/prometheus"
27+
)
28+
29+
// dataStreamMetric describes one Prometheus metric derived from the
// per-data-stream statistics returned by the Elasticsearch
// /_data_stream/*/_stats endpoint.
type dataStreamMetric struct {
	// Type is the Prometheus value type of the metric.
	Type prometheus.ValueType
	// Desc carries the metric name, help text and label names.
	Desc *prometheus.Desc
	// Value extracts this metric's value from one data stream's stats.
	Value func(dataStreamStats DataStreamStatsDataStream) float64
	// Labels produces the label values for one data stream.
	Labels func(dataStreamStats DataStreamStatsDataStream) []string
}
35+
36+
var (
	// defaultDataStreamLabels is the label set shared by all per-data-stream
	// metrics: a single "data_stream" label carrying the stream name.
	defaultDataStreamLabels = []string{"data_stream"}
	// defaultDataStreamLabelValues maps one data stream's stats to the
	// label values matching defaultDataStreamLabels.
	defaultDataStreamLabelValues = func(dataStreamStats DataStreamStatsDataStream) []string {
		return []string{dataStreamStats.DataStream}
	}
)
42+
43+
// DataStream is a collector for Elasticsearch data stream statistics.
// It implements prometheus.Collector via Describe and Collect.
type DataStream struct {
	logger log.Logger   // logger for scrape warnings
	client *http.Client // HTTP client used to query Elasticsearch
	url    *url.URL     // base URL of the Elasticsearch node

	// up reports whether the last scrape of the stats endpoint succeeded.
	up prometheus.Gauge
	// totalScrapes counts scrape attempts; jsonParseFailures counts
	// body-read/JSON-decode errors.
	totalScrapes, jsonParseFailures prometheus.Counter

	// dataStreamMetrics are the per-data-stream metrics emitted on Collect.
	dataStreamMetrics []*dataStreamMetric
}
54+
55+
// NewDataStream defines DataStream Prometheus metrics
56+
func NewDataStream(logger log.Logger, client *http.Client, url *url.URL) *DataStream {
57+
return &DataStream{
58+
logger: logger,
59+
client: client,
60+
url: url,
61+
62+
up: prometheus.NewGauge(prometheus.GaugeOpts{
63+
Name: prometheus.BuildFQName(namespace, "data_stream_stats", "up"),
64+
Help: "Was the last scrape of the ElasticSearch Data Stream stats endpoint successful.",
65+
}),
66+
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
67+
Name: prometheus.BuildFQName(namespace, "data_stream_stats", "total_scrapes"),
68+
Help: "Current total ElasticSearch Data STream scrapes.",
69+
}),
70+
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
71+
Name: prometheus.BuildFQName(namespace, "data_stream_stats", "json_parse_failures"),
72+
Help: "Number of errors while parsing JSON.",
73+
}),
74+
dataStreamMetrics: []*dataStreamMetric{
75+
{
76+
Type: prometheus.CounterValue,
77+
Desc: prometheus.NewDesc(
78+
prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
79+
"Number of backing indices",
80+
defaultDataStreamLabels, nil,
81+
),
82+
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
83+
return float64(dataStreamStats.BackingIndices)
84+
},
85+
Labels: defaultDataStreamLabelValues,
86+
},
87+
{
88+
Type: prometheus.CounterValue,
89+
Desc: prometheus.NewDesc(
90+
prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
91+
"Store size of data stream",
92+
defaultDataStreamLabels, nil,
93+
),
94+
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
95+
return float64(dataStreamStats.StoreSizeBytes)
96+
},
97+
Labels: defaultDataStreamLabelValues,
98+
},
99+
},
100+
}
101+
}
102+
103+
// Describe adds DataStream metrics descriptions
104+
func (ds *DataStream) Describe(ch chan<- *prometheus.Desc) {
105+
for _, metric := range ds.dataStreamMetrics {
106+
ch <- metric.Desc
107+
}
108+
109+
ch <- ds.up.Desc()
110+
ch <- ds.totalScrapes.Desc()
111+
ch <- ds.jsonParseFailures.Desc()
112+
}
113+
114+
func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
115+
var dsr DataStreamStatsResponse
116+
117+
u := *ds.url
118+
u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
119+
res, err := ds.client.Get(u.String())
120+
if err != nil {
121+
return dsr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
122+
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
123+
}
124+
125+
defer func() {
126+
err = res.Body.Close()
127+
if err != nil {
128+
_ = level.Warn(ds.logger).Log(
129+
"msg", "failed to close http.Client",
130+
"err", err,
131+
)
132+
}
133+
}()
134+
135+
if res.StatusCode != http.StatusOK {
136+
return dsr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
137+
}
138+
139+
bts, err := ioutil.ReadAll(res.Body)
140+
if err != nil {
141+
ds.jsonParseFailures.Inc()
142+
return dsr, err
143+
}
144+
145+
if err := json.Unmarshal(bts, &dsr); err != nil {
146+
ds.jsonParseFailures.Inc()
147+
return dsr, err
148+
}
149+
150+
return dsr, nil
151+
}
152+
153+
// Collect gets DataStream metric values
154+
func (ds *DataStream) Collect(ch chan<- prometheus.Metric) {
155+
ds.totalScrapes.Inc()
156+
defer func() {
157+
ch <- ds.up
158+
ch <- ds.totalScrapes
159+
ch <- ds.jsonParseFailures
160+
}()
161+
162+
dataStreamStatsResp, err := ds.fetchAndDecodeDataStreamStats()
163+
if err != nil {
164+
ds.up.Set(0)
165+
_ = level.Warn(ds.logger).Log(
166+
"msg", "failed to fetch and decode data stream stats",
167+
"err", err,
168+
)
169+
return
170+
}
171+
172+
ds.up.Set(1)
173+
174+
for _, metric := range ds.dataStreamMetrics {
175+
for _, dataStream := range dataStreamStatsResp.DataStreamStats {
176+
fmt.Printf("Metric: %+v", dataStream)
177+
ch <- prometheus.MustNewConstMetric(
178+
metric.Desc,
179+
metric.Type,
180+
metric.Value(dataStream),
181+
metric.Labels(dataStream)...,
182+
)
183+
}
184+
}
185+
}

collector/data_stream_response.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2022 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
// DataStreamStatsResponse is a representation of the Data Stream stats
// returned by the /_data_stream/*/_stats endpoint.
type DataStreamStatsResponse struct {
	// Shards summarizes the shards that contributed to this response.
	Shards DataStreamStatsShards `json:"_shards"`
	// DataStreamCount is the number of data streams covered.
	DataStreamCount int64 `json:"data_stream_count"`
	// BackingIndices is the total backing indices across all streams.
	BackingIndices int64 `json:"backing_indices"`
	// TotalStoreSizeBytes is the combined store size of all streams.
	TotalStoreSizeBytes int64 `json:"total_store_size_bytes"`
	// DataStreamStats holds the per-data-stream statistics.
	DataStreamStats []DataStreamStatsDataStream `json:"data_streams"`
}
24+
25+
// DataStreamStatsShards defines data stream stats shards information
// structure: how many shards were queried, succeeded and failed.
type DataStreamStatsShards struct {
	Total      int64 `json:"total"`
	Successful int64 `json:"successful"`
	Failed     int64 `json:"failed"`
}
31+
32+
// DataStreamStatsDataStream defines the structure of per data stream stats.
type DataStreamStatsDataStream struct {
	// DataStream is the name of the data stream.
	DataStream string `json:"data_stream"`
	// BackingIndices is the number of indices backing this stream.
	BackingIndices int64 `json:"backing_indices"`
	// StoreSizeBytes is the current store size of the backing indices.
	StoreSizeBytes int64 `json:"store_size_bytes"`
	// MaximumTimestamp is the highest @timestamp value in the stream
	// (epoch milliseconds as returned by Elasticsearch).
	MaximumTimestamp int64 `json:"maximum_timestamp"`
}

collector/data_stream_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2022 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"fmt"
18+
"net/http"
19+
"net/http/httptest"
20+
"net/url"
21+
"testing"
22+
23+
"github.com/go-kit/log"
24+
)
25+
26+
func TestDataStream(t *testing.T) {
27+
tcs := map[string]string{
28+
"7.15.0": `{"_shards":{"total":30,"successful":30,"failed":0},"data_stream_count":2,"backing_indices":7,"total_store_size_bytes":1103028116,"data_streams":[{"data_stream":"foo","backing_indices":5,"store_size_bytes":429205396,"maximum_timestamp":1656079894000},{"data_stream":"bar","backing_indices":2,"store_size_bytes":673822720,"maximum_timestamp":1656028796000}]}`,
29+
}
30+
for ver, out := range tcs {
31+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
32+
fmt.Fprintln(w, out)
33+
}))
34+
defer ts.Close()
35+
36+
u, err := url.Parse(ts.URL)
37+
if err != nil {
38+
t.Fatalf("Failed to parse URL: %s", err)
39+
}
40+
s := NewDataStream(log.NewNopLogger(), http.DefaultClient, u)
41+
stats, err := s.fetchAndDecodeDataStreamStats()
42+
if err != nil {
43+
t.Fatalf("Failed to fetch or decode data stream stats: %s", err)
44+
}
45+
t.Logf("[%s] Data Stream Response: %+v", ver, stats)
46+
dataStreamStats := stats.DataStreamStats[0]
47+
48+
if dataStreamStats.BackingIndices != 5 {
49+
t.Errorf("Bad number of backing indices")
50+
}
51+
52+
if dataStreamStats.StoreSizeBytes != 429205396 {
53+
t.Errorf("Bad store size bytes valuee")
54+
}
55+
}
56+
57+
}

main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ func main() {
8888
esExportSLM = kingpin.Flag("es.slm",
8989
"Export stats for SLM snapshots.").
9090
Default("false").Bool()
91+
esExportDataStream = kingpin.Flag("es.data_stream",
92+
"Export stas for Data Streams.").
93+
Default("false").Bool()
9194
esClusterInfoInterval = kingpin.Flag("es.clusterinfo.interval",
9295
"Cluster info update interval for the cluster label").
9396
Default("5m").Duration()
@@ -201,6 +204,10 @@ func main() {
201204
prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL))
202205
}
203206

207+
if *esExportDataStream {
208+
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
209+
}
210+
204211
if *esExportClusterSettings {
205212
prometheus.MustRegister(collector.NewClusterSettings(logger, httpClient, esURL))
206213
}

0 commit comments

Comments
 (0)