0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-30 11:36:20 +00:00
alerta_alerta/alerta/cloudwatch/daemon.py
2014-04-07 20:41:07 +01:00

177 lines
5.4 KiB
Python

import sys
import json
import time
import datetime
import boto.sqs
from boto.sqs.message import RawMessage
from boto import exception
from alerta.common import config
from alerta.common import log as logging
from alerta.common.daemon import Daemon
from alerta.common.alert import Alert
from alerta.common.heartbeat import Heartbeat
from alerta.common import severity_code
from alerta.common.transform import Transformers
from alerta.common.dedup import DeDup
from alerta.common.api import ApiClient
from alerta.common.graphite import StatsD
# Version string of this daemon; run() sends it as the only tag on each heartbeat.
__version__ = '3.0.3'
# Module-level logger, obtained through the project's logging wrapper.
LOG = logging.getLogger(__name__)
# Shared configuration object; the cloudwatch_* options are read from it.
CONF = config.CONF
class CloudWatchDaemon(Daemon):
    """Daemon that reads CloudWatch alarm notifications from SQS and sends alerts.

    Each message read from the configured SQS queue is parsed into an
    ``Alert``, passed through the de-duplicator and sent with ``ApiClient``.
    A ``Heartbeat`` is sent on every pass of the polling loop.
    """

    # Default option values, registered with the shared config in __init__.
    cloudwatch_opts = {
        'cloudwatch_sqs_region': 'eu-west-1',
        'cloudwatch_sqs_queue': 'cloudwatch-to-alerta',
        'cloudwatch_access_key': '022QF06E7MXBSAMPLE',
        'cloudwatch_secret_key': ''
    }

    def __init__(self, prog, **kwargs):
        """Register the CloudWatch options and initialise the base daemon.

        :param prog: program name, handed on to ``Daemon.__init__``.
        :param kwargs: extra options, handed on to ``Daemon.__init__``.
        """
        config.register_opts(CloudWatchDaemon.cloudwatch_opts)

        # NOTE: kwargs is passed on as one positional dict, not expanded.
        Daemon.__init__(self, prog, kwargs)

    def run(self):
        """Poll the SQS queue until shutdown, sending alerts and heartbeats.

        Exits the process with status 1 when the SQS connection or the queue
        cannot be set up.  Inside the loop, read, parse and send failures are
        logged and the loop carries on; a message is deleted from the queue
        once it has been handled, whether or not an alert was sent for it.
        """
        self.running = True

        self.statsd = StatsD()  # graphite metrics
        self.api = ApiClient()
        self.dedup = DeDup(by_value=True)

        # Connect to message queue
        LOG.info('Connecting to SQS queue %s', CONF.cloudwatch_sqs_queue)
        try:
            connection = boto.sqs.connect_to_region(
                CONF.cloudwatch_sqs_region,
                aws_access_key_id=CONF.cloudwatch_access_key,
                aws_secret_access_key=CONF.cloudwatch_secret_key
            )
        except boto.exception.SQSError as e:
            LOG.error('SQS API call failed: %s', e)
            sys.exit(1)

        # connect_to_region() returns None (it does not raise) when the
        # region name is not known, so that case needs its own check.
        if connection is None:
            LOG.error('SQS API call failed: unknown region %s', CONF.cloudwatch_sqs_region)
            sys.exit(1)

        try:
            sqs = connection.create_queue(CONF.cloudwatch_sqs_queue)
            sqs.set_message_class(RawMessage)
        except boto.exception.SQSError as e:
            LOG.error('SQS queue error: %s', e)
            sys.exit(1)

        while not self.shuttingdown:
            try:
                LOG.info('Waiting for CloudWatch alarms...')
                try:
                    message = sqs.read(wait_time_seconds=20)
                except boto.exception.SQSError as e:
                    LOG.warning('Could not read from queue: %s', e)
                    time.sleep(20)
                    continue

                if message:
                    body = message.get_body()
                    try:
                        alert = self.parse_notification(body)
                    except (ValueError, KeyError, IndexError, TypeError) as e:
                        # A message that cannot be parsed now will not parse
                        # on redelivery either: log it and fall through to
                        # the delete so it cannot block the queue.
                        LOG.warning('Could not parse notification: %s', e)
                        alert = None

                    # parse_notification() returns None for non-alarm and
                    # suppressed notifications; there is nothing to send.
                    if alert is not None and self.dedup.is_send(alert):
                        try:
                            self.api.send(alert)
                        except Exception as e:
                            LOG.warning('Failed to send alert: %s', e)

                    sqs.delete_message(message)

                LOG.debug('Send heartbeat...')
                heartbeat = Heartbeat(tags=[__version__])
                try:
                    self.api.send(heartbeat)
                except Exception as e:
                    LOG.warning('Failed to send heartbeat: %s', e)

            except (KeyboardInterrupt, SystemExit):
                self.shuttingdown = True

        LOG.info('Shutdown request received...')
        self.running = False

    def parse_notification(self, message):
        """Build an ``Alert`` from the JSON body of one queue message.

        :param message: JSON text of the notification envelope.
        :returns: the ``Alert``, or ``None`` when the notification has no
            ``Message``, the message is not a CloudWatch alarm (no
            ``Trigger``), or the transformers ask for it to be suppressed.
        :raises ValueError: if ``message`` is not valid JSON or the
            ``Timestamp`` does not match the expected format.
        :raises KeyError: if an expected field is missing.
        :raises IndexError: if the alarm trigger has no dimensions.
        """
        LOG.debug('Parsing CloudWatch notification message...')

        notification = json.loads(message)
        if 'Message' not in notification:
            return None

        alarm = notification['Message']
        if not isinstance(alarm, dict):
            # SNS puts the alarm in 'Message' as a JSON-encoded string, so it
            # has to be decoded before its fields can be read.
            try:
                alarm = json.loads(alarm)
            except (TypeError, ValueError):
                LOG.debug('Notification message is not JSON, ignoring')
                return None

        if not isinstance(alarm, dict) or 'Trigger' not in alarm:
            return None

        # Defaults
        resource = alarm['Trigger']['Dimensions'][0]['value']
        event = alarm['AlarmName']
        severity = self.cw_state_to_severity(alarm['NewStateValue'])
        group = 'CloudWatch'
        value = alarm['NewStateValue']
        text = alarm['AlarmDescription']
        environment = ['INFRA']
        service = [alarm['AWSAccountId']]
        tags = [notification['MessageId'], alarm['Region']]
        correlate = list()
        origin = [notification['TopicArn']]
        timeout = None
        threshold_info = alarm['NewStateReason']
        more_info = notification['Subject']
        create_time = datetime.datetime.strptime(notification['Timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
        # Keep the message exactly as it was delivered, not the decoded dict.
        raw_data = notification['Message']

        alert = Alert(
            resource=resource,
            event=event,
            correlate=correlate,
            group=group,
            value=value,
            severity=severity,
            environment=environment,
            service=service,
            text=text,
            event_type='cloudwatchAlarm',
            tags=tags,
            attributes={
                'thresholdInfo': threshold_info,
                'moreInfo': more_info
            },
            origin=origin,
            timeout=timeout,
            create_time=create_time,
            raw_data=raw_data,
        )

        suppress = Transformers.normalise_alert(alert)
        if suppress:
            LOG.info('Suppressing %s alert', event)
            LOG.debug('%s', alert)
            return None

        return alert

    @staticmethod
    def cw_state_to_severity(state):
        """Map a CloudWatch alarm state name to a severity code.

        ``ALARM`` maps to MAJOR, ``INSUFFICIENT_DATA`` to WARNING, ``OK`` to
        NORMAL and anything else to UNKNOWN.
        """
        if state == 'ALARM':
            return severity_code.MAJOR
        elif state == 'INSUFFICIENT_DATA':
            return severity_code.WARNING
        elif state == 'OK':
            return severity_code.NORMAL
        else:
            return severity_code.UNKNOWN