0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-02-21 20:36:07 +00:00
alerta_alerta/contrib/experimental/notify/daemon.py

339 lines
9.4 KiB
Python
Raw Normal View History

2013-04-09 18:32:54 +00:00
2013-03-02 16:44:58 +00:00
import os
import time
import threading
import json
import urllib
import urllib2
import yaml
2013-04-09 18:32:54 +00:00
from alerta.common import config
2013-03-02 16:44:58 +00:00
from alerta.common import log as logging
from alerta.common.daemon import Daemon
2013-04-09 18:32:54 +00:00
from alerta.common.heartbeat import Heartbeat
from alerta.common import severity_code
from alerta.common.mq import Messaging, MessageHandler
2013-03-02 16:44:58 +00:00
2013-03-03 10:48:32 +00:00
Version = '2.0.0'
2013-03-02 16:44:58 +00:00
LOG = logging.getLogger(__name__)
CONF = config.CONF
# AQL constancs
USERNAME = ''
PASSWORD = ''
API_URL = 'http://gw.aql.com/sms/sms_gw.php'
#AQL API Responses
status = {
'0': 'SMS successfully queued',
'1': 'SMS queued partially',
'2': 'Authentication error',
'3': 'Destination number(s) error',
'4': 'Send time error',
'5': 'Insufficient credit or invalid number of msg/destination',
'9': 'Undefined error',
}
# Global dicts
owners = dict()
hold = dict()
alert = dict()
tokens = dict()
_TokenThread = None # Worker thread object
_NotifyThread = None
_Lock = threading.Lock() # Synchronization lock
TOKEN_LIMIT = 10
_token_rate = 60 # Add a token every 60 seconds
INITIAL_TOKENS = 5
class NotifyMessage(MessageHandler):
def __init__(self, mq):
self.mq = mq
MessageHandler.__init__(self)
2013-03-08 21:18:48 +00:00
def on_message(self, headers, body):
global alert, hold
LOG.debug("Received: %s", body)
alertid = json.loads(body)['id']
alert[alertid] = json.loads(body)
LOG.info('%s : [%s] %s', alert[alertid]['lastReceiveId'], alert[alertid]['status'],
2013-03-08 23:07:43 +00:00
alert[alertid]['summary'])
2013-03-08 21:18:48 +00:00
if not should_we_notify(alertid):
LOG.debug('%s : NOT PAGING for [%s] %s', alert[alertid]['lastReceiveId'], alert[alertid]['status'],
2013-03-08 23:07:43 +00:00
alert[alertid]['summary'])
2013-03-08 21:18:48 +00:00
del alert[alertid]
return
if alertid in hold:
if alert[alertid]['severity'] == severity_code.NORMAL:
2013-03-08 21:18:48 +00:00
LOG.info('%s : Dropping NORMAL alert %s', alert[alertid]['lastReceiveId'], alertid)
del hold[alertid]
del alert[alertid]
else:
LOG.info('%s : Update alert %s details', alert[alertid]['lastReceiveId'], alertid)
else:
hold[alertid] = time.time() + CONF.notify_wait
LOG.info('%s : Holding onto alert %s for %s seconds', alert[alertid]['lastReceiveId'], alertid,
2013-03-08 23:07:43 +00:00
CONF.notify_wait)
2013-03-08 21:18:48 +00:00
def on_disconnected(self):
self.mq.reconnect()
2013-03-08 21:18:48 +00:00
2013-03-02 16:44:58 +00:00
class NotifyDaemon(Daemon):
2013-03-08 21:18:48 +00:00
def run(self):
self.running = True
# Initialiase alert config
init_config()
# Start token bucket thread
_TokenThread = TokenTopUp()
_TokenThread.start()
# Start notify thread
_NotifyThread = ReleaseThread()
_NotifyThread.start()
2013-03-02 16:44:58 +00:00
2013-03-08 21:18:48 +00:00
# Connect to message queue
self.mq = Messaging()
self.mq.connect(callback=NotifyMessage(self.mq))
2013-04-11 16:14:20 +00:00
self.mq.subscribe(destination=CONF.outbound_topic)
2013-03-02 16:44:58 +00:00
2013-03-08 21:18:48 +00:00
while not self.shuttingdown:
try:
# Read (or re-read) config as necessary
if os.path.getmtime(CONF.yaml_config) != config_mod_time:
init_config()
config_mod_time = os.path.getmtime(CONF.yaml_config)
2013-03-02 16:44:58 +00:00
2013-03-08 21:18:48 +00:00
LOG.debug('Waiting for email messages...')
time.sleep(CONF.loop_every)
LOG.debug('Send heartbeat...')
heartbeat = Heartbeat(version=Version)
self.mq.send(heartbeat)
except (KeyboardInterrupt, SystemExit):
self.shuttingdown = True
_TokenThread.shutdown()
_NotifyThread.shutdown()
LOG.info('Shutdown request received...')
self.running = False
LOG.info('Disconnecting from message broker...')
self.mq.disconnect()
2013-03-02 16:44:58 +00:00
def should_we_notify(alertid):
for tag in alert[alertid]['tags']:
if tag.startswith('sms:') or tag.startswith('email:'):
return 1
return 0
def who_to_notify(tag):
owner = tag.split(':')[1]
2013-03-08 21:18:48 +00:00
LOG.info('Identifing owner as %s', owner)
2013-03-02 16:44:58 +00:00
return owner
def sms_notify(alertid, username, password, destination, url=API_URL):
message = alert[alertid]['summary']
data = urllib.urlencode(
{'username': username, 'password': password, 'destination': destination, 'message': message})
2013-03-08 21:18:48 +00:00
LOG.info('Api call %s', url + '?' + data)
2013-03-02 16:44:58 +00:00
req = urllib2.Request(url, data)
f = urllib2.urlopen(req)
response = f.read()
f.close()
#response = '0:1 SMS successfully queued'
#response = '2:0 Authentication error'
# Api call response syntax.
# <status no>:<no of credits used> <description>
2013-03-08 21:18:48 +00:00
LOG.info('Api response %s', response)
2013-03-02 16:44:58 +00:00
# Verify response
if status['0'] in response:
return 0
else:
return
def init_tokens():
global tokens
try:
for owner in owners:
tokens[owner, 'sms'] = INITIAL_TOKENS
tokens[owner, 'email'] = INITIAL_TOKENS
except Exception, e:
2013-03-08 21:18:48 +00:00
LOG.error('Failed to initialize tokens %s', e)
2013-03-02 16:44:58 +00:00
pass
def init_config():
global owners, USERNAME, PASSWORD
2013-03-08 21:18:48 +00:00
LOG.info('Loading config.')
2013-03-02 16:44:58 +00:00
try:
2013-03-08 21:18:48 +00:00
config = yaml.load(open(CONF.yaml_config))
2013-03-02 16:44:58 +00:00
except Exception, e:
2013-03-08 21:18:48 +00:00
LOG.error('Failed to load alert config: %s', e)
2013-03-02 16:44:58 +00:00
pass
USERNAME = config['global']['USERNAME']
PASSWORD = config['global']['PASSWORD']
owners = config['owners']
2013-03-08 21:18:48 +00:00
LOG.info('Loaded %d owners in config.', len(owners))
2013-03-02 16:44:58 +00:00
init_tokens()
def send_notify(alertid):
global tokens, hold
try:
for tag in alert[alertid]['tags']:
if tag.startswith('sms:') or tag.startswith('email:'):
who = who_to_notify(tag)
message = alert[alertid]['summary']
if tag.startswith('sms:') and tokens[who, 'sms'] > 0:
_Lock.acquire()
tokens[who, 'sms'] -= 1
_Lock.release()
2013-03-08 21:18:48 +00:00
LOG.debug('Taken a sms token from %s, there are only %d left', who, tokens[who, 'sms'])
2013-03-02 16:44:58 +00:00
sms_notify(alertid, USERNAME, PASSWORD, owners[who]['mobile'])
elif tokens[who, 'sms'] == 0:
2013-03-08 21:18:48 +00:00
LOG.error('%s run out of sms tokens. Failed to notify %s.', who,
2013-03-08 23:07:43 +00:00
alert[alertid]['lastReceiveId'])
2013-03-02 16:44:58 +00:00
if tag.startswith('email:') and tokens[who, 'email'] > 0:
_Lock.acquire()
tokens[who, 'email'] -= 1
_Lock.release()
2013-03-08 21:18:48 +00:00
LOG.debug('Taken a email token from %s, there are only %d left', who, tokens[who, 'sms'])
2013-03-02 16:44:58 +00:00
email_notify(alertid, owners[who]['email'])
elif tokens[who, 'email'] == 0:
2013-03-08 21:18:48 +00:00
LOG.error('%s run out of email tokens. Failed to notify %s.', who,
2013-03-08 23:07:43 +00:00
alert[alertid]['lastReceiveId'])
2013-03-02 16:44:58 +00:00
except Exception, e:
2013-03-08 21:18:48 +00:00
LOG.error('Notify sending failed for "%s" - %s - %s', alert[alertid]['lastReceiveId'], message, e)
2013-03-02 16:44:58 +00:00
pass
def on_disconnected(self):
global conn
2013-03-08 21:18:48 +00:00
LOG.warning('Connection lost. Attempting auto-reconnect to %s', NOTIFY_TOPIC)
2013-03-02 16:44:58 +00:00
conn.start()
conn.connect(wait=True)
conn.subscribe(destination=NOTIFY_TOPIC, ack='auto', headers={'selector': "repeat = 'false'"})
class ReleaseThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.running = False
self.shuttingdown = False
def shutdown(self):
self.shuttingdown = True
if not self.running:
return
self.join()
def run(self):
global alert, hold
self.running = True
while not self.shuttingdown:
if self.shuttingdown:
break
notified = dict()
for alertid in hold:
if hold[alertid] < time.time():
2013-03-08 21:18:48 +00:00
LOG.warning('Hold expired for %s and trigger notification', alertid)
2013-03-02 16:44:58 +00:00
send_notify(alertid)
notified[alertid] = 1
for alertid in notified:
del alert[alertid]
del hold[alertid]
if not self.shuttingdown:
time.sleep(5)
self.running = False
class TokenTopUp(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.running = False
self.shuttingdown = False
def shutdown(self):
self.shuttingdown = True
if not self.running:
return
self.join()
def run(self):
global tokens, _token_rate
self.running = True
i = 0
while not self.shuttingdown:
if self.shuttingdown:
break
if i == 6:
try:
i = 0
for owner in owners:
if tokens[owner, 'sms'] < TOKEN_LIMIT:
_Lock.acquire()
tokens[owner, 'sms'] += 1
_Lock.release()
if tokens[owner, 'email'] < TOKEN_LIMIT:
_Lock.acquire()
tokens[owner, 'email'] += 1
_Lock.release()
except OSError:
pass
if not self.shuttingdown:
time.sleep(_token_rate / 6)
i += 1
self.running = False