0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-26 02:08:31 +00:00
alerta_alerta/alerta/ganglia/daemon.py
Nick Satterly eda6395dd0 lots
2013-03-07 18:28:44 +00:00

408 lines
19 KiB
Python

import os
import sys
import urllib2
import json
import datetime
import time
import uuid
import re
import yaml
import stomp
from alerta.common import config
from alerta.common import log as logging
from alerta.common.daemon import Daemon
from alerta.alert import Alert, Heartbeat
from alerta.alert import syslog
from alerta.common.mq import Messaging
Version = '2.0.0'
LOG = logging.getLogger(__name__)
CONF = config.CONF
RULESFILE = '/opt/alerta/conf/alert-ganglia.yaml'
currentCount = dict()
currentState = dict()
previousSeverity = dict()
class GangliaDaemon(Daemon):
def run(self):
self.running = True
# Connect to message queue
self.mq = Messaging()
self.mq.connect()
while not self.shuttingdown:
try:
rules = GangliaDaemon.init_rules() # re-read rule config each time
self.check(rules)
LOG.debug('Send heartbeat...')
heartbeat = Heartbeat(version=Version)
self.mq.send(heartbeat)
LOG.debug('Waiting for next check run...')
time.sleep(CONF.loop_every)
except (KeyboardInterrupt, SystemExit):
self.shuttingdown = True
LOG.info('Shutdown request received...')
self.running = False
LOG.info('Disconnecting from message broker...')
self.mq.disconnect()
def check(self, rules):
for rule in rules:
# Check rule is valid
if len(rule['thresholdInfo']) != len(rule['text']):
LOG.warning('Skipping invalid rule %s - MUST define alert text for each threshold.', rule['event'])
continue
# Get list of metrics required to evaluate each rule
params = dict()
if 'filter' in rule and rule['filter'] is not None:
params[rule['filter']] = 1
for s in (' '.join(rule['text']), ' '.join(rule['thresholdInfo']), rule['value']):
matches = re.findall('\$([a-z0-9A-Z_]+)', s)
for m in matches:
if m != 'now':
params['metric=' + m] = 1
metric_filter = '&'.join(params.keys())
LOG.debug('Metric filter = %s', metric_filter)
# Get metric data for each rule
response = GangliaDaemon.get_metrics(metric_filter)
LOG.debug('Ganglia API response: %s', response)
# Make non-metric substitutions in value, thresholdInfo and text
now = int(time.time())
rule['value'] = re.sub('\$now', str(now), rule['value'])
idx = 0
for threshold in rule['thresholdInfo']:
rule['thresholdInfo'][idx] = re.sub('\$now', str(now), threshold)
idx += 1
idx = 0
for text in rule['text']:
rule['text'][idx] = re.sub('\$now', time.strftime('%Y/%m/%d %H:%M:%S', time.localtime(now)), text)
idx += 1
metric = dict()
for m in response:
# Make metric-based substitutions in resource eg. per instance, host or cluster
resource = re.sub('\$instance', m.get('instance', '__NA__'), rule['resource'])
resource = re.sub('\$host', m.get('host', '__NA__'), resource)
resource = re.sub('\$cluster', m.get('cluster', '__NA__'), resource)
if '__NA__' in resource:
LOG.debug('Metric %s doesnt match resource rule %s', m['id'], rule['resource'])
continue
LOG.debug('Metric %s matches rule %s => %s', m['id'], rule['resource'], resource)
# Don't generate cluster alerts from host-based metrics
if 'host' in m and not '$host' in rule['resource']:
LOG.debug('Skipping host-based metric for cluster-based rule')
continue
# Build up info for alert if metric value triggers threshold
if resource not in metric:
metric[resource] = dict()
if 'thresholdInfo' not in metric[resource]:
metric[resource]['thresholdInfo'] = list(rule['thresholdInfo'])
LOG.debug('Set thresholdInfo to %s', metric[resource]['thresholdInfo'])
if 'text' not in metric[resource]:
metric[resource]['text'] = list(rule['text'])
LOG.debug('Set text to %s', metric[resource]['text'])
if m['metric'] in rule['value']:
# Determine service and environment from rule if given
if 'environment' in rule:
metric[resource]['environment'] = [rule['environment']]
else:
metric[resource]['environment'] = [m['environment']]
LOG.debug('Set environment for alert to %s', metric[resource]['environment'])
if 'service' in rule:
metric[resource]['service'] = [rule['service']]
else:
metric[resource]['service'] = [m['service']]
LOG.debug('Set service for alert to %s', metric[resource]['service'])
# Use raw metric value, or sum or average if aggregated metric
if 'value' in m:
v = GangliaDaemon.quote(m['value']) # raw value
elif rule['value'].endswith('.sum'):
v = GangliaDaemon.quote(m['sum']) # aggregated sum value if "<metric>.sum"
else:
try:
v = "%.1f" % (float(m['sum']) / float(m['num'])) # average of aggregate value
except ZeroDivisionError:
v = 0.0
LOG.debug('Value for %s on %s is %s', m['id'], resource, v)
# If no value assign rule value
# FIXME(nsatterl): what does this do?
if 'value' not in metric[resource]:
metric[resource]['value'] = rule['value']
metric[resource]['value'] = re.sub('\$%s(\.sum)?' % m['metric'], str(v),
metric[resource]['value'])
metric[resource]['units'] = m['units']
# Assign tags
metric[resource]['tags'] = list()
metric[resource]['tags'].extend(rule['tags'])
metric[resource]['tags'].append('cluster:%s' % m['cluster'])
if 'tags' in m and m['tags'] is not None:
metric[resource]['tags'].extend(m['tags'])
# Assign graph URL
if 'graphUrl' not in metric[resource]:
metric[resource]['graphUrl'] = list()
if 'graphUrl' in m:
metric[resource]['graphUrl'].append(m['graphUrl'])
for g in rule['graphs']:
if '$host' in rule['resource'] and 'graphUrl' in m:
metric[resource]['graphUrl'].append('/'.join(m['graphUrl'].rsplit('/', 2)[0:2])
+ '/graph.php?c=%s&h=%s&m=%s&r=1day&v=0&z=default'
% (m['cluster'], m['host'], g))
if '$cluster' in rule['resource'] and 'graphUrl' in m:
metric[resource]['graphUrl'].append('/'.join(m['graphUrl'].rsplit('/', 2)[0:2])
+ '/graph.php?c=%s&m=%s&r=1day&v=0&z=default'
% (m['cluster'], g))
metric[resource]['moreInfo'] = ''
if '$host' in rule['resource'] and 'graphUrl' in m:
metric[resource]['moreInfo'] = '/'.join(
m['graphUrl'].rsplit('/', 2)[0:2]) + '/?c=%s&h=%s' % (m['cluster'], m['host'])
if '$cluster' in rule['resource'] and 'graphUrl' in m:
metric[resource]['moreInfo'] = '/'.join(m['graphUrl'].rsplit('/', 2)[0:2]) + '/?c=%s' % m['cluster']
# Substitutions for threshold info
if m['metric'] in ''.join(rule['thresholdInfo']):
LOG.debug('Text to be substituted: %s', ''.join(rule['thresholdInfo']))
if 'value' in m:
v = GangliaDaemon.quote(m['value'])
elif rule['value'].endswith('.sum'):
v = GangliaDaemon.quote(m['sum'])
else:
try:
v = "%.1f" % (float(m['sum']) / float(m['num']))
except ZeroDivisionError:
v = 0.0
idx = 0
for threshold in metric[resource]['thresholdInfo']:
metric[resource]['thresholdInfo'][idx] = re.sub('\$%s(\.sum)?' % m['metric'], str(v),
threshold)
idx += 1
# Substitutions for text
if m['metric'] in ''.join(rule['text']):
LOG.debug('Text to be substituted: %s', ''.join(rule['text']))
if 'value' in m:
v = GangliaDaemon.quote(m['value'])
elif rule['value'].endswith('.sum'):
v = GangliaDaemon.quote(m['sum'])
else:
try:
v = "%.1f" % (float(m['sum']) / float(m['num']))
except ZeroDivisionError:
v = 0.0
if m['type'] == 'timestamp' or m['units'] == 'timestamp':
v = time.strftime('%Y/%m/%d %H:%M:%S', time.localtime(float(v)))
LOG.debug('Metric resource text %s', metric)
idx = 0
for text in metric[resource]['text']:
metric[resource]['text'][idx] = re.sub('\$%s(\.sum)?' % m['metric'], str(v), text)
idx += 1
LOG.debug('end of metric loop')
for resource in metric:
LOG.debug('Calculate final value for resource %s', resource)
index = 0
try:
calculated_value = eval(metric[resource]['value'])
except KeyError:
LOG.warning('Could not calculate %s value for %s because %s is not being reported',
rule['event'], resource, rule['value'])
continue
except (SyntaxError, NameError):
LOG.error('Could not calculate %s value for %s => eval(%s)', rule['event'], resource,
metric[resource]['value'])
continue
except ZeroDivisionError:
LOG.debug(
'Could not calculate %s value for %s => eval(%s) (division by zero). Setting to 0 instead.',
rule['event'], resource, metric[resource]['value'])
calculated_value = 0
except Exception:
LOG.error('Could not calculate %s value for %s => eval(%s) (threw unknown exception)',
rule['event'], resource, metric[resource]['value'])
continue
LOG.debug('Calculated value for resource %s => %s', resource, calculated_value)
# Compare final value with each threshold
for ti in metric[resource]['thresholdInfo']:
sev, op, threshold = ti.split(':')
rule_eval = '%s %s %s' % (GangliaDaemon.quote(calculated_value), op, threshold)
try:
result = eval(rule_eval)
except SyntaxError:
LOG.error('Could not evaluate %s threshold for %s => eval(%s)', rule['event'],
resource, rule_eval)
result = False
if result:
# Set necessary state variables if currentState is unknown
if (resource, rule['event']) not in currentState:
currentState[(resource, rule['event'])] = sev
currentCount[(resource, rule['event'], sev)] = 0
previousSeverity[(resource, rule['event'])] = sev
if currentState[(resource, rule[
'event'])] != sev: # Change of threshold state
currentCount[(resource, rule['event'], sev)] = currentCount.get(
(resource, rule['event'], sev), 0) + 1
currentCount[(resource, rule['event'], currentState[(resource, rule[
'event'])])] = 0 # zero-out previous sev counter
currentState[(resource, rule['event'])] = sev
elif currentState[(resource, rule[
'event'])] == sev: # Threshold state has not changed
currentCount[(resource, rule['event'], sev)] += 1
LOG.debug('calculated_value = %s, currentState = %s, currentCount = %d',
calculated_value, currentState[(resource, rule['event'])],
currentCount[(resource, rule['event'], sev)])
# Determine if should send a repeat alert
try:
repeat = (currentCount[(resource, rule['event'], sev)] - rule.get('count',
1)) % rule.get(
'repeat', 1) == 0
except:
repeat = False
LOG.debug('Send alert if prevSev %s != %s AND thresh %d == %s',
previousSeverity[(resource, rule['event'])], sev,
currentCount[(resource, rule['event'], sev)], rule.get('count', 1))
LOG.debug('Repeat? %s (%d - %d %% %d)', repeat,
currentCount[(resource, rule['event'], sev)], rule.get('count', 1),
rule.get('repeat', 1))
# Determine if current threshold count requires an alert
if ((previousSeverity[(resource, rule['event'])] != sev and currentCount[
(resource, rule['event'], sev)] == rule.get('count', 1))
or (previousSeverity[(resource, rule['event'])] == sev and repeat)):
LOG.debug('%s %s %s %s rule fired %s %s %s %s',
','.join(metric[resource]['environment']),
','.join(metric[resource]['service']), sev, rule['event'], resource,
ti, rule['text'][index], calculated_value)
event = rule['event']
group = rule['group']
value = "%s%s" % (calculated_value, GangliaDaemon.format_units(metric[resource]['units']))
severity = sev
environment = metric[resource]['environment']
service = metric[resource]['service']
text = metric[resource]['text'][index]
tags = metric[resource]['tags']
threshold_info = ','.join(rule['thresholdInfo'])
more_info = metric[resource]['moreInfo']
graphs = metric[resource]['graphUrl']
gangliaAlert = Alert(
resource=resource,
event=event,
group=group,
value=value,
severity=severity,
environment=environment,
service=service,
text=text,
event_type='gangliaAlert',
tags=tags,
threshold_info=threshold_info,
# more_info= more_info, # TODO(nsatterl): add support for more info
# graphs= graphs, # TODO(nsatterl): add support for graphs
raw_data='', # TODO(nsatterl): put raw metric values used to do calculation here
)
self.mq.send(gangliaAlert)
# Keep track of previous severity
previousSeverity[(resource, rule['event'])] = sev
break # First match wins
index += 1
@staticmethod
def get_metrics(filter):
url = "http://%s:%s/ganglia/api/v1/metrics?%s" % (CONF.ganglia_host, CONF.ganglia_port, filter)
LOG.info('Metric request %s', url)
try:
r = urllib2.urlopen(url, None, 15)
except urllib2.URLError, e:
LOG.error('Could not retrieve metric data from %s - %s', url, e)
return dict()
if r.getcode() is None:
LOG.error('Error during connection or data transfer (timeout=%d)', 15)
return dict()
response = json.loads(r.read())['response']
if response['status'] == 'error':
LOG.error('No metrics retreived - %s', response['message'])
return dict()
LOG.info('Retreived %s matching metrics in %ss', response['total'], response['time'])
return response['metrics']
@staticmethod
def init_rules():
rules = list()
LOG.info('Loading rules...')
try:
rules = yaml.load(open(RULESFILE))
except Exception, e:
LOG.error('Failed to load alert rules: %s', e)
return rules
LOG.info('Loaded %d rules OK', len(rules))
return rules
@staticmethod
def quote(s):
try:
return int(s)
except TypeError:
float(s)
return "%.1f" % float(s)
except ValueError:
return '"%s"' % s
@staticmethod
def format_units(units):
if units in ['seconds', 's']:
return 's'
if units in ['percent', '%']:
return '%'
return ' ' + units