0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-02-05 14:09:44 +00:00
alerta_alerta/alerta/logger/daemon.py
2014-01-31 15:44:56 +00:00

116 lines
No EOL
3.5 KiB
Python

import time
import json
import urllib2
import datetime
from alerta.common import config
from alerta.common import log as logging
from alerta.common.daemon import Daemon
from alerta.common.mq import Messaging, MessageHandler
from alerta.common.alert import Alert
from alerta.common.heartbeat import Heartbeat
from alerta.common.utils import DateEncoder
# Version string passed to Heartbeat(version=...) in LoggerDaemon.run().
Version = '2.1.0'
# Module logger, obtained from alerta.common.log under this module's name.
LOG = logging.getLogger(__name__)
# Alias for the config object; options are read from it as attributes
# (e.g. CONF.es_host, CONF.es_port, CONF.es_index, CONF.loop_every).
CONF = config.CONF
class LoggerMessage(MessageHandler):
    """Message-queue callback that indexes received alerts in ElasticSearch.

    Each message body is parsed into an alert, converted to a Logstash-style
    document (``@message``, ``@source``, ``@fields`` ...) and sent to the
    index URL derived from ``CONF.es_host``, ``CONF.es_port`` and
    ``CONF.es_index``.
    """

    def __init__(self, mq):
        """Keep a reference to the messaging connection.

        :param mq: messaging connection; the only method used on it by this
            class is ``reconnect()`` (see ``on_disconnected``).
        """
        self.mq = mq
        MessageHandler.__init__(self)

    def on_message(self, headers, body):
        """Parse one queue message and index the resulting alert.

        :param headers: message headers; not used by this handler.
        :param body: raw message payload handed to ``Alert.parse_alert``.
        :returns: ``None``. Failures are logged and never raised to the
            caller, except that a ``ValueError`` from parsing is dropped
            without a log line from this method.
        """
        LOG.debug("Received: %s", body)

        try:
            alert = Alert.parse_alert(body)
        except ValueError:
            # Unparseable payloads are dropped; nothing can be indexed.
            return

        if not alert:
            return

        LOG.info('%s : [%s] %s', alert.last_receive_id, alert.status, alert.summary)

        # The resource is treated as "host:path"; with no ':' present,
        # partition() yields the whole resource as host and an empty path.
        source_host, _, source_path = alert.resource.partition(':')
        document = {
            '@message': alert.summary,
            '@source': alert.resource,
            '@source_host': source_host,
            '@source_path': source_path,
            '@tags': alert.tags,
            '@timestamp': alert.last_receive_time,
            '@type': alert.event_type,
            '@fields': alert.get_body()
        }
        LOG.debug('Index payload %s', document)

        # CONF.es_index is a strftime pattern, expanded with the current UTC
        # time to select the index to write to.
        index_url = "http://%s:%s/%s/%s" % (
            CONF.es_host,
            CONF.es_port,
            datetime.datetime.utcnow().strftime(CONF.es_index),
            alert.event_type,
        )
        LOG.debug('Index URL: %s', index_url)

        try:
            # Serialisation stays inside the try so that an unserialisable
            # document is reported as an indexing failure, not raised.
            reply = urllib2.urlopen(index_url, json.dumps(document, cls=DateEncoder))
            try:
                response = reply.read()
            finally:
                # Always release the HTTP connection, even if read() fails.
                reply.close()
        except Exception as e:
            LOG.error('%s : Alert indexing to %s failed - %s', alert.last_receive_id, index_url, e)
            return

        try:
            es_id = json.loads(response)['_id']
        except (ValueError, KeyError, TypeError) as e:
            # Two placeholders need two arguments: the alert id and the error.
            LOG.error('%s : Could not parse elasticsearch reponse: %s', alert.last_receive_id, e)
        else:
            LOG.info('%s : Alert indexed at %s/%s', alert.last_receive_id, index_url, es_id)

    def on_disconnected(self):
        """Ask the messaging connection to reconnect after a disconnect."""
        self.mq.reconnect()
class LoggerDaemon(Daemon):
    """
    Daemon that subscribes to the outbound alert queue and passes every
    message to LoggerMessage, which indexes it in ElasticSearch using the
    Logstash format so that the logstash GUI and/or Kibana can act as
    front-ends.
    """

    # Option defaults handed to config.register_opts(); the values are read
    # back elsewhere in this module as CONF.es_host / es_port / es_index.
    logger_opts = dict(
        es_host='localhost',
        es_port=9200,
        es_index='alerta-%Y.%m.%d',  # NB. Kibana config must match this index
    )

    def __init__(self, prog, **kwargs):
        """Register the logger options, then initialise the base Daemon.

        :param prog: program name forwarded to ``Daemon.__init__``.
        :param kwargs: extra options, forwarded to ``Daemon.__init__`` as a
            single dict argument.
        """
        config.register_opts(LoggerDaemon.logger_opts)
        Daemon.__init__(self, prog, kwargs)

    def run(self):
        """Subscribe to the outbound queue and send heartbeats until shutdown.

        Alerts are processed by the LoggerMessage callback; this loop only
        sleeps for ``CONF.loop_every`` and then sends a heartbeat, repeating
        until ``self.shuttingdown`` becomes true.
        """
        self.running = True

        # Connect to message queue
        broker = Messaging()
        self.mq = broker
        broker.connect(callback=LoggerMessage(broker))
        broker.subscribe(destination=CONF.outbound_queue)

        while not self.shuttingdown:
            try:
                LOG.debug('Waiting for log messages...')
                time.sleep(CONF.loop_every)

                LOG.debug('Send heartbeat...')
                beat = Heartbeat(version=Version)
                self.mq.send(beat)
            except (KeyboardInterrupt, SystemExit):
                # Leave the loop on the next condition check.
                self.shuttingdown = True

        LOG.info('Shutdown request received...')
        self.running = False

        LOG.info('Disconnecting from message broker...')
        self.mq.disconnect()