Skip to content

Commit 422c7a0

Browse files
authored
publish total shards on a node (#535)
* publish total shards on a node Signed-off-by: avinash kumar <[email protected]> * publish total shards on a node, fixes go.sum Signed-off-by: avinash kumar <[email protected]> * publish total shards on a node, fixes go-kit import Signed-off-by: avinash kumar <[email protected]> * publish total shards on a node, fixes review comments Signed-off-by: avinash kumar <[email protected]>
1 parent 4456c97 commit 422c7a0

File tree

2 files changed

+183
-0
lines changed

2 files changed

+183
-0
lines changed

collector/shards.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2022 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"encoding/json"
17+
"fmt"
18+
"net/http"
19+
"net/url"
20+
"path"
21+
22+
"github.com/go-kit/log"
23+
"github.com/go-kit/log/level"
24+
"github.com/prometheus/client_golang/prometheus"
25+
)
26+
27+
var (
28+
defaultNodeShardLabels = []string{"node"}
29+
30+
defaultNodeShardLabelValues = func(node string) []string {
31+
return []string{
32+
node,
33+
}
34+
}
35+
)
36+
37+
// ShardResponse has shard's node and index info
38+
type ShardResponse struct {
39+
Index string `json:"index"`
40+
Shard string `json:"shard"`
41+
Node string `json:"node"`
42+
}
43+
44+
// Shards information struct
45+
type Shards struct {
46+
logger log.Logger
47+
client *http.Client
48+
url *url.URL
49+
50+
nodeShardMetrics []*nodeShardMetric
51+
jsonParseFailures prometheus.Counter
52+
}
53+
54+
// NodeShard Information per node struct
55+
type NodeShard struct {
56+
node string
57+
shards int64
58+
}
59+
60+
type nodeShardMetric struct {
61+
Type prometheus.ValueType
62+
Desc *prometheus.Desc
63+
Value func(shards float64) float64
64+
Labels func(node string) []string
65+
}
66+
67+
// NewShards defines Shards Prometheus metrics
68+
func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards {
69+
return &Shards{
70+
logger: logger,
71+
client: client,
72+
url: url,
73+
74+
nodeShardMetrics: []*nodeShardMetric{
75+
{
76+
Type: prometheus.GaugeValue,
77+
Desc: prometheus.NewDesc(
78+
prometheus.BuildFQName(namespace, "node_shards", "total"),
79+
"Total shards per node",
80+
defaultNodeShardLabels, nil,
81+
),
82+
Value: func(shards float64) float64 {
83+
return shards
84+
},
85+
Labels: defaultNodeShardLabelValues,
86+
}},
87+
88+
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
89+
Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
90+
Help: "Number of errors while parsing JSON.",
91+
}),
92+
}
93+
}
94+
95+
// Describe Shards
96+
func (s *Shards) Describe(ch chan<- *prometheus.Desc) {
97+
ch <- s.jsonParseFailures.Desc()
98+
99+
for _, metric := range s.nodeShardMetrics {
100+
ch <- metric.Desc
101+
}
102+
}
103+
104+
func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) {
105+
res, err := s.client.Get(u.String())
106+
if err != nil {
107+
return nil, fmt.Errorf("failed to get from %s://%s:%s%s: %s",
108+
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
109+
}
110+
111+
defer func() {
112+
err = res.Body.Close()
113+
if err != nil {
114+
_ = level.Warn(s.logger).Log(
115+
"msg", "failed to close http.Client",
116+
"err", err,
117+
)
118+
}
119+
}()
120+
121+
if res.StatusCode != http.StatusOK {
122+
return nil, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
123+
}
124+
var sfr []ShardResponse
125+
if err := json.NewDecoder(res.Body).Decode(&sfr); err != nil {
126+
s.jsonParseFailures.Inc()
127+
return nil, err
128+
}
129+
return sfr, nil
130+
}
131+
132+
func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
133+
134+
u := *s.url
135+
u.Path = path.Join(u.Path, "/_cat/shards")
136+
q := u.Query()
137+
q.Set("format", "json")
138+
u.RawQuery = q.Encode()
139+
sfr, err := s.getAndParseURL(&u)
140+
if err != nil {
141+
return sfr, err
142+
}
143+
return sfr, err
144+
}
145+
146+
// Collect number of shards on each nodes
147+
func (s *Shards) Collect(ch chan<- prometheus.Metric) {
148+
149+
defer func() {
150+
ch <- s.jsonParseFailures
151+
}()
152+
153+
sr, err := s.fetchAndDecodeShards()
154+
if err != nil {
155+
_ = level.Warn(s.logger).Log(
156+
"msg", "failed to fetch and decode node shards stats",
157+
"err", err,
158+
)
159+
return
160+
}
161+
162+
nodeShards := make(map[string]float64)
163+
164+
for _, shard := range sr {
165+
if val, ok := nodeShards[shard.Node]; ok {
166+
nodeShards[shard.Node] = val + 1
167+
} else {
168+
nodeShards[shard.Node] = 1
169+
}
170+
}
171+
172+
for node, shards := range nodeShards {
173+
for _, metric := range s.nodeShardMetrics {
174+
ch <- prometheus.MustNewConstMetric(
175+
metric.Desc,
176+
metric.Type,
177+
metric.Value(shards),
178+
metric.Labels(node)...,
179+
)
180+
}
181+
}
182+
}

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func main() {
167167
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
168168

169169
if *esExportIndices || *esExportShards {
170+
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL))
170171
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards)
171172
prometheus.MustRegister(iC)
172173
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {

0 commit comments

Comments
 (0)