#!/usr/bin/env python
########################################
#
# alert-urlmon.py - Alert URL Monitor
#
########################################
import os
import sys
import time
import urllib2
try:
import json
except ImportError:
import simplejson as json
import threading
from Queue import Queue
import yaml
import stomp
import datetime
import logging
import uuid
import re
from BaseHTTPServer import BaseHTTPRequestHandler as BHRH
HTTP_RESPONSES = dict([(k, v[0]) for k, v in BHRH.responses.items()])
__program__ = 'alert-urlmon'
__version__ = '1.5.12'
BROKER_LIST = [('localhost', 61613)] # list of brokers for failover
ALERT_QUEUE = '/queue/alerts'
DEFAULT_TIMEOUT = 86400
EXPIRATION_TIME = 600 # seconds = 10 minutes
URLFILE = '/opt/alerta/conf/alert-urlmon.yaml'
LOGFILE = '/var/log/alerta/alert-urlmon.log'
PIDFILE = '/var/run/alerta/alert-urlmon.pid'
REQUEST_TIMEOUT = 15 # seconds
NUM_THREADS = 10
GMETRIC_SEND = True
GMETRIC_CMD = '/usr/bin/gmetric'
GMETRIC_OPTIONS = '--spoof 10.1.1.1:urlmon --conf /etc/ganglia/alerta/gmond-alerta.conf'
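# A minimal sketch of an alert-urlmon.yaml entry; field names are inferred
# from the item.get() lookups in WorkerThread.run() below and the values are
# hypothetical. Optional keys include headers, post, username/password/realm/uri,
# proxy, rule and tags.
#
# - resource: www.example.com
#   url: http://www.example.com/healthcheck
#   environment: [PROD]
#   service: [Website]
#   search: 'OK'    # optional regex matched line-by-line against the body
#   warning: 2000   # ms; slower responses raise HttpResponseSlow/WARNING
#   critical: 5000  # ms; slower responses raise HttpResponseSlow/CRITICAL
#   count: 2        # consecutive occurrences required before alerting
#   repeat: 10      # then re-alert every 10 further occurrences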
HTTP_ALERTS = [
'HttpConnectionError',
'HttpServerError',
'HttpClientError',
'HttpRedirection',
'HttpContentError',
'HttpResponseSlow',
'HttpResponseOK',
]
# Add missing responses
HTTP_RESPONSES[102] = 'Processing'
HTTP_RESPONSES[207] = 'Multi-Status'
HTTP_RESPONSES[422] = 'Unprocessable Entity'
HTTP_RESPONSES[423] = 'Locked'
HTTP_RESPONSES[424] = 'Failed Dependency'
HTTP_RESPONSES[506] = 'Variant Also Negotiates'
HTTP_RESPONSES[507] = 'Insufficient Storage'
HTTP_RESPONSES[510] = 'Not Extended'
SEVERITY_CODE = {
# ITU perceived severity (RFC 5674) -> syslog severity (RFC 5424)
'CRITICAL': 1, # Alert
'MAJOR': 2, # Critical
'MINOR': 3, # Error
'WARNING': 4, # Warning
'NORMAL': 5, # Notice
'INFORM': 6, # Informational
'DEBUG': 7, # Debug
}
_check_rate = 60 # seconds between URL check cycles
# Global variables
urls = dict()
queue = Queue()
currentCount = dict()
currentState = dict()
previousEvent = dict()
# Handle redirects: the redirect is still followed, but the original 3xx
# status code is preserved so it can be reported as HttpRedirection
class NoRedirection(urllib2.HTTPRedirectHandler):
def http_error_302(self, req, fp, code, msg, headers):
result = urllib2.HTTPRedirectHandler.http_error_302(self, req, fp, code, msg, headers)
result.status = code
result.code = code
return result
http_error_301 = http_error_303 = http_error_307 = http_error_302
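# Usage sketch for the handler above (illustrative URL; assumes the server
# answers with a 301):
#
#   opener = urllib2.build_opener(NoRedirection())
#   response = opener.open('http://www.example.com/old-path')
#   response.getcode()  # -> 301: the redirect is followed, but the original
#                       #    status code is preserved for alerting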
class WorkerThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.input_queue = queue
def run(self):
global conn
while True:
flag,item = self.input_queue.get()
if flag == 'stop':
logging.info('%s is shutting down.', self.getName())
break
if flag == 'timestamp':
urlmon_cycletime = time.time() - item
logging.info('Took %d seconds to schedule all checks.', urlmon_cycletime)
if GMETRIC_SEND:
gmetric_cmd = "%s --name urlmon_cycletime --value %d --type uint16 --units seconds --slope both --group urlmon %s" % (
GMETRIC_CMD, urlmon_cycletime, GMETRIC_OPTIONS)
logging.debug("%s", gmetric_cmd)
os.system("%s" % gmetric_cmd)
self.input_queue.task_done()
continue
# defaults
search_string = item.get('search', None)
rule = item.get('rule', None)
warn_thold = item.get('warning', 2000) # ms
crit_thold = item.get('critical', 5000) # ms
post = item.get('post', None)
logging.info('%s checking %s', self.getName(), item['url'])
response = ''
code = None
status = None
start = time.time()
headers = dict()
if 'headers' in item:
headers = dict(item['headers'])
username = item.get('username', None)
password = item.get('password', None)
realm = item.get('realm', None)
uri = item.get('uri', None)
proxy = item.get('proxy', False)
if proxy:
proxy_handler = urllib2.ProxyHandler(proxy)
if username and password:
auth_handler = urllib2.HTTPBasicAuthHandler()
auth_handler.add_password(realm = realm,
uri = uri,
user = username,
passwd = password)
if proxy:
opener = urllib2.build_opener(auth_handler, proxy_handler)
else:
opener = urllib2.build_opener(auth_handler)
else:
redir_handler = NoRedirection()
if proxy:
opener = urllib2.build_opener(redir_handler, proxy_handler)
else:
opener = urllib2.build_opener(redir_handler)
urllib2.install_opener(opener)
if 'User-agent' not in headers:
headers['User-agent'] = 'alert-urlmon/%s Python-urllib/%s' % (__version__, urllib2.__version__)
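# e.g. 'alert-urlmon/1.5.12 Python-urllib/2.7' when run under Python 2.7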
try:
if post:
req = urllib2.Request(item['url'], json.dumps(post), headers=headers)
else:
req = urllib2.Request(item['url'], headers=headers)
response = urllib2.urlopen(req, None, REQUEST_TIMEOUT)
except ValueError, e:
logging.error('Request failed: %s', e)
self.input_queue.task_done()
continue
except urllib2.URLError, e:
if hasattr(e, 'reason'):
reason = str(e.reason)
elif hasattr(e, 'code'):
code = e.code
else:
code = response.getcode()
body = response.read()
rtt = int((time.time() - start) * 1000) # round-trip time
try:
status = HTTP_RESPONSES[code]
except KeyError:
status = 'undefined'
if code is None:
event = 'HttpConnectionError'
severity = 'MAJOR'
value = reason
descrStr = 'Error during connection or data transfer (timeout=%d).' % (REQUEST_TIMEOUT)
elif code >= 500:
event = 'HttpServerError'
severity = 'MAJOR'
value = '%s (%d)' % (status, code)
descrStr = 'HTTP server responded with status code %d in %dms' % (code, rtt)
elif code >= 400:
event = 'HttpClientError'
severity = 'MINOR'
value = '%s (%d)' % (status, code)
descrStr = 'HTTP server responded with status code %d in %dms' % (code, rtt)
elif code >= 300:
event = 'HttpRedirection'
severity = 'MINOR'
value = '%s (%d)' % (status, code)
descrStr = 'HTTP server responded with status code %d in %dms' % (code, rtt)
elif code >= 200:
event = 'HttpResponseOK'
severity = 'NORMAL'
value = '%s (%d)' % (status, code)
descrStr = 'HTTP server responded with status code %d in %dms' % (code, rtt)
if rtt > crit_thold:
event = 'HttpResponseSlow'
severity = 'CRITICAL'
value = '%dms' % rtt
descrStr = 'Website available but exceeding critical RT threshold of %dms' % (crit_thold)
elif rtt > warn_thold:
event = 'HttpResponseSlow'
severity = 'WARNING'
value = '%dms' % rtt
descrStr = 'Website available but exceeding warning RT threshold of %dms' % (warn_thold)
if search_string:
logging.debug('Searching for %s', search_string)
found = False
for line in body.split('\n'):
m = re.search(search_string, line)
if m:
found = True
logging.debug("Regex: Found %s in %s", search_string, line)
break
if not found:
event = 'HttpContentError'
severity = 'MINOR'
value = 'Search failed'
descrStr = 'Website available but pattern "%s" not found' % (search_string)
elif rule:
logging.debug('Evaluating rule %s', rule)
if 'Content-type' in headers and headers['Content-type'] == 'application/json':
body = json.loads(body)
try:
result = eval(rule)
except Exception, e:
logging.error('Could not evaluate rule %s: %s', rule, e)
else:
if not result:
event = 'HttpContentError'
severity = 'MINOR'
value = 'Rule failed'
descrStr = 'Website available but rule evaluation failed (%s)' % (rule)
elif code >= 100:
event = 'HttpInformational'
severity = 'NORMAL'
value = '%s (%d)' % (status, code)
descrStr = 'HTTP server responded with status code %d in %dms' % (code, rtt)
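# A hypothetical 'rule' entry for the content check above; the response is
# exposed to eval() as the local variable 'body' (parsed as JSON when the
# configured Content-type header is application/json):
#
#   rule: "body['status'] == 'UP'"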
logging.debug("URL: %s, Status: %s (%s), Round-Trip Time: %dms -> %s", item['url'], status, code, rtt, event)
# Forward metric data to Ganglia
if code and code < 300:
avail = 100.0 # 1xx, 2xx -> 100% available
else:
avail = 0.0
if GMETRIC_SEND:
gmetric_cmd = "%s --name availability-%s --value %.1f --type float --units \" \" --slope both --group %s %s" % (
GMETRIC_CMD, item['resource'], avail, ','.join(item['service']), GMETRIC_OPTIONS) # XXX - gmetric doesn't support multiple groups
logging.debug("%s", gmetric_cmd)
os.system("%s" % gmetric_cmd)
gmetric_cmd = "%s --name response_time-%s --value %d --type uint16 --units ms --slope both --group %s %s" % (
GMETRIC_CMD, item['resource'], rtt, ','.join(item['service']), GMETRIC_OPTIONS)
logging.debug("%s", gmetric_cmd)
os.system("%s" % gmetric_cmd)
# Set necessary state variables if currentState is unknown
res = item['resource']
if res not in currentState:
currentState[res] = event
currentCount[(res, event)] = 0
previousEvent[res] = event
if currentState[res] != event: # Change of threshold state
currentCount[(res, event)] = currentCount.get((res, event), 0) + 1
currentCount[(res, currentState[res])] = 0 # zero-out previous event counter
currentState[res] = event
elif currentState[res] == event: # Threshold state has not changed
currentCount[(res, event)] += 1
logging.debug('currentState = %s, currentCount = %d', currentState[res], currentCount[(res, event)])
# Determine if we should send a repeat alert
if currentCount[(res, event)] < item.get('count', 1):
repeat = False
logging.debug('Send repeat alert = %s (curr %s < threshold %s)', repeat, currentCount[(res, event)], item.get('count', 1))
else:
repeat = (currentCount[(res, event)] - item.get('count', 1)) % item.get('repeat', 1) == 0
logging.debug('Send repeat alert = %s (%d - %d %% %d)', repeat, currentCount[(res, event)], item.get('count', 1), item.get('repeat', 1))
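# Worked example: with count=2 and repeat=3 an event first alerts on its
# 2nd consecutive occurrence, then repeats on occurrences 5, 8, 11, ...
# because (currentCount - count) % repeat == 0 for those values.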
logging.debug('Send alert if prevEvent %s != %s AND thresh %d == %s', previousEvent[res], event, currentCount[(res, event)], item.get('count', 1))
# Determine if current threshold count requires an alert
if ((previousEvent[res] != event and currentCount[(res, event)] == item.get('count', 1))
or (previousEvent[res] == event and repeat)):
alertid = str(uuid.uuid4()) # random UUID
createTime = datetime.datetime.utcnow()
headers = dict()
headers['type'] = "serviceAlert"
headers['correlation-id'] = alertid
# standard alert info
alert = dict()
alert['id'] = alertid
alert['resource'] = item['resource']
alert['event'] = event
alert['group'] = 'Web'
alert['value'] = value
alert['severity'] = severity
alert['severityCode'] = SEVERITY_CODE[severity]
alert['environment'] = item['environment']
alert['service'] = item['service']
alert['text'] = descrStr
alert['type'] = 'serviceAlert'
alert['tags'] = item.get('tags', list())
alert['summary'] = '%s - %s %s is %s on %s %s' % (','.join(item['environment']), severity, event, value, ','.join(item['service']), item['resource'])
alert['createTime'] = createTime.replace(microsecond=0).isoformat() + ".%03dZ" % (createTime.microsecond//1000)
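# e.g. '2012-12-04T14:39:05.123Z' (ISO 8601 with millisecond precision, UTC)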
alert['origin'] = "%s/%s" % (__program__, os.uname()[1])
alert['thresholdInfo'] = "%s : RT > %d RT > %d x %s" % (item['url'], warn_thold, crit_thold, item.get('count', 1))
alert['timeout'] = DEFAULT_TIMEOUT
alert['correlatedEvents'] = HTTP_ALERTS
logging.info('%s : %s', alertid, json.dumps(alert))
while not conn.is_connected():
logging.warning('Waiting for message broker to become available')
time.sleep(1.0)
try:
conn.send(json.dumps(alert), headers, destination=ALERT_QUEUE)
broker = conn.get_host_and_port()
logging.info('%s : Alert sent to %s:%s', alertid, broker[0], str(broker[1]))
except Exception, e:
logging.error('Failed to send alert to broker %s', e)
# Keep track of previous event
previousEvent[res] = event
self.input_queue.task_done()
logging.info('%s check complete.', self.getName())
self.input_queue.task_done()
return
class MessageHandler(object):
def on_error(self, headers, body):
logging.error('Received an error %s', body)
def on_disconnected(self):
global conn
logging.warning('Connection lost. Attempting auto-reconnect to %s', ALERT_QUEUE)
conn.start()
conn.connect(wait=True)
conn.subscribe(destination=ALERT_QUEUE, ack='auto')
def send_heartbeat():
global conn
heartbeatid = str(uuid.uuid4()) # random UUID
createTime = datetime.datetime.utcnow()
headers = dict()
headers['type'] = "heartbeat"
headers['correlation-id'] = heartbeatid
# headers['persistent'] = 'false'
# headers['expires'] = int(time.time() * 1000) + EXPIRATION_TIME * 1000
heartbeat = dict()
heartbeat['id'] = heartbeatid
heartbeat['type'] = "heartbeat"
heartbeat['createTime'] = createTime.replace(microsecond=0).isoformat() + ".%03dZ" % (createTime.microsecond//1000)
heartbeat['origin'] = "%s/%s" % (__program__, os.uname()[1])
heartbeat['version'] = __version__
try:
conn.send(json.dumps(heartbeat), headers, destination=ALERT_QUEUE)
broker = conn.get_host_and_port()
logging.info('%s : Heartbeat sent to %s:%s', heartbeatid, broker[0], str(broker[1]))
except Exception, e:
logging.error('Failed to send heartbeat to broker %s', e)
# Initialise URL checks
def init_urls():
global urls
logging.info('Loading URLs...')
try:
urls = yaml.load(open(URLFILE))
except Exception, e:
logging.error('Failed to load URLs: %s', e)
logging.info('Loaded %d URLs OK', len(urls))
def main():
global urls, conn
logging.basicConfig(level=logging.INFO, format="%(asctime)s alert-urlmon[%(process)d] %(threadName)s %(levelname)s - %(message)s", filename=LOGFILE)
logging.info('Starting up URL monitor version %s', __version__)
# Write pid file if not already running
if os.path.isfile(PIDFILE):
pid = open(PIDFILE).read()
try:
os.kill(int(pid), 0)
logging.error('Process with pid %s already exists, exiting', pid)
sys.exit(1)
except OSError:
pass
file(PIDFILE, 'w').write(str(os.getpid()))
# Connect to message broker
logging.info('Connect to broker')
try:
conn = stomp.Connection(
BROKER_LIST,
reconnect_sleep_increase = 5.0,
reconnect_sleep_max = 120.0,
reconnect_attempts_max = 20
)
conn.set_listener('', MessageHandler())
conn.start()
conn.connect(wait=True)
except Exception, e:
logging.error('Stomp connection error: %s', e)
sys.exit(1)
# Initialise URL checks
init_urls()
url_mod_time = os.path.getmtime(URLFILE)
# Start worker threads
workers = []
for i in range(NUM_THREADS):
w = WorkerThread(queue)
w.start()
workers.append(w)
logging.info('Starting thread: %s', w.getName())
while True:
try:
# Read (or re-read) urls as necessary
if os.path.getmtime(URLFILE) != url_mod_time:
init_urls()
url_mod_time = os.path.getmtime(URLFILE)
for url in urls:
queue.put(('url',url))
queue.put(('timestamp', time.time()))
send_heartbeat()
time.sleep(_check_rate)
urlmon_qsize = queue.qsize()
logging.info('URL check queue length is %d', urlmon_qsize)
if GMETRIC_SEND:
gmetric_cmd = "%s --name urlmon_qsize --value %d --type uint16 --units \" \" --slope both --group urlmon %s" % (
GMETRIC_CMD, urlmon_qsize, GMETRIC_OPTIONS)
logging.debug("%s", gmetric_cmd)
os.system("%s" % gmetric_cmd)
except (KeyboardInterrupt, SystemExit):
conn.disconnect()
for w in workers:
queue.put(('stop', None))
for w in workers:
w.join()
os.unlink(PIDFILE)
logging.info('Graceful exit.')
sys.exit(0)
if __name__ == '__main__':
main()