Skip to content

Commit 51dd72b

Browse files
author
Remi Hakim
committed
Improvements to the rabbitmq integration
Fix #682 , Fix #644
1 parent 2c510a0 commit 51dd72b

File tree

1 file changed

+149
-121
lines changed

1 file changed

+149
-121
lines changed

checks.d/rabbitmq.py

Lines changed: 149 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
import urllib2
22
import urlparse
3+
import time
34

45
from checks import AgentCheck
56
from util import json
67

7-
QUEUE_ATTRIBUTES = [
8+
# Name used both as the event type and as the source type of posted events.
EVENT_TYPE = 'rabbitmq'
SOURCE_TYPE_NAME = EVENT_TYPE

# Object types handled by the check; each value is also the path segment
# joined onto the API base URL to list the objects of that type.
QUEUE_TYPE = 'queues'
NODE_TYPE = 'nodes'

# Default caps on how many queues/nodes get detailed metrics; an instance
# can override them with max_detailed_queues / max_detailed_nodes.
MAX_DETAILED_QUEUES = 200
MAX_DETAILED_NODES = 100

# Post an event in the stream when the number of queues or nodes to collect
# is above 90% of the limit.
ALERT_THRESHOLD = 0.9
14+
QUEUE_ATTRIBUTES = [
815
'active_consumers',
916
'consumers',
1017
'memory',
@@ -14,31 +21,49 @@
1421
]
1522

1623
NODE_ATTRIBUTES = [
17-
'disk_free',
18-
'disk_free_limit',
1924
'fd_total',
2025
'fd_used',
2126
'mem_limit',
2227
'mem_used',
23-
'proc_total',
24-
'proc_used',
25-
'processors',
2628
'run_queue',
2729
'sockets_total',
2830
'sockets_used',
2931
]
3032

31-
MAX_QUEUES = 5
32-
MAX_NODES = 3
33+
# Attributes read from each API object, keyed by object type.
ATTRIBUTES = {
    QUEUE_TYPE: QUEUE_ATTRIBUTES,
    NODE_TYPE: NODE_ATTRIBUTES,
}

# For each object type: API field name -> suffix of the emitted tag
# (tags are built as 'rabbitmq_<suffix>:<field value>').
_QUEUE_TAGS = {
    'node': 'node',
    'name': 'queue',
    'vhost': 'vhost',
    'policy': 'policy',
}
_NODE_TAGS = {
    'name': 'node',
}
TAGS_MAP = {
    QUEUE_TYPE: _QUEUE_TAGS,
    NODE_TYPE: _NODE_TAGS,
}

# Middle component of the metric name ('rabbitmq.<suffix>.<attribute>').
METRIC_SUFFIX = {
    QUEUE_TYPE: "queue",
    NODE_TYPE: "node",
}
3656

3757
class RabbitMQ(AgentCheck):
3858
"""This check is for gathering statistics from the RabbitMQ
3959
Management Plugin (http://www.rabbitmq.com/management.html)
4060
"""
41-
def check(self, instance):
61+
62+
def __init__(self, name, init_config, agentConfig, instances=None):
    """Initialise the base check, then the alert bookkeeping list."""
    AgentCheck.__init__(self, name, init_config, agentConfig, instances)
    # Keys (base URL + object type) for which alert() already posted an
    # event, so that each of them is reported only once.
    self.already_alerted = list()
65+
66+
def _get_config(self, instance):
4267
# make sure 'rabbitmq_api_url' is present
4368
if 'rabbitmq_api_url' not in instance:
4469
raise Exception('Missing "rabbitmq_api_url" in RabbitMQ config.')
@@ -50,14 +75,32 @@ def check(self, instance):
5075
username = instance.get('rabbitmq_user', 'guest')
5176
password = instance.get('rabbitmq_pass', 'guest')
5277

78+
# Limit of queues/nodes to collect metrics from
79+
max_detailed = {
80+
QUEUE_TYPE: int(instance.get('max_detailed_queues', MAX_DETAILED_QUEUES)),
81+
NODE_TYPE: int(instance.get('max_detailed_nodes', MAX_DETAILED_NODES)),
82+
}
83+
84+
# List of queues/nodes to collect metrics from
85+
specified = {
86+
QUEUE_TYPE: instance.get('queues', []),
87+
NODE_TYPE: instance.get('nodes', []),
88+
}
89+
5390
# setup urllib2 for Basic Auth
5491
auth_handler = urllib2.HTTPBasicAuthHandler()
5592
auth_handler.add_password(realm='RabbitMQ Management', uri=base_url, user=username, passwd=password)
5693
opener = urllib2.build_opener(auth_handler)
5794
urllib2.install_opener(opener)
5895

59-
self.get_queue_stats(instance, base_url)
60-
self.get_node_stats(instance, base_url)
96+
return base_url, max_detailed, specified
97+
98+
99+
def check(self, instance):
    """Collect stats for one configured instance: queues first, then nodes."""
    base_url, max_detailed, specified = self._get_config(instance)
    for object_type in (QUEUE_TYPE, NODE_TYPE):
        self.get_stats(
            instance,
            base_url,
            object_type,
            max_detailed[object_type],
            specified[object_type],
        )
103+
61104

62105
def _get_data(self, url):
63106
try:
@@ -69,118 +112,103 @@ def _get_data(self, url):
69112
return data
70113

71114

72-
def _get_metrics_for_queue(self, queue, is_gauge=False, send_histogram=True):
73-
if is_gauge:
115+
def get_stats(self, instance, base_url, object_type, max_detailed, specified):
    """Fetch every object of ``object_type`` and submit its metrics.

    Parameters:
        instance: the instance configuration (not used here; kept for
            interface compatibility).
        base_url: base URL of the management API; ``object_type`` is
            joined onto it to build the listing URL.
        object_type: QUEUE_TYPE or NODE_TYPE.
        max_detailed: maximum number of objects that may get detailed
            metrics.
        specified: names (for queues also ``vhost/name``) explicitly
            requested in the configuration; may be empty or None.

    Raises:
        Exception: when more names are specified than ``max_detailed``.
    """
    data = self._get_data(urlparse.urljoin(base_url, object_type))
    total = len(data)

    # Work on a private copy: ``specified`` is the very list stored in the
    # instance configuration, and removing entries from it would erode the
    # user's selection for later runs. ``or []`` tolerates a null value.
    remaining = list(specified or [])

    if total > ALERT_THRESHOLD * max_detailed and not remaining:
        self.alert(base_url, max_detailed, total, object_type)

    if total > max_detailed and not remaining:
        # Name the actual object type instead of always saying "queues".
        self.warning("Too many %s to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support" % (object_type, object_type))

    if len(remaining) > max_detailed:
        raise Exception("The maximum number of %s you can specify is %d." % (object_type, max_detailed))

    limit_reached = False
    detailed = 0
    for data_line in data:
        name = data_line.get("name")
        if object_type == QUEUE_TYPE:
            # Queues can also be referred to as "<vhost>/<name>"
            absolute_name = '%s/%s' % (data_line.get("vhost"), name)
        else:
            absolute_name = name

        if total < max_detailed:
            # The number of queues or nodes is below the limit:
            # everything gets detailed metrics.
            collect_detailed = True
        elif name in remaining:
            # Explicitly requested in the config by its plain name
            remaining.remove(name)
            collect_detailed = True
        elif absolute_name in remaining:
            # Explicitly requested in the config by its absolute name
            remaining.remove(absolute_name)
            collect_detailed = True
        else:
            # Nothing (left) to look for from the config and the limit
            # has not been reached yet.
            collect_detailed = not limit_reached and not remaining

        if collect_detailed:
            self._get_metrics(data_line, object_type, detailed=True)
            detailed += 1

        limit_reached = detailed >= max_detailed

        # Parenthesised to make the original operator precedence explicit.
        if limit_reached or (total > max_detailed and not remaining):
            self._get_metrics(data_line, object_type, detailed=False)
166+
167+
def _get_metrics(self, data, object_type, detailed):
168+
if detailed:
74169
tags = []
75-
tag_list = {
76-
'node':'node',
77-
'name':'queue',
78-
'vhost':'vhost',
79-
'policy':'policy',
80-
}
170+
tag_list = TAGS_MAP[object_type]
81171
for t in tag_list.keys():
82-
tag = queue.get(t, None)
172+
tag = data.get(t, None)
83173
if tag is not None:
84174
tags.append('rabbitmq_%s:%s' % (tag_list[t], tag))
85175

86-
else:
87-
tags = None
88-
89-
for attribute in QUEUE_ATTRIBUTES:
90-
value = queue.get(attribute, None)
176+
for attribute in ATTRIBUTES[object_type]:
177+
value = data.get(attribute, None)
91178
if value is not None:
92-
if send_histogram:
93-
self.histogram('rabbitmq.queue.%s.hist' % attribute, int(value))
94-
if is_gauge:
95-
self.gauge('rabbitmq.queue.%s' % attribute, int(value), tags=tags)
96-
97-
98-
def _get_metrics_for_node(self, node, is_gauge=False, send_histogram=True):
99-
if is_gauge:
100-
tags = []
101-
if 'name' in node:
102-
tags.append('rabbitmq_node:%s' % node['name'])
179+
self.histogram('rabbitmq.%s.%s.hist' % (METRIC_SUFFIX[object_type], attribute), int(value))
180+
if detailed:
181+
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], attribute), int(value), tags=tags)
182+
183+
def alert(self, base_url, max_detailed, size, object_type):
    """Post a one-time warning event when the object count nears the limit.

    The pair (base_url, object_type) is remembered in
    ``self.already_alerted`` so that the event is posted only once for it.
    """
    alert_key = "%s%s" % (base_url, object_type)
    if alert_key not in self.already_alerted:
        self.already_alerted.append(alert_key)

        hostname = self.hostname
        headline = "RabbitMQ integration is approaching the limit on %s" % hostname
        body = (
            "%s %s are present. The limit is %s.\n"
            "Please get in touch with Datadog support to increase the limit."
        ) % (size, object_type, max_detailed)

        self.event({
            "timestamp": int(time.time()),
            "event_type": EVENT_TYPE,
            "api_key": self.agentConfig['api_key'],
            "msg_title": headline,
            "msg_text": body,
            "alert_type": 'warning',
            "source_type_name": SOURCE_TYPE_NAME,
            "host": hostname,
            "tags": ["base_url:%s" % base_url, "host:%s" % hostname],
            "event_object": alert_key,
        })
209+
210+
211+
212+
213+
103214

104-
for attribute in NODE_ATTRIBUTES:
105-
value = node.get(attribute, None)
106-
if value is not None:
107-
if send_histogram:
108-
self.histogram('rabbitmq.node.%s.hist' % attribute, int(value))
109-
if is_gauge:
110-
self.gauge('rabbitmq.node.%s' % attribute, int(value), tags=tags)
111-
112-
113-
def get_queue_stats(self, instance, base_url):
114-
url = urlparse.urljoin(base_url, 'queues')
115-
queues = self._get_data(url)
116-
117-
if len(queues) > 100 and not instance.get('queues', None):
118-
self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")
119-
120-
allowed_queues = instance.get('queues', [])
121-
if len(allowed_queues) > MAX_QUEUES:
122-
raise Exception("The maximum number of queues you can specify is %d." % MAX_QUEUES)
123-
124-
if not allowed_queues:
125-
allowed_queues = [q.get('name') for q in queues[:MAX_QUEUES]]
126-
# If no queues are specified in the config, we only get metrics for the 5 first ones.
127-
# Others will be aggregated
128-
129-
i = 0
130-
queue_Limit_reached = False
131-
for queue in queues:
132-
name = queue.get('name')
133-
if name in allowed_queues:
134-
self._get_metrics_for_queue(queue, is_gauge=True, send_histogram=len(queues) > MAX_QUEUES)
135-
allowed_queues.remove(name)
136-
elif queue_Limit_reached:
137-
if not allowed_queues:
138-
# We have reached the limit and we have already processed the config specified queues
139-
break
140-
# We have reached the limit but some queues specified in the config still haven't been processed
141-
continue
142-
else:
143-
self._get_metrics_for_queue(queue)
144-
145-
i += 1
146-
if i > QUEUE_LIMIT:
147-
self.warning("More than %s queues are present. Only collecting data using the 100 first" % QUEUE_LIMIT)
148-
queue_Limit_reached = True
149-
150-
151-
def get_node_stats(self, instance, base_url):
152-
url = urlparse.urljoin(base_url, 'nodes')
153-
nodes = self._get_data(url)
154-
155-
if len(nodes) > 100 and not instance.get('nodes', None):
156-
self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")
157-
158-
allowed_nodes = instance.get('nodes', [])
159-
if len(allowed_nodes) > MAX_NODES:
160-
raise Exception("The maximum number of nodes you can specify is %d." % MAX_NODES)
161-
162-
if not allowed_nodes:
163-
allowed_nodes = [n.get('name') for n in nodes[:MAX_NODES]]
164-
# If no nodes are specified in the config, we only get metrics for the 5 first ones.
165-
# Others will be aggregated
166-
167-
i = 0
168-
node_limit_reached = False
169-
for node in nodes:
170-
name = node.get('name')
171-
if name in allowed_nodes:
172-
self._get_metrics_for_node(node, is_gauge=True, send_histogram=len(nodes) > MAX_NODES)
173-
allowed_nodes.remove(name)
174-
elif node_limit_reached:
175-
if not allowed_nodes:
176-
# We have reached the limit and we have already processed the config specified nodes
177-
break
178-
# We have reached the limit but some nodes specified in the config still haven't been processed
179-
continue
180-
else:
181-
self._get_metrics_for_node(node)
182-
183-
i += 1
184-
if i > NODE_LIMIT:
185-
self.warning("More than %s nodes are present. Only collecting data using the 100 first" % NODE_LIMIT)
186-
node_limit_reached = True

0 commit comments

Comments
 (0)