0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-25 01:39:40 +00:00
alerta_alerta/alerta/webhooks/cloudwatch.py

82 lines
3 KiB
Python

import json
from datetime import datetime
from typing import Any, Dict
from flask import current_app
from alerta.app import alarm_model
from alerta.models.alert import Alert
from . import WebhookBase
JSON = Dict[str, Any]
class CloudWatchWebhook(WebhookBase):
"""
Amazon CloudWatch notifications via SNS HTTPS endpoint subscription
See https://docs.aws.amazon.com/sns/latest/dg/sns-http-https-endpoint-as-subscriber.html
"""
@staticmethod
def cw_state_to_severity(state: str) -> str:
if state == 'ALARM':
return 'major'
elif state == 'INSUFFICIENT_DATA':
return 'warning'
elif state == 'OK':
return alarm_model.DEFAULT_NORMAL_SEVERITY
else:
return 'unknown'
def incoming(self, path, query_string, payload):
notification = json.loads(payload)
if notification['Type'] == 'SubscriptionConfirmation':
return Alert(
resource=notification['TopicArn'],
event=notification['Type'],
environment=current_app.config['DEFAULT_ENVIRONMENT'],
severity='informational',
service=['Unknown'],
group='AWS/CloudWatch',
text='{} <a href="{}" target="_blank">SubscribeURL</a>'.format(
notification['Message'], notification['SubscribeURL']),
origin=notification['TopicArn'],
event_type='cloudwatchAlarm',
create_time=datetime.strptime(notification['Timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ'),
raw_data=notification
)
elif notification['Type'] == 'Notification':
alarm = json.loads(notification['Message'])
if 'Trigger' not in alarm:
raise ValueError('SNS message is not a Cloudwatch notification')
resource = notification['TopicArn'].split(':')[-1]
if alarm['Trigger']['Dimensions']:
resource = '{}:{}'.format(alarm['Trigger']['Dimensions'][0]['name'],
alarm['Trigger']['Dimensions'][0]['value'])
return Alert(
resource=resource,
event=alarm['AlarmName'],
environment='Production',
severity=self.cw_state_to_severity(alarm['NewStateValue']),
service=[alarm['AWSAccountId']],
group=alarm['Trigger']['Namespace'],
value=alarm['NewStateValue'],
text=alarm['AlarmDescription'],
tags=[alarm['Region']],
attributes={
'incidentKey': alarm['AlarmName'],
'thresholdInfo': alarm['Trigger']
},
origin=notification['TopicArn'],
event_type='cloudwatchAlarm',
create_time=datetime.strptime(notification['Timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ'),
raw_data=alarm
)
else:
raise ValueError('No SNS notification in payload')