0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-30 11:36:20 +00:00
alerta_alerta/contrib/experimental/notify/daemon.py
2013-07-06 23:00:59 +02:00

338 lines
9.4 KiB
Python

import os
import time
import threading
import json
import urllib
import urllib2
import yaml
from alerta.common import config
from alerta.common import log as logging
from alerta.common.daemon import Daemon
from alerta.common.heartbeat import Heartbeat
from alerta.common import severity_code
from alerta.common.mq import Messaging, MessageHandler
# Version string of this daemon; passed to every Heartbeat sent from NotifyDaemon.run().
Version = '2.0.0'
# Module-level logger (obtained from the alerta logging wrapper imported above).
LOG = logging.getLogger(__name__)
# Shared configuration object; this module reads yaml_config, notify_wait,
# loop_every and outbound_topic from it.
CONF = config.CONF
# AQL constants. USERNAME and PASSWORD start empty and are overwritten by
# init_config() with the values found in the YAML config.
USERNAME = ''
PASSWORD = ''
API_URL = 'http://gw.aql.com/sms/sms_gw.php'

# AQL API responses, keyed by the status number the gateway returns.
status = {
    '0': 'SMS successfully queued',
    '1': 'SMS queued partially',
    '2': 'Authentication error',
    '3': 'Destination number(s) error',
    '4': 'Send time error',
    '5': 'Insufficient credit or invalid number of msg/destination',
    '9': 'Undefined error',
}

# Global dicts shared between the message handler and the worker threads.
owners = {}   # owner name -> contact details, loaded from the YAML config
hold = {}     # alert id -> time (epoch seconds) at which the hold expires
alert = {}    # alert id -> most recently received alert (parsed JSON)
tokens = {}   # (owner, 'sms' | 'email') -> notification tokens remaining

# Worker thread objects
_TokenThread = None
_NotifyThread = None

# Synchronization lock guarding updates to ``tokens``
_Lock = threading.Lock()

# Token bucket settings
TOKEN_LIMIT = 10      # a bucket is never topped up beyond this many tokens
_token_rate = 60      # add a token every 60 seconds
INITIAL_TOKENS = 5    # tokens granted to each bucket when config is (re)loaded
class NotifyMessage(MessageHandler):
    """Message-queue callback that records incoming alerts and puts them on hold.

    Alerts carrying an ``sms:`` or ``email:`` tag are stored in the module-level
    ``alert`` dict and given a release time in ``hold``; the actual notification
    is sent later by whoever drains ``hold``.
    """

    def __init__(self, mq):
        # Keep the messaging handle so a lost connection can be re-established.
        self.mq = mq
        MessageHandler.__init__(self)

    def on_message(self, headers, body):
        """Handle one alert message.

        ``body`` is a JSON document that must contain at least the keys
        ``id``, ``lastReceiveId``, ``status``, ``summary``, ``severity``
        and ``tags``.
        """
        LOG.debug("Received: %s", body)

        # Parse the payload once and reuse the result.
        received = json.loads(body)
        alertid = received['id']
        alert[alertid] = received

        LOG.info('%s : [%s] %s', received['lastReceiveId'], received['status'],
                 received['summary'])

        if not should_we_notify(alertid):
            LOG.debug('%s : NOT PAGING for [%s] %s', received['lastReceiveId'], received['status'],
                      received['summary'])
            # An earlier version of this alert may already be on hold. Drop that
            # hold as well, otherwise it would later be released for an alert
            # that no longer exists in ``alert``.
            hold.pop(alertid, None)
            del alert[alertid]
            return

        if alertid in hold:
            if received['severity'] == severity_code.NORMAL:
                # The alert cleared before its hold expired: nothing to send.
                LOG.info('%s : Dropping NORMAL alert %s', received['lastReceiveId'], alertid)
                del hold[alertid]
                del alert[alertid]
            else:
                # Keep the original release time; only the stored details change.
                LOG.info('%s : Update alert %s details', received['lastReceiveId'], alertid)
        else:
            hold[alertid] = time.time() + CONF.notify_wait
            LOG.info('%s : Holding onto alert %s for %s seconds', received['lastReceiveId'], alertid,
                     CONF.notify_wait)

    def on_disconnected(self):
        """Reconnect to the message broker when the connection drops."""
        self.mq.reconnect()
class NotifyDaemon(Daemon):
    """Daemon that subscribes to the outbound alert topic and drives notification."""

    def run(self):
        """Start the worker threads, subscribe to the queue and loop until shutdown.

        Each loop iteration reloads the YAML config if its modification time
        changed, sleeps for ``CONF.loop_every`` seconds and sends a heartbeat.
        """
        self.running = True

        # Initialise alert config and remember which version of the file was
        # loaded, so the loop below only reloads it after a real change.
        init_config()
        config_mod_time = os.path.getmtime(CONF.yaml_config)

        # Start token bucket thread
        token_thread = TokenTopUp()
        token_thread.start()

        # Start notify thread
        notify_thread = ReleaseThread()
        notify_thread.start()

        # Connect to message queue
        self.mq = Messaging()
        self.mq.connect(callback=NotifyMessage(self.mq))
        self.mq.subscribe(destination=CONF.outbound_topic)

        while not self.shuttingdown:
            try:
                # Re-read config as necessary. The mtime is sampled once so the
                # remembered value is exactly the one that triggered the reload.
                current_mod_time = os.path.getmtime(CONF.yaml_config)
                if current_mod_time != config_mod_time:
                    init_config()
                    config_mod_time = current_mod_time

                LOG.debug('Waiting for email messages...')
                time.sleep(CONF.loop_every)

                LOG.debug('Send heartbeat...')
                heartbeat = Heartbeat(version=Version)
                self.mq.send(heartbeat)
            except (KeyboardInterrupt, SystemExit):
                self.shuttingdown = True

        # Stop the worker threads on every exit path, not only on an interrupt.
        token_thread.shutdown()
        notify_thread.shutdown()

        LOG.info('Shutdown request received...')
        self.running = False

        LOG.info('Disconnecting from message broker...')
        self.mq.disconnect()
def should_we_notify(alertid):
    """Return 1 if the stored alert has an ``sms:`` or ``email:`` tag, else 0."""
    notify_prefixes = ('sms:', 'email:')
    if any(tag.startswith(notify_prefixes) for tag in alert[alertid]['tags']):
        return 1
    return 0
def who_to_notify(tag):
    """Return the owner named in a notification tag.

    The owner is the second ``:``-separated field of ``tag`` (for example the
    part after ``sms:``). Raises IndexError when ``tag`` contains no colon.
    """
    fields = tag.split(':')
    owner = fields[1]
    LOG.info('Identifing owner as %s', owner)
    return owner
def sms_notify(alertid, username, password, destination, url=API_URL):
    """Send the summary of a stored alert as an SMS through the AQL gateway.

    :param alertid: key of the alert in the module-level ``alert`` dict
    :param username: gateway account name
    :param password: gateway account password
    :param destination: destination number handed to the gateway
    :param url: gateway endpoint, defaults to ``API_URL``
    :returns: ``0`` when the gateway reports the message as successfully
        queued, ``None`` otherwise. Both values are falsy, so callers must
        compare explicitly rather than rely on truthiness.
    :raises KeyError: if ``alertid`` is not in ``alert``
    :raises urllib2.URLError: if the gateway cannot be reached
    """
    message = alert[alertid]['summary']
    data = urllib.urlencode(
        {'username': username, 'password': password, 'destination': destination, 'message': message})

    # Log the request with the password masked; credentials must not end up in
    # log files.
    loggable = urllib.urlencode(
        {'username': username, 'password': 'xxxxxxxx', 'destination': destination, 'message': message})
    LOG.info('Api call %s', url + '?' + loggable)

    # Passing ``data`` makes this a POST request.
    req = urllib2.Request(url, data)
    f = urllib2.urlopen(req)
    try:
        response = f.read()
    finally:
        # Release the connection even when reading the reply fails.
        f.close()

    # Api call response syntax.
    # <status no>:<no of credits used> <description>
    LOG.info('Api response %s', response)

    # Verify response
    if status['0'] in response:
        return 0

    LOG.warning('SMS gateway did not confirm delivery to %s: %s', destination, response)
    return None
def init_tokens():
    """Reset the sms and email token buckets of every configured owner.

    Each ``(owner, channel)`` entry in ``tokens`` is set to ``INITIAL_TOKENS``.
    Any failure is logged and otherwise ignored.
    """
    try:
        for name in owners:
            for channel in ('sms', 'email'):
                tokens[name, channel] = INITIAL_TOKENS
    except Exception as e:
        LOG.error('Failed to initialize tokens %s', e)
def init_config():
    """Load owners and gateway credentials from the YAML file ``CONF.yaml_config``.

    On success the module-level ``USERNAME``, ``PASSWORD`` and ``owners`` are
    replaced and the token buckets are re-initialised. If the file cannot be
    read or parsed, or required keys are missing, the error is logged and the
    previously loaded settings stay in effect.
    """
    global owners, USERNAME, PASSWORD

    LOG.info('Loading config.')

    try:
        # NOTE(review): yaml.load can construct arbitrary Python objects. If this
        # file could ever be written by an untrusted party, use yaml.safe_load.
        with open(CONF.yaml_config) as f:
            settings = yaml.load(f)

        # Pull out everything that is needed before touching any global, so a
        # malformed file can never leave the module half-updated.
        username = settings['global']['USERNAME']
        password = settings['global']['PASSWORD']
        loaded_owners = settings['owners']
        owner_count = len(loaded_owners)
    except Exception as e:
        LOG.error('Failed to load alert config: %s', e)
        return

    USERNAME = username
    PASSWORD = password
    owners = loaded_owners
    LOG.info('Loaded %d owners in config.', owner_count)

    init_tokens()
def _take_token(who, channel):
    """Consume one ``channel`` token of ``who``; return True if one was available."""
    # Check and decrement under the same lock so two threads cannot both spend
    # the last token.
    with _Lock:
        if tokens[who, channel] > 0:
            tokens[who, channel] -= 1
            return True
        return False


def send_notify(alertid):
    """Notify the owner named by every ``sms:`` / ``email:`` tag of a stored alert.

    Each notification costs the owner one token of the matching channel; when
    the owner has none left the notification is skipped and an error is
    logged. A failure while handling one tag is logged and does not prevent
    the remaining tags from being processed.

    :param alertid: key of the alert in the module-level ``alert`` dict
    """
    try:
        details = alert[alertid]
    except KeyError:
        LOG.error('Notify sending failed, alert %s is no longer available', alertid)
        return

    # Resolved up front so the error handler below can always refer to them.
    receive_id = details['lastReceiveId']
    message = details['summary']

    for tag in details['tags']:
        if tag.startswith('sms:'):
            channel = 'sms'
        elif tag.startswith('email:'):
            channel = 'email'
        else:
            continue

        try:
            who = who_to_notify(tag)

            if not _take_token(who, channel):
                LOG.error('%s run out of %s tokens. Failed to notify %s.', who, channel, receive_id)
                continue

            LOG.debug('Taken a %s token from %s, there are only %d left', channel, who,
                      tokens[who, channel])

            if channel == 'sms':
                sms_notify(alertid, USERNAME, PASSWORD, owners[who]['mobile'])
            else:
                # NOTE(review): email_notify is not defined in the visible part of
                # this file - confirm it exists before relying on email tags.
                email_notify(alertid, owners[who]['email'])
        except Exception as e:
            LOG.error('Notify sending failed for "%s" - %s - %s', receive_id, message, e)
def on_disconnected(self):
    """Restart the broker connection and re-subscribe to the notify topic.

    NOTE(review): ``conn`` and ``NOTIFY_TOPIC`` are not defined in the visible
    part of this file, and this is a module-level function taking ``self`` -
    it looks like a leftover; confirm whether it is still used.
    """
    global conn

    LOG.warning('Connection lost. Attempting auto-reconnect to %s', NOTIFY_TOPIC)

    conn.start()
    conn.connect(wait=True)

    # Subscription carries a selector header matching repeat = 'false'.
    selector_headers = {'selector': "repeat = 'false'"}
    conn.subscribe(destination=NOTIFY_TOPIC, ack='auto', headers=selector_headers)
class ReleaseThread(threading.Thread):
    """Worker thread that sends notifications for alerts whose hold has expired.

    Every 5 seconds it scans the module-level ``hold`` dict, calls
    ``send_notify`` for each alert whose release time has passed, and then
    removes that alert from both ``hold`` and ``alert``.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.running = False
        self.shuttingdown = False

    def shutdown(self):
        """Ask the thread to stop and, if it is running, wait for it to finish."""
        self.shuttingdown = True
        if not self.running:
            return
        self.join()

    def run(self):
        self.running = True

        while not self.shuttingdown:
            now = time.time()

            # Work on a snapshot: the message handler adds and removes entries
            # of ``hold`` from another thread, and iterating the live dict
            # while it changes size raises RuntimeError.
            expired = [alertid for alertid, release_at in list(hold.items()) if release_at < now]

            for alertid in expired:
                if alertid not in hold:
                    # Dropped by the message handler after the snapshot was taken.
                    continue

                LOG.warning('Hold expired for %s and trigger notification', alertid)
                try:
                    send_notify(alertid)
                except Exception as e:
                    # One failing notification must not stop the release thread.
                    LOG.error('Notification for %s failed: %s', alertid, e)

                # pop() instead of del: the entries may have been removed
                # concurrently by the message handler.
                alert.pop(alertid, None)
                hold.pop(alertid, None)

            if not self.shuttingdown:
                time.sleep(5)

        self.running = False
class TokenTopUp(threading.Thread):
    """Worker thread that periodically refills the notification token buckets.

    After every six sleep intervals of ``_token_rate / 6`` seconds, each owner's
    sms and email bucket that holds fewer than ``TOKEN_LIMIT`` tokens gains one.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.running = False
        self.shuttingdown = False

    def shutdown(self):
        """Ask the thread to stop and, if it is running, wait for it to finish."""
        self.shuttingdown = True
        if self.running:
            self.join()

    def run(self):
        global tokens, _token_rate

        self.running = True
        ticks = 0

        while not self.shuttingdown:
            # The sixth tick completes one full ``_token_rate`` period.
            if ticks == 6:
                try:
                    ticks = 0
                    for name in owners:
                        for channel in ('sms', 'email'):
                            if tokens[name, channel] < TOKEN_LIMIT:
                                with _Lock:
                                    tokens[name, channel] += 1
                except OSError:
                    pass

            if not self.shuttingdown:
                # Sleeping a sixth of the period at a time means the shutdown
                # flag is re-checked between the short sleeps.
                time.sleep(_token_rate / 6)
                ticks += 1

        self.running = False