#!/usr/bin/env python
########################################
#
# alerta.py - Alert Server Module
#
########################################

import os
import sys
import time
try:
    import json
except ImportError:
    import simplejson as json
import yaml
import threading
from Queue import Queue
import stomp
import pymongo
import datetime
import pytz
import logging
import re

__program__ = 'alerta'
__version__ = '1.6.1'

BROKER_LIST = [('localhost', 61613)]  # list of brokers for failover
ALERT_QUEUE = '/queue/alerts'   # inbound
NOTIFY_TOPIC = '/topic/notify'  # outbound
LOGGER_QUEUE = '/queue/logger'  # outbound

DEFAULT_TIMEOUT = 86400  # expire OPEN alerts after 1 day
EXPIRATION_TIME = 600    # seconds = 10 minutes

LOGFILE = '/var/log/alerta/alerta.log'
PIDFILE = '/var/run/alerta/alerta.pid'
ALERTCONF = '/opt/alerta/conf/alerta.yaml'
PARSERDIR = '/opt/alerta/bin/parsers'

NUM_THREADS = 4

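# More (host, port) tuples can be appended to BROKER_LIST; the stomp connection
# fails over between the listed brokers.
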
# Global variables
conn = None
db = None
alerts = None
mgmt = None
hb = None  # heartbeats collection, assigned in main()
queue = Queue()

# Extend JSON Encoder to support ISO 8601 format dates
class DateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.replace(microsecond=0).isoformat() + ".%03dZ" % (obj.microsecond // 1000)
        else:
            return json.JSONEncoder.default(self, obj)

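# For example (illustrative): json.dumps({'t': datetime.datetime(2012, 7, 5, 14, 32, 11, 123000)},
# cls=DateEncoder) renders the timestamp as "2012-07-05T14:32:11.123Z". A timezone-aware
# datetime would additionally embed its UTC offset before the trailing ".123Z".
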
class WorkerThread(threading.Thread):

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.input_queue = queue

    def run(self):
        global db, alerts, mgmt, hb, conn, queue

        while True:
            alert = self.input_queue.get()
            if not alert:
                logging.info('%s is shutting down.', self.getName())
                break

            start = time.time()
            alertid = alert['id']
            logging.info('%s : %s', alertid, alert['summary'])

            # Load alert transforms and blackout rules
            try:
                alertconf = yaml.load(open(ALERTCONF))
                logging.info('Loaded %d alert transforms and blackout rules OK', len(alertconf))
            except Exception, e:
                alertconf = list()
                logging.warning('Failed to load alert transforms and blackout rules: %s', e)

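            # Each rule in the YAML file is expected to be a mapping with a mandatory
            # 'match' key; the remaining keys are optional overrides handled below.
            # A purely hypothetical rule for illustration:
            #
            #   - match: {resource: host55, event: HostAvail}
            #     parser: hostavail
            #     suppress: true
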
            # Apply alert transforms and blackouts; the first matching rule wins
            suppress = False
            for conf in alertconf:
                logging.debug('alertconf: %s', conf)
                if all(item in alert.items() for item in conf['match'].items()):
                    if 'parser' in conf:
                        logging.debug('Loading parser %s', conf['parser'])
                        try:
                            # Execute the parser with this function's namespace visible
                            # so that it can mutate the alert dict in place
                            exec(open('%s/%s.py' % (PARSERDIR, conf['parser']))) in globals(), locals()
                            logging.info('Parser %s/%s exec OK', PARSERDIR, conf['parser'])
                        except Exception, e:
                            logging.warning('Parser %s failed: %s', conf['parser'], e)
                    if 'event' in conf:
                        event = conf['event']
                    if 'resource' in conf:
                        resource = conf['resource']
                    if 'severity' in conf:
                        severity = conf['severity']
                    if 'group' in conf:
                        group = conf['group']
                    if 'value' in conf:
                        value = conf['value']
                    if 'text' in conf:
                        text = conf['text']
                    if 'environment' in conf:
                        environment = [conf['environment']]
                    if 'service' in conf:
                        service = [conf['service']]
                    if 'tags' in conf:
                        tags = conf['tags']
                    if 'correlatedEvents' in conf:
                        correlate = conf['correlatedEvents']
                    if 'thresholdInfo' in conf:
                        threshold = conf['thresholdInfo']
                    if 'suppress' in conf:
                        suppress = conf['suppress']
                    break

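            # Note that the override variables bound above (event, resource, severity, ...)
            # are never written back into the alert dict here; of the rule outputs, only
            # 'suppress' influences what happens next.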
            if suppress:
                logging.info('%s : Suppressing alert %s', alert['id'], alert['summary'])
                self.input_queue.task_done()
                continue  # was 'return', which silently killed the worker thread

            createTime = datetime.datetime.strptime(alert['createTime'], '%Y-%m-%dT%H:%M:%S.%fZ')
            createTime = createTime.replace(tzinfo=pytz.utc)

            receiveTime = datetime.datetime.strptime(alert['receiveTime'], '%Y-%m-%dT%H:%M:%S.%fZ')
            receiveTime = receiveTime.replace(tzinfo=pytz.utc)

            # Add expire timestamp; a timeout of zero disables expiry
            if 'timeout' in alert and alert['timeout'] == 0:
                expireTime = ''
            elif 'timeout' in alert and alert['timeout'] > 0:
                expireTime = createTime + datetime.timedelta(seconds=alert['timeout'])
            else:
                alert['timeout'] = DEFAULT_TIMEOUT
                expireTime = createTime + datetime.timedelta(seconds=alert['timeout'])
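            # e.g. a timeout of 600 gives expireTime = createTime + 10 minutes, and an
            # absent timeout falls back to DEFAULT_TIMEOUT (1 day)
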
            # An alert is a duplicate when environment, resource, event and severity all match
            if alerts.find_one({"environment": alert['environment'], "resource": alert['resource'], "event": alert['event'], "severity": alert['severity']}):
                logging.info('%s : Duplicate alert -> update dup count', alertid)
                # Duplicate alert so ... 1. update existing document with lastReceiveTime, lastReceiveId,
                #                           text, summary, value, tags and origin
                #                        2. increment duplicate count

                # FIXME - no native find_and_modify method in this version of pymongo
                no_obj_error = "No matching object found"
                alert = db.command("findAndModify", 'alerts',
                                   allowable_errors=[no_obj_error],
                                   query={"environment": alert['environment'], "resource": alert['resource'], "event": alert['event']},
                                   update={'$set': {"lastReceiveTime": receiveTime, "expireTime": expireTime,
                                                    "lastReceiveId": alertid, "text": alert['text'], "summary": alert['summary'], "value": alert['value'],
                                                    "tags": alert['tags'], "repeat": True, "origin": alert['origin']},
                                           '$inc': {"duplicateCount": 1}},
                                   new=True,
                                   fields={"history": 0})['value']

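                # findAndModify applies the update and returns the updated document in a
                # single atomic server-side operation, so concurrent workers cannot
                # interleave between the write and the read
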
                # Derive a status only if the stored one isn't already OPEN, ACK or CLOSED
                if alert['status'] not in ['OPEN', 'ACK', 'CLOSED']:
                    if alert['severity'] != 'NORMAL':
                        status = 'OPEN'
                    else:
                        status = 'CLOSED'
                else:
                    status = None

                if status:
                    alert['status'] = status
                    updateTime = datetime.datetime.utcnow()
                    updateTime = updateTime.replace(tzinfo=pytz.utc)
                    alerts.update(
                        {"environment": alert['environment'], "resource": alert['resource'], '$or': [{"event": alert['event']}, {"correlatedEvents": alert['event']}]},
                        {'$set': {"status": status},
                         '$push': {"history": {"status": status, "updateTime": updateTime}}})
                    logging.info('%s : Alert status for duplicate %s %s alert changed to %s', alertid, alert['severity'], alert['event'], status)
                else:
                    logging.info('%s : Alert status for duplicate %s %s alert unchanged because already OPEN, ACK or CLOSED', alertid, alert['severity'], alert['event'])

                self.input_queue.task_done()

            elif alerts.find_one({"environment": alert['environment'], "resource": alert['resource'], '$or': [{"event": alert['event']}, {"correlatedEvents": alert['event']}]}):
                previousSeverity = alerts.find_one({"environment": alert['environment'], "resource": alert['resource'], '$or': [{"event": alert['event']}, {"correlatedEvents": alert['event']}]}, {"severity": 1, "_id": 0})['severity']
                logging.info('%s : Event and/or severity change %s %s -> %s update details', alertid, alert['event'], previousSeverity, alert['severity'])
                # Different severity alert so ... 1. update existing document with severity, createTime,
                #                                    receiveTime, lastReceiveTime, previousSeverity,
                #                                    severityCode, lastReceiveId, text, summary, value,
                #                                    tags and origin
                #                                 2. set duplicate count to zero
                #                                 3. push history

                # FIXME - no native find_and_modify method in this version of pymongo
                no_obj_error = "No matching object found"
                alert = db.command("findAndModify", 'alerts',
                                   allowable_errors=[no_obj_error],
                                   query={"environment": alert['environment'], "resource": alert['resource'], '$or': [{"event": alert['event']}, {"correlatedEvents": alert['event']}]},
                                   update={'$set': {"event": alert['event'], "severity": alert['severity'], "severityCode": alert['severityCode'],
                                                    "createTime": createTime, "receiveTime": receiveTime, "lastReceiveTime": receiveTime, "expireTime": expireTime,
                                                    "previousSeverity": previousSeverity, "lastReceiveId": alertid, "text": alert['text'], "summary": alert['summary'], "value": alert['value'],
                                                    "tags": alert['tags'], "repeat": False, "origin": alert['origin'], "thresholdInfo": alert['thresholdInfo'], "duplicateCount": 0},
                                           '$push': {"history": {"createTime": createTime, "receiveTime": receiveTime, "severity": alert['severity'], "event": alert['event'],
                                                                 "severityCode": alert['severityCode'], "value": alert['value'], "text": alert['text'], "id": alertid}}},
                                   new=True,
                                   fields={"history": 0})['value']

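                # The $or clause correlates the incoming event with an existing alert either
                # by exact event name or via the alert's correlatedEvents list, so related
                # events update a single document rather than raising a second alert
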
                # Update alert status
                status = None

                if alert['severity'] in ['DEBUG', 'INFORM']:
                    status = 'OPEN'
                elif alert['severity'] == 'NORMAL':
                    status = 'CLOSED'
                elif alert['severity'] == 'WARNING':
                    if previousSeverity in ['NORMAL']:
                        status = 'OPEN'
                elif alert['severity'] == 'MINOR':
                    if previousSeverity in ['NORMAL', 'WARNING']:
                        status = 'OPEN'
                elif alert['severity'] == 'MAJOR':
                    if previousSeverity in ['NORMAL', 'WARNING', 'MINOR']:
                        status = 'OPEN'
                elif alert['severity'] == 'CRITICAL':
                    if previousSeverity in ['NORMAL', 'WARNING', 'MINOR', 'MAJOR']:
                        status = 'OPEN'
                else:
                    status = 'UNKNOWN'
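                # Net effect: NORMAL closes the alert, and WARNING..CRITICAL re-open it only
                # on an escalation from a strictly lower previous severity; a de-escalation
                # (e.g. CRITICAL -> MAJOR) leaves the existing status untouched
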
                if status:
                    alert['status'] = status
                    updateTime = datetime.datetime.utcnow()
                    updateTime = updateTime.replace(tzinfo=pytz.utc)
                    alerts.update(
                        {"environment": alert['environment'], "resource": alert['resource'], '$or': [{"event": alert['event']}, {"correlatedEvents": alert['event']}]},
                        {'$set': {"status": status},
                         '$push': {"history": {"status": status, "updateTime": updateTime}}})
                    logging.info('%s : Alert status for %s %s alert with diff event/severity changed to %s', alertid, alert['severity'], alert['event'], status)

                # Forward alert to notify topic and logger queue
                while not conn.is_connected():
                    logging.warning('Waiting for message broker to become available')
                    time.sleep(1.0)

                # Use object id as canonical alert id
                alert['id'] = alert['_id']
                del alert['_id']

                headers = dict()
                headers['type'] = alert['type']
                headers['correlation-id'] = alert['id']

                logging.info('%s : Fwd alert to %s', alert['id'], NOTIFY_TOPIC)
                try:
                    conn.send(json.dumps(alert, cls=DateEncoder), headers, destination=NOTIFY_TOPIC)
                except Exception, e:
                    logging.error('Failed to send alert to broker %s', e)

                logging.info('%s : Fwd alert to %s', alert['id'], LOGGER_QUEUE)
                try:
                    conn.send(json.dumps(alert, cls=DateEncoder), headers, destination=LOGGER_QUEUE)
                except Exception, e:
                    logging.error('Failed to send alert to broker %s', e)

                self.input_queue.task_done()
                logging.info('%s : Alert forwarded to %s and %s', alert['id'], NOTIFY_TOPIC, LOGGER_QUEUE)

            else:
                logging.info('%s : New alert -> insert', alertid)
                # New alert so ... 1. insert entire document
                #                  2. push history
                #                  3. set duplicate count to zero

                # Use alert id as object id
                alertid = alert['id']
                alert['_id'] = alertid
                del alert['id']

                alert['lastReceiveId'] = alertid
                alert['createTime'] = createTime
                alert['receiveTime'] = receiveTime
                alert['lastReceiveTime'] = receiveTime
                alert['expireTime'] = expireTime
                alert['previousSeverity'] = 'UNKNOWN'
                alert['repeat'] = False
                alert['duplicateCount'] = 0
                alert['status'] = 'OPEN'

                alert['history'] = [{"createTime": createTime, "receiveTime": receiveTime, "severity": alert['severity'], "event": alert['event'],
                                     "severityCode": alert['severityCode'], "value": alert['value'], "text": alert['text'], "id": alertid}]
                alerts.insert(alert, safe=True)

                if alert['severity'] != 'NORMAL':
                    status = 'OPEN'
                else:
                    status = 'CLOSED'

                updateTime = datetime.datetime.utcnow()
                updateTime = updateTime.replace(tzinfo=pytz.utc)
                alerts.update(
                    {"environment": alert['environment'], "resource": alert['resource'], "event": alert['event']},
                    {'$set': {"status": status},
                     '$push': {"history": {"status": status, "updateTime": updateTime}}}, safe=True)
                logging.info('%s : Alert status for new %s %s alert set to %s', alertid, alert['severity'], alert['event'], status)

                # Forward alert to notify topic and logger queue
                while not conn.is_connected():
                    logging.warning('Waiting for message broker to become available')
                    time.sleep(1.0)

                # Re-read the stored document so consumers see exactly what was persisted
                alert = alerts.find_one({"_id": alertid}, {"_id": 0, "history": 0})
                alert['id'] = alertid

                headers = dict()
                headers['type'] = alert['type']
                headers['correlation-id'] = alert['id']

                logging.info('%s : Fwd alert to %s', alert['id'], NOTIFY_TOPIC)
                try:
                    conn.send(json.dumps(alert, cls=DateEncoder), headers, destination=NOTIFY_TOPIC)
                except Exception, e:
                    logging.error('Failed to send alert to broker %s', e)

                logging.info('%s : Fwd alert to %s', alert['id'], LOGGER_QUEUE)
                try:
                    conn.send(json.dumps(alert, cls=DateEncoder), headers, destination=LOGGER_QUEUE)
                except Exception, e:
                    logging.error('Failed to send alert to broker %s', e)

                self.input_queue.task_done()
                logging.info('%s : Alert forwarded to %s and %s', alert['id'], NOTIFY_TOPIC, LOGGER_QUEUE)
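                # As in the branch above, the same JSON payload goes to NOTIFY_TOPIC (a
                # fan-out topic for subscribers) and to LOGGER_QUEUE (a point-to-point
                # queue), following the usual /topic/ vs /queue/ broker convention
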
            # Update management stats
            proc_latency = int((time.time() - start) * 1000)
            mgmt.update(
                {"group": "alerts", "name": "processed", "type": "timer", "title": "Alert process rate and duration", "description": "Time taken to process the alert"},
                {'$inc': {"count": 1, "totalTime": proc_latency}},
                True)
            delta = receiveTime - createTime
            recv_latency = int(delta.days * 24 * 60 * 60 * 1000 + delta.seconds * 1000 + delta.microseconds / 1000)
            mgmt.update(
                {"group": "alerts", "name": "received", "type": "timer", "title": "Alert receive rate and latency", "description": "Time taken for alert to be received by the server"},
                {'$inc': {"count": 1, "totalTime": recv_latency}},
                True)
            queue_len = queue.qsize()
            mgmt.update(
                {"group": "alerts", "name": "queue", "type": "gauge", "title": "Alert internal queue length", "description": "Length of internal alert queue"},
                {'$set': {"value": queue_len}},
                True)
            logging.info('%s : Alert receive latency = %s ms, process latency = %s ms, queue length = %s', alertid, recv_latency, proc_latency, queue_len)
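            # The trailing True is pymongo's positional upsert flag, so each stats document
            # is created on first use; a consumer of the timers can derive an average from
            # totalTime / count (in milliseconds)
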
            # Refresh this server's own heartbeat
            heartbeatTime = datetime.datetime.utcnow()
            heartbeatTime = heartbeatTime.replace(tzinfo=pytz.utc)
            hb.update(
                {"origin": "%s/%s" % (__program__, os.uname()[1])},
                {"origin": "%s/%s" % (__program__, os.uname()[1]), "version": __version__, "createTime": heartbeatTime, "receiveTime": heartbeatTime},
                True)

        self.input_queue.task_done()  # acknowledge the shutdown sentinel consumed by the break above
        return

class MessageHandler(object):

    def on_error(self, headers, body):
        logging.error('Received an error %s', body)

    def on_message(self, headers, body):
        global hb, queue

        logging.debug("Received alert : %s", body)

        alert = dict()
        try:
            alert = json.loads(body)
        except ValueError, e:
            logging.error("Could not decode JSON - %s", e)
            return

        # Set receiveTime
        receiveTime = datetime.datetime.utcnow()
        alert['receiveTime'] = receiveTime.replace(microsecond=0).isoformat() + ".%03dZ" % (receiveTime.microsecond // 1000)

        # Get createTime
        createTime = datetime.datetime.strptime(alert['createTime'], '%Y-%m-%dT%H:%M:%S.%fZ')
        createTime = createTime.replace(tzinfo=pytz.utc)

        # Handle heartbeats directly; they are upserted by origin and never queued
        if alert['type'] == 'heartbeat':
            hb.update(
                {"origin": alert['origin']},
                {"origin": alert['origin'], "version": alert['version'], "createTime": createTime, "receiveTime": receiveTime},
                True)
            logging.info('%s : heartbeat from %s', alert['id'], alert['origin'])
            return

        # Queue alert for processing by a worker thread
        queue.put(alert)
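    # Note: receiveTime is serialised above in the same '%Y-%m-%dT%H:%M:%S.%fZ' shape
    # that WorkerThread.run() later parses it back from
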
    def on_disconnected(self):
        global conn

        logging.warning('Connection lost. Attempting auto-reconnect to %s', ALERT_QUEUE)
        conn.start()
        conn.connect(wait=True)
        conn.subscribe(destination=ALERT_QUEUE, ack='auto')

def main():
    global db, alerts, mgmt, hb, conn

    logging.basicConfig(level=logging.INFO, format="%(asctime)s alerta[%(process)d] %(threadName)s %(levelname)s - %(message)s", filename=LOGFILE)
    logging.info('Starting up Alerta version %s', __version__)

    # Write pid file if not already running
    if os.path.isfile(PIDFILE):
        pid = open(PIDFILE).read()
        try:
            os.kill(int(pid), 0)  # signal 0 only checks whether the process exists
            logging.error('Process with pid %s already exists, exiting', pid)
            sys.exit(1)
        except OSError:
            pass
    open(PIDFILE, 'w').write(str(os.getpid()))

    # Connection to MongoDB
    try:
        mongo = pymongo.Connection()
        db = mongo.monitoring
        alerts = db.alerts
        mgmt = db.status
        hb = db.heartbeats
    except pymongo.errors.ConnectionFailure, e:
        logging.error('Mongo connection failure: %s', e)
        sys.exit(1)

    # Connect to message broker
    try:
        conn = stomp.Connection(
            BROKER_LIST,
            reconnect_sleep_increase=5.0,
            reconnect_sleep_max=120.0,
            reconnect_attempts_max=20
        )
        conn.set_listener('', MessageHandler())
        conn.start()
        conn.connect(wait=True)
        conn.subscribe(destination=ALERT_QUEUE, ack='auto')
    except Exception, e:
        logging.error('Stomp connection error: %s', e)
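    # If the broker is unavailable the worker threads block on conn.is_connected()
    # before forwarding, and MessageHandler.on_disconnected() drives the reconnect
    # once an established connection is lost
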
    # Start worker threads
    for i in range(NUM_THREADS):
        w = WorkerThread(queue)
        w.start()
        logging.info('Starting alert forwarding thread: %s', w.getName())

    while True:
        try:
            time.sleep(0.01)
        except (KeyboardInterrupt, SystemExit):
            # Send one None shutdown sentinel per worker thread, then clean up
            for i in range(NUM_THREADS):
                queue.put(None)
            conn.disconnect()
            os.unlink(PIDFILE)
            sys.exit(0)

if __name__ == '__main__':
    main()