0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-26 02:08:31 +00:00
alerta_alerta/contrib/experimental/alert-receiver.py
2013-03-09 23:21:08 +00:00

219 lines
No EOL
6.9 KiB
Python

#!/usr/bin/env python
########################################
#
# alert-receiver.py - Alert Receiver
#
########################################
import socket
import time
import simplejson as json
import os
import sys
import logging
import stomp
import yaml
import threading
import select
import uuid
import datetime
import errno
import re
__program__ = 'alert-receiver'
__version__ = '1.0.4'
BROKER_LIST = [('localhost', 61613)] # list of brokers for failover
ALERT_QUEUE = '/queue/alerts'
LOGFILE = '/var/log/alerta/alert-receiver.log'
PIDFILE = '/var/run/alerta/alert-receiver.pid'
CONFIGFILE = '/opt/alerta/conf/alert-receiver.yaml'
config = dict()
sock = list()
def init_config():
global config
logging.info('Loading config...')
try:
config = yaml.load(open(CONFIGFILE))
except Exception, e:
logging.error('Failed to load config: %s', e)
logging.info('Loaded %d config OK', len(config))
def send_heartbeat():
heartbeatid = str(uuid.uuid4()) # random UUID
createTime = datetime.datetime.utcnow()
headers = dict()
headers['type'] = "heartbeat"
headers['correlation-id'] = heartbeatid
heartbeat = dict()
heartbeat['id'] = heartbeatid
heartbeat['type'] = "heartbeat"
heartbeat['createTime'] = createTime.replace(microsecond=0).isoformat() + ".%03dZ" % (createTime.microsecond//1000)
heartbeat['origin'] = "%s/%s" % (__program__,os.uname()[1])
heartbeat['version'] = __version__
conn.send(json.dumps(heartbeat), headers, destination=ALERT_QUEUE)
broker = conn.get_host_and_port()
logging.info('%s : Sending heartbeat from the receiver to %s: %s', heartbeat['id'], broker[0], str(broker[1]))
def disconnect():
global sock
for s in sock:
s.close()
def connect():
global sock
for item in config:
while True:
try:
HOST, PORT = item['host'], item['port']
ADDR = (HOST, PORT)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ADDR))
sock.append(s)
logging.info('Connecting to %s port %s' % (HOST, PORT))
break
except socket.error, e:
if e.errno == errno.ECONNREFUSED :
logging.error('Server %s is not ready. Retrying later in 20 seconds.' % HOST)
time.sleep(20)
continue
else:
logging.error('Connection error to %s port %s' % (HOST, PORT))
sys.exit(2)
class ConcatJSONDecoder(json.JSONDecoder):
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
def decode(self, s, _w=WHITESPACE.match):
s_len = len(s)
objs = []
end = 0
while end != s_len:
obj, end = self.raw_decode(s, idx=_w(s, end).end())
end = _w(s, end).end()
objs.append(obj)
return objs
def main():
global conn, sock
logging.basicConfig(level=logging.INFO, format="%(asctime)s alert-receiver[%(process)d] %(threadName)s %(levelname)s - %(message)s", filename=LOGFILE)
logging.info('Starting up Alerta Poller version %s', __version__)
# Write pid file if not already running
if os.path.isfile(PIDFILE):
pid = open(PIDFILE).read()
try:
os.kill(int(pid), 0)
logging.error('Process with pid %s already exists, exiting', pid)
sys.exit(1)
except OSError:
pass
file(PIDFILE, 'w').write(str(os.getpid()))
# Initialiase config
init_config()
config_mod_time = os.path.getmtime(CONFIGFILE)
# Connect to message broker
try:
conn = stomp.Connection(BROKER_LIST)
conn.start()
conn.connect(wait=True)
except Exception, e:
logging.error('Could not connect to broker %s', e)
sys.exit(1)
# Connect to servers to get messages
connect()
while True:
try:
# Read (or re-read) config as necessary
if os.path.getmtime(CONFIGFILE) != config_mod_time:
init_config()
config_mod_time = os.path.getmtime(CONFIGFILE)
#Close connections and re-open on new config file
disconnect()
connect()
logging.info('Waiting on select ...')
ip, op, rdy = select.select(sock, [], [])
logging.info('Select finished!')
for i in ip:
alerts = list()
buf = ''
counter = 0
while True:
try:
data = i.recv(1024*4)
except socket.error:
logging.error('Error receiving from server')
logging.info('DATA >>>>%s<<<<', data)
if data == '':
logging.debug('Break emtpy buffer')
counter += 1
break
buf += data
try:
alerts = json.loads(buf, cls=ConcatJSONDecoder)
logging.info('JSON OK!')
break
except:
logging.info('JSON BAD!')
pass
if counter == 3:
logging.error('Server has gone away. Trying reconnecting ...')
disconnect()
connect()
logging.info('Received %s alerts', len(alerts))
if len(alerts) > 0:
logging.info('%s of alerts readed from the buffer', len(alerts))
for alert in alerts:
logging.info('Received alert message: %s', alert)
if 'type' in alert and 'id' in alert:
headers = dict()
headers['type'] = alert['type']
headers['correlation-id'] = alert['id']
conn.send(json.dumps(alert), headers, destination=ALERT_QUEUE)
broker = conn.get_host_and_port()
logging.info('%s : Alert sent to %s:%s', alert['id'], broker[0], str(broker[1]))
else:
logging.error('Skipping malformed alert')
else:
logging.debug('No data!')
send_heartbeat()
except (KeyboardInterrupt, SystemExit):
disconnect()
os.unlink(PIDFILE)
logging.info('Graceful exit.')
sys.exit(0)
conn.disconnect()
sys.exit(0)
if __name__ == '__main__':
main()