0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-22 16:36:42 +00:00

Add forwarder plugin for federated Alerta ()

* Add alerts and action forwarder plugin for federated setups

* Add tests for forwarder plugin
This commit is contained in:
Nick Satterly 2020-04-30 21:26:49 +02:00 committed by GitHub
parent 33723c0d8c
commit 1538a97717
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 745 additions and 34 deletions

View file

@ -1,2 +1,2 @@
[settings]
known_third_party = blinker,bson,click,flask,flask_compress,flask_cors,itsdangerous,jwt,ldap,mohawk,pkg_resources,psycopg2,pymongo,pyparsing,pytz,requests,requests_mock,saml2,sentry_sdk,setuptools,werkzeug,yaml
known_third_party = blinker,bson,click,flask,flask_compress,flask_cors,itsdangerous,jwt,ldap,mohawk,pkg_resources,psycopg2,pymongo,pyparsing,pytz,requests,requests_hawk,requests_mock,saml2,sentry_sdk,setuptools,werkzeug,yaml

View file

@ -17,7 +17,6 @@ repos:
- id: debug-statements
- id: double-quote-string-fixer
- id: end-of-file-fixer
- id: flake8
- id: fix-encoding-pragma
args: ['--remove']
- id: pretty-format-json
@ -26,6 +25,10 @@ repos:
args: ['--django']
- id: requirements-txt-fixer
- id: trailing-whitespace
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.7
hooks:
- id: flake8
- repo: https://github.com/asottile/pyupgrade
rev: v1.27.0
hooks:

View file

@ -3,7 +3,13 @@ from mohawk import Receiver
def get_credentials(key_id: str):
credentials_map = {creds['id']: creds for creds in current_app.config['HMAC_AUTH_CREDENTIALS']}
credentials_map = {
creds['key']: dict(
id=creds['key'], # access_key
key=creds['secret'], # secret_key
algorithm=creds.get('algorithm', 'sha256')
) for creds in current_app.config['HMAC_AUTH_CREDENTIALS']}
if key_id in credentials_map:
return credentials_map[key_id]
else:

View file

@ -33,6 +33,11 @@ class BlackoutPeriod(AlertaException):
pass
class ForwardingLoop(AlertaException):
"""Forwarding loop detected."""
pass
class InvalidAction(AlertaException):
"""Invalid or redundant action for the current alert status."""
pass

124
alerta/plugins/forwarder.py Normal file
View file

@ -0,0 +1,124 @@
import logging
from typing import TYPE_CHECKING, Any, Optional
from flask import request
from alerta.exceptions import ForwardingLoop
from alerta.plugins import PluginBase
from alerta.utils.client import Client
from alerta.utils.response import base_url
if TYPE_CHECKING:
from alerta.models.alert import Alert # noqa
LOG = logging.getLogger('alerta.plugins.forwarder')
X_LOOP_HEADER = 'X-Alerta-Loop'
def append_to_header(origin):
x_loop = request.headers.get(X_LOOP_HEADER)
return origin if not x_loop else '{},{}'.format(x_loop, origin)
def is_in_xloop(server):
x_loop = request.headers.get(X_LOOP_HEADER)
return server in x_loop if server and x_loop else False
class Forwarder(PluginBase):
"""
Alert and action forwarder for federated Alerta deployments
See https://docs.alerta.io/en/latest/federated.html
"""
def pre_receive(self, alert: 'Alert', **kwargs) -> 'Alert':
if is_in_xloop(base_url()):
http_origin = request.origin or '(unknown)'
raise ForwardingLoop('Alert forwarded by {} already processed by {}'.format(http_origin, base_url()))
return alert
def post_receive(self, alert: 'Alert', **kwargs) -> Optional['Alert']:
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
if is_in_xloop(remote):
LOG.debug('Forward [action=alerts]: {} ; Remote {} already processed alert. Skip.'.format(alert.id, remote))
continue
if not ('*' in actions or 'alerts' in actions):
LOG.debug('Forward [action=alerts]: {} ; Remote {} not configured for alerts. Skip.'.format(alert.id, remote))
continue
headers = {X_LOOP_HEADER: append_to_header(base_url())}
client = Client(endpoint=remote, headers=headers, **auth)
LOG.info('Forward [action=alerts]: {} ; {} -> {}'.format(alert.id, base_url(), remote))
try:
r = client.send_alert(**alert.get_body())
except Exception as e:
LOG.warning('Forward [action=alerts]: {} ; Failed to forward alert to {} - {}'.format(alert.id, remote, str(e)))
continue
LOG.debug('Forward [action=alerts]: {} ; [{}] {}'.format(alert.id, r.status_code, r.text))
return alert
def status_change(self, alert: 'Alert', status: str, text: str, **kwargs) -> Any:
return
def take_action(self, alert: 'Alert', action: str, text: str, **kwargs) -> Any:
if is_in_xloop(base_url()):
http_origin = request.origin or '(unknown)'
raise ForwardingLoop('Action {} forwarded by {} already processed by {}'.format(
action, http_origin, base_url())
)
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
if is_in_xloop(remote):
LOG.debug('Forward [action={}]: {} ; Remote {} already processed action. Skip.'.format(action, alert.id, remote))
continue
if not ('*' in actions or 'actions' in actions or action in actions):
LOG.debug('Forward [action={}]: {} ; Remote {} not configured for action. Skip.'.format(action, alert.id, remote))
continue
headers = {X_LOOP_HEADER: append_to_header(base_url())}
client = Client(endpoint=remote, headers=headers, **auth)
LOG.info('Forward [action={}]: {} ; {} -> {}'.format(action, alert.id, base_url(), remote))
try:
r = client.action(alert.id, action, text)
except Exception as e:
LOG.warning('Forward [action={}]: {} ; Failed to action alert on {} - {}'.format(action, alert.id, remote, str(e)))
continue
LOG.debug('Forward [action={}]: {} ; [{}] {}'.format(action, alert.id, r.status_code, r.text))
return alert
def delete(self, alert: 'Alert', **kwargs) -> bool:
if is_in_xloop(base_url()):
http_origin = request.origin or '(unknown)'
raise ForwardingLoop('Delete forwarded by {} already processed by {}'.format(http_origin, base_url()))
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
print('{} actions={}'.format(remote, actions))
if is_in_xloop(remote):
LOG.debug('Forward [action=delete]: {} ; Remote {} already processed delete. Skip.'.format(alert.id, remote))
continue
if not ('*' in actions or 'delete' in actions):
LOG.debug('Forward [action=delete]: {} ; Remote {} not configured for deletes. Skip.'.format(alert.id, remote))
continue
headers = {X_LOOP_HEADER: append_to_header(base_url())}
client = Client(endpoint=remote, headers=headers, **auth)
LOG.info('Forward [action=delete]: {} ; {} -> {}'.format(alert.id, base_url(), remote))
try:
r = client.delete_alert(alert.id)
except Exception as e:
LOG.warning('Forward [action=delete]: {} ; Failed to delete alert on {} - {}'.format(alert.id, remote, str(e)))
continue
LOG.debug('Forward [action=delete]: {} ; [{}] {}'.format(alert.id, r.status_code, r.text))
return True # always continue with local delete even if remote delete(s) fail

View file

@ -6,7 +6,7 @@
#
# Further information on settings can be found at https://docs.alerta.io
from typing import Any, Dict, List # noqa
from typing import Any, Dict, List, Tuple # noqa
DEBUG = False
@ -71,8 +71,8 @@ SIGNUP_ENABLED = True
HMAC_AUTH_CREDENTIALS = [
# {
# 'id': 'AKID001', # access key id
# 'key': 'supersecret', # secret key
# 'id': '', # access key id => $ uuidgen | tr '[:upper:]' '[:lower:]'
# 'key': '', # secret key => $ date | md5 | base64
# 'algorithm': 'sha256' # valid hmac algorithm eg. sha256, sha384, sha512
# }
] # type: List[Dict[str, Any]]
@ -206,7 +206,7 @@ GOOGLE_TRACKING_ID = None
AUTO_REFRESH_INTERVAL = 5000 # ms
# Plugins
PLUGINS = ['remote_ip', 'reject', 'heartbeat', 'blackout']
PLUGINS = ['remote_ip', 'reject', 'heartbeat', 'blackout', 'forwarder']
PLUGINS_RAISE_ON_ERROR = True # raise RuntimeError exception on first failure
# reject plugin settings
@ -219,3 +219,14 @@ BLACKOUT_DURATION = 3600 # default period = 1 hour
NOTIFICATION_BLACKOUT = False # True - set alert status=blackout, False - do not process alert (default)
BLACKOUT_ACCEPT = [] # type: List[str]
# BLACKOUT_ACCEPT = ['normal', 'ok', 'cleared'] # list of severities accepted during blackout period
# northbound interface
FWD_DESTINATIONS = [
# ('http://localhost:9000', {'username': 'user', 'password': 'pa55w0rd', 'timeout': 10}, ['alerts', 'actions']), # BasicAuth
# ('https://httpbin.org/anything', dict(username='foo', password='bar', ssl_verify=False), ['alerts', 'actions']),
# ('http://localhost:9000', {'key': 'access-key', 'secret': 'secret-key'}, ['alerts', 'actions']), # Hawk HMAC
# ('http://localhost:9000', {'key': 'my-api-key'}, ['alerts', 'actions']), # API key
# ('http://localhost:9000', {'token': 'bearer-token'}, ['alerts', 'actions']), # Bearer token
] # type: List[Tuple]
# valid actions=['*', 'alerts', 'actions', 'open', 'assign', 'ack', 'unack', 'shelve', 'unshelve', 'close', 'delete']

View file

@ -4,8 +4,8 @@ from typing import Optional, Tuple
from flask import current_app, g
from alerta.app import plugins
from alerta.exceptions import (ApiError, BlackoutPeriod, HeartbeatReceived,
RateLimit, RejectException)
from alerta.exceptions import (ApiError, BlackoutPeriod, ForwardingLoop,
HeartbeatReceived, RateLimit, RejectException)
from alerta.models.alert import Alert
from alerta.models.enums import Scope
@ -40,7 +40,7 @@ def process_alert(alert: Alert) -> Alert:
alert = plugin.pre_receive(alert, config=wanted_config)
except TypeError:
alert = plugin.pre_receive(alert) # for backward compatibility
except (RejectException, HeartbeatReceived, BlackoutPeriod, RateLimit):
except (RejectException, HeartbeatReceived, BlackoutPeriod, RateLimit, ForwardingLoop):
raise
except Exception as e:
if current_app.config['PLUGINS_RAISE_ON_ERROR']:
@ -97,23 +97,21 @@ def process_action(alert: Alert, action: str, text: str, timeout: int) -> Tuple[
updated = plugin.take_action(alert, action, text, timeout=timeout, config=wanted_config)
except NotImplementedError:
pass # plugin does not support action() method
except RejectException:
except (RejectException, ForwardingLoop):
raise
except Exception as e:
if current_app.config['PLUGINS_RAISE_ON_ERROR']:
raise ApiError("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
else:
logging.error("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
if updated:
try:
if len(updated) == 3:
alert, action, text = updated
elif len(updated) == 4:
alert, action, text, timeout = updated
else:
alert = updated
except Exception as e:
logging.error("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
if isinstance(updated, Alert):
updated = updated, action, text, timeout
if isinstance(updated, tuple):
if len(updated) == 4:
alert, action, text, timeout = updated
elif len(updated) == 3:
alert, action, text = updated
# remove keys from attributes with None values
new_attrs = {k: v for k, v in alert.attributes.items() if v is not None}

164
alerta/utils/client.py Normal file
View file

@ -0,0 +1,164 @@
import datetime
import json
import os
import uuid
from http.client import HTTPConnection
from urllib.parse import urlencode
import requests
from requests.auth import AuthBase, HTTPBasicAuth
from requests_hawk import HawkAuth
from alerta.utils.collections import merge
class Client:
DEFAULT_ENDPOINT = 'http://localhost:8080'
def __init__(self, endpoint=None, key=None, secret=None, token=None, username=None, password=None, timeout=5.0, ssl_verify=True, headers=None, debug=False):
self.endpoint = endpoint or os.environ.get('ALERTA_ENDPOINT', self.DEFAULT_ENDPOINT)
if debug:
HTTPConnection.debuglevel = 1
key = key or os.environ.get('ALERTA_API_KEY', '')
self.http = HTTPClient(self.endpoint, key, secret, token, username, password, timeout, ssl_verify, headers, debug)
def send_alert(self, resource, event, **kwargs):
data = {
'id': kwargs.get('id'),
'resource': resource,
'event': event,
'environment': kwargs.get('environment'),
'severity': kwargs.get('severity'),
'correlate': kwargs.get('correlate', None) or list(),
'service': kwargs.get('service', None) or list(),
'group': kwargs.get('group'),
'value': kwargs.get('value'),
'text': kwargs.get('text'),
'tags': kwargs.get('tags', None) or list(),
'attributes': kwargs.get('attributes', None) or dict(),
'origin': kwargs.get('origin'),
'type': kwargs.get('type'),
'createTime': datetime.datetime.utcnow(),
'timeout': kwargs.get('timeout'),
'rawData': kwargs.get('raw_data'),
'customer': kwargs.get('customer')
}
return self.http.post('/alert', data)
def action(self, id, action, text='', timeout=None):
data = {
'action': action,
'text': text,
'timeout': timeout
}
return self.http.put('/alert/%s/action' % id, data)
def delete_alert(self, id):
return self.http.delete('/alert/%s' % id)
class ApiKeyAuth(AuthBase):
def __init__(self, api_key=None, auth_token=None):
self.api_key = api_key
self.auth_token = auth_token
def __call__(self, r):
r.headers['Authorization'] = 'Key {}'.format(self.api_key)
return r
class TokenAuth(AuthBase):
def __init__(self, auth_token=None):
self.auth_token = auth_token
def __call__(self, r):
r.headers['Authorization'] = 'Bearer {}'.format(self.auth_token)
return r
class HTTPClient:
def __init__(self, endpoint, key=None, secret=None, token=None, username=None, password=None, timeout=30.0,
ssl_verify=True, headers=None, debug=False):
self.endpoint = endpoint
self.auth = None
if username:
self.auth = HTTPBasicAuth(username, password)
elif secret:
self.auth = HawkAuth(id=key, key=secret) # HMAC
elif key:
self.auth = ApiKeyAuth(api_key=key)
elif token:
self.auth = TokenAuth(token)
self.timeout = timeout
self.session = requests.Session()
self.session.verify = ssl_verify # or use REQUESTS_CA_BUNDLE env var
self.headers = headers or dict()
merge(self.headers, self.default_headers())
self.debug = debug
@staticmethod
def default_headers():
return {
'X-Request-ID': str(uuid.uuid4()),
'Content-Type': 'application/json'
}
def get(self, path, query=None, **kwargs):
query = query or []
if 'page' in kwargs:
query.append(('page', kwargs['page']))
if 'page_size' in kwargs:
query.append(('page-size', kwargs['page_size']))
url = self.endpoint + path + '?' + urlencode(query, doseq=True)
try:
response = self.session.get(url, headers=self.headers, auth=self.auth, timeout=self.timeout)
except requests.exceptions.RequestException:
raise
return response
def post(self, path, data=None):
url = self.endpoint + path
try:
response = self.session.post(url, data=json.dumps(data, cls=CustomJsonEncoder),
headers=self.headers, auth=self.auth, timeout=self.timeout)
except requests.exceptions.RequestException:
raise
return response
def put(self, path, data=None):
url = self.endpoint + path
try:
response = self.session.put(url, data=json.dumps(data, cls=CustomJsonEncoder),
headers=self.headers, auth=self.auth, timeout=self.timeout)
except requests.exceptions.RequestException:
raise
return response
def delete(self, path):
url = self.endpoint + path
try:
response = self.session.delete(url, headers=self.headers, auth=self.auth, timeout=self.timeout)
except requests.exceptions.RequestException:
raise
return response
class CustomJsonEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
if isinstance(o, (datetime.date, datetime.datetime)):
return o.replace(microsecond=0).strftime('%Y-%m-%dT%H:%M:%S') + '.%03dZ' % (o.microsecond // 1000)
elif isinstance(o, datetime.timedelta):
return int(o.total_seconds())
else:
return json.JSONEncoder.default(self, o)

View file

@ -1,6 +1,5 @@
from functools import wraps
from os.path import join as path_join
from urllib.parse import urljoin, urlparse, urlunparse
from urllib.parse import urljoin
from flask import current_app, request
@ -21,11 +20,12 @@ def jsonp(func):
def absolute_url(path: str = '') -> str:
# ensure that "path" (see urlparse result) part of url has both leading and trailing slashes
conf_base_url = urlunparse([(x if i != 2 else path_join('/', x, ''))
for i, x in enumerate(urlparse(current_app.config.get('BASE_URL', '/')))])
try:
base_url = urljoin(request.base_url, conf_base_url)
except RuntimeError: # Working outside of request context
base_url = conf_base_url
return urljoin(base_url, path.lstrip('/'))
base_url = current_app.config.get('BASE_URL', request.url_root)
except Exception:
base_url = '/'
return urljoin(base_url, path) if path else base_url
def base_url():
return absolute_url(path='')

View file

@ -5,8 +5,9 @@ from flask_cors import cross_origin
from alerta.app import qb
from alerta.auth.decorators import permission
from alerta.exceptions import (ApiError, BlackoutPeriod, HeartbeatReceived,
InvalidAction, RateLimit, RejectException)
from alerta.exceptions import (ApiError, BlackoutPeriod, ForwardingLoop,
HeartbeatReceived, InvalidAction, RateLimit,
RejectException)
from alerta.models.alert import Alert
from alerta.models.enums import Scope
from alerta.models.metrics import Timer, timer
@ -62,6 +63,8 @@ def receive():
except BlackoutPeriod as e:
audit_trail_alert(event='alert-blackout')
return jsonify(status='ok', message=str(e), id=alert.id), 202
except ForwardingLoop as e:
return jsonify(status='ok', message=str(e)), 202
except Exception as e:
raise ApiError(str(e), 500)
@ -158,6 +161,8 @@ def action_alert(alert_id):
raise ApiError(str(e), 400)
except InvalidAction as e:
raise ApiError(str(e), 409)
except ForwardingLoop as e:
return jsonify(status='ok', message=str(e)), 202
except Exception as e:
raise ApiError(str(e), 500)

52
docker-compose.yml Normal file
View file

@ -0,0 +1,52 @@
version: '3'
services:
alerta-web-mom:
image: alerta/alerta-beta
ports:
- 8081:80
alerta-api-mom:
build: .
image: alerta-api
environment:
DATABASE_URL: postgres://postgres:postgres@mom-db/mom
depends_on:
- mom-db
ports:
- 8080:8080
alerta-web-mlm:
image: alerta/alerta-beta
ports:
- 9081:80
alerta-api-mlm:
build: .
image: alerta-api
environment:
DATABASE_URL: postgres://postgres:postgres@mlm-db/mlm
depends_on:
- mlm-db
ports:
- 9080:8080
mom-db:
image: postgres
volumes:
- ./pg-data-mom:/var/lib/postgresql/data
environment:
POSTGRES_DB: mom
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
restart: always
mlm-db:
image: postgres
volumes:
- ./pg-data-mlm:/var/lib/postgresql/data
environment:
POSTGRES_DB: mlm
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
restart: always

View file

@ -0,0 +1,18 @@
# DEBUG=True
LOG_LEVEL='INFO'
BASE_URL='http://mlm1.local.alerta.io:10001'
DATABASE_URL='postgres:///mlm1'
PLUGINS=['forwarder']
FWD_DESTINATIONS = [
(
'http://mom1.local.alerta.io:9001',
{'key': '5f2aa07e-59bb-477e-9cb7-55cfef2feb00', 'secret': 'MDczODYxM2YzZWI4NDMyNDg3MjU3ZDYyMjBkOTRkMTIK'},
['alerts', 'actions']
),
(
'http://mom2.local.alerta.io:9002',
{'key': '95ce8df0-9380-4e12-afc0-c21cb21ee4af', 'secret': 'NmI2OWU0ZmZiMzk5ODVlZTY2OGU4OWI4NThhYWY5Y2EK'},
['alerts', 'actions']
),
]

View file

@ -0,0 +1,18 @@
# DEBUG=True
LOG_LEVEL='INFO'
BASE_URL='http://mlm2.local.alerta.io:10002'
DATABASE_URL='postgres:///mlm2'
PLUGINS=['forwarder']
FWD_DESTINATIONS = [
(
'http://mom1.local.alerta.io:9001',
{'key': '07648181-3a9d-40fd-9d22-5f7457402adf', 'secret': 'YzdmZGUyYmVhOTAxY2U2MmVlM2Y1YWQwNmEyMjNiOTYK'},
['alerts', 'actions']
),
(
'http://mom2.local.alerta.io:9002',
{'key': '57814c53-d92d-4706-b1e3-3c8377f4cf03', 'secret': 'ZDg3Yjg0MTM2ODJkZDgyNzE3MjE5MzFlYTdiYzI0YzQK'},
['alerts', 'actions']
),
]

View file

@ -0,0 +1,32 @@
#DEBUG=True
LOG_LEVEL='INFO'
BASE_URL='http://mom1.local.alerta.io:9001'
DATABASE_URL='postgres:///mom1'
PLUGINS=['forwarder']
PLUGINS_RAISE_ON_ERROR=True
HMAC_AUTH_CREDENTIALS = [
{ # mom2
'key': '27c1fe05-d7aa-4982-9e16-3936bc7ea504',
'secret': 'ZGE1YWUwZWFiMmYxNDVjZTkwNWE4ZWY2ZWE4YmNkZTEK',
'algorithm': 'sha256'
},
{ # mlm1
'key': '5f2aa07e-59bb-477e-9cb7-55cfef2feb00',
'secret': 'MDczODYxM2YzZWI4NDMyNDg3MjU3ZDYyMjBkOTRkMTIK',
'algorithm': 'sha256'
},
{ # mlm2
'key': '07648181-3a9d-40fd-9d22-5f7457402adf',
'secret': 'YzdmZGUyYmVhOTAxY2U2MmVlM2Y1YWQwNmEyMjNiOTYK',
'algorithm': 'sha256'
},
]
FWD_DESTINATIONS = [
(
'http://mom2.local.alerta.io:9002',
{'key': 'c075b86b-293c-4539-b825-14ec0130f06d', 'secret': 'YzQ4OTVhNWE2ZTFmMGUzZmU1ODE2ZTE0N2M0OWZmZTAK'},
['alerts', 'actions']
),
]

View file

@ -0,0 +1,32 @@
# DEBUG=True
LOG_LEVEL='INFO'
BASE_URL='http://mom2.local.alerta.io:9002'
DATABASE_URL='postgres:///mom2'
PLUGINS=['forwarder']
PLUGINS_RAISE_ON_ERROR=True
HMAC_AUTH_CREDENTIALS = [
{ # mom1
'key': 'c075b86b-293c-4539-b825-14ec0130f06d',
'secret': 'YzQ4OTVhNWE2ZTFmMGUzZmU1ODE2ZTE0N2M0OWZmZTAK',
'algorithm': 'sha256'
},
{ # mlm1
'key': '95ce8df0-9380-4e12-afc0-c21cb21ee4af',
'secret': 'NmI2OWU0ZmZiMzk5ODVlZTY2OGU4OWI4NThhYWY5Y2EK',
'algorithm': 'sha256'
},
{ # mlm2
'key': '57814c53-d92d-4706-b1e3-3c8377f4cf03',
'secret': 'ZDg3Yjg0MTM2ODJkZDgyNzE3MjE5MzFlYTdiYzI0YzQK',
'algorithm': 'sha256'
},
]
FWD_DESTINATIONS = [
(
'http://mom1.local.alerta.io:9001',
{'key': '27c1fe05-d7aa-4982-9e16-3936bc7ea504', 'secret': 'ZGE1YWUwZWFiMmYxNDVjZTkwNWE4ZWY2ZWE4YmNkZTEK'},
['alerts', 'actions']
),
]

View file

@ -1,6 +1,6 @@
mypy
pre-commit
pylint
pylint==2.4.4
pytest
pytest-cov
python-dotenv

View file

@ -69,7 +69,8 @@ setuptools.setup(
'reject = alerta.plugins.reject:RejectPolicy',
'heartbeat = alerta.plugins.heartbeat:HeartbeatReceiver',
'blackout = alerta.plugins.blackout:BlackoutHandler',
'acked_by = alerta.plugins.acked_by:AckedBy'
'acked_by = alerta.plugins.acked_by:AckedBy',
'forwarder = alerta.plugins.forwarder:Forwarder'
],
'alerta.webhooks': [
'cloudwatch = alerta.webhooks.cloudwatch:CloudWatchWebhook',

242
tests/test_forwarder.py Normal file
View file

@ -0,0 +1,242 @@
import json
import unittest
from uuid import uuid4
import requests_mock
from alerta.app import create_app, db
from alerta.utils.response import base_url
class ForwarderTestCase(unittest.TestCase):
def setUp(self):
test_config = {
'DEBUG': True,
'TESTING': True,
'AUTH_REQUIRED': False,
'BASE_URL': 'http://localhost:8080',
'PLUGINS': ['forwarder']
}
FWD_DESTINATIONS = [
('http://localhost:9000', {'username': 'user', 'password': 'pa55w0rd', 'timeout': 10}, ['alerts', 'actions']), # BasicAuth
# ('https://httpbin.org/anything', dict(username='foo', password='bar', ssl_verify=False), ['*']),
('http://localhost:9001', {
'key': 'e3b8afc0-db18-4c51-865d-b95322742c5e',
'secret': 'MDhjZGMyYTRkY2YyNjk1MTEyMWFlNmM3Y2UxZDU1ZjIK'
}, ['actions']), # Hawk HMAC
('http://localhost:9002', {'key': 'demo-key'}, ['delete']), # API key
('http://localhost:9003', {'token': 'bearer-token'}, ['*']), # Bearer token
]
test_config['FWD_DESTINATIONS'] = FWD_DESTINATIONS
self.app = create_app(test_config)
self.client = self.app.test_client()
self.resource = str(uuid4()).upper()[:8]
self.major_alert = {
'event': 'node_marginal',
'resource': self.resource,
'environment': 'Production',
'service': ['Network'],
'severity': 'major',
'correlate': ['node_down', 'node_marginal', 'node_up'],
'timeout': 40
}
self.warn_alert = {
'event': 'node_marginal',
'resource': self.resource,
'environment': 'Production',
'service': ['Network'],
'severity': 'warning',
'correlate': ['node_down', 'node_marginal', 'node_up'],
'timeout': 50
}
self.normal_alert = {
'event': 'node_up',
'resource': self.resource,
'environment': 'Production',
'service': ['Network'],
'severity': 'normal',
'correlate': ['node_down', 'node_marginal', 'node_up'],
'timeout': 100
}
def tearDown(self):
db.destroy()
@requests_mock.mock()
def test_forward_alert(self, m):
ok_response = """
{"status": "ok"}
"""
m.post('http://localhost:9000/alert', text=ok_response)
m.post('http://localhost:9001/alert', text=ok_response)
m.post('http://localhost:9002/alert', text=ok_response)
m.post('http://localhost:9003/alert', text=ok_response)
headers = {
'Content-type': 'application/json',
'Origin': 'http://localhost:5000',
'X-Alerta-Loop': 'http://localhost:5000',
}
response = self.client.post('/alert', data=json.dumps(self.major_alert), headers=headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['status'], 'ok')
history = m.request_history
self.assertEqual(history[0].port, 9000)
self.assertEqual(history[1].port, 9003)
@requests_mock.mock()
def test_forward_action(self, m):
ok_response = """
{"status": "ok"}
"""
m.post('http://localhost:9000/alert', text=ok_response)
m.post('http://localhost:9003/alert', text=ok_response)
# create alert
headers = {
'Content-type': 'application/json'
}
response = self.client.post('/alert', data=json.dumps(self.warn_alert), headers=headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['alert']['status'], 'open')
alert_id = data['id']
m.put('http://localhost:9000/alert/{}/action'.format(alert_id), text=ok_response)
m.put('http://localhost:9001/alert/{}/action'.format(alert_id), text=ok_response)
m.put('http://localhost:9002/alert/{}/action'.format(alert_id), text=ok_response)
m.put('http://localhost:9003/alert/{}/action'.format(alert_id), text=ok_response)
headers = {
'Content-type': 'application/json',
'Origin': 'http://localhost:8000'
}
response = self.client.put('/alert/{}/action'.format(alert_id), data=json.dumps({'action': 'ack'}), headers=headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['status'], 'ok')
history = m.request_history
self.assertEqual(history[0].port, 9000)
self.assertEqual(history[1].port, 9003)
self.assertEqual(history[2].port, 9000)
self.assertEqual(history[3].port, 9001)
self.assertEqual(history[4].port, 9003)
@requests_mock.mock()
def test_forward_delete(self, m):
ok_response = """
{"status": "ok"}
"""
m.post('http://localhost:9000/alert', text=ok_response)
m.post('http://localhost:9003/alert', text=ok_response)
# create alert
headers = {
'Content-type': 'application/json'
}
response = self.client.post('/alert', data=json.dumps(self.warn_alert), headers=headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['alert']['status'], 'open')
alert_id = data['id']
m.delete('http://localhost:9002/alert/{}'.format(alert_id), text=ok_response)
m.delete('http://localhost:9003/alert/{}'.format(alert_id), text=ok_response)
headers = {
'Content-type': 'application/json',
'Origin': 'http://localhost:8000'
}
response = self.client.delete('/alert/{}'.format(alert_id), headers=headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['status'], 'ok')
history = m.request_history
self.assertEqual(history[0].port, 9000)
self.assertEqual(history[1].port, 9003)
self.assertEqual(history[2].port, 9002)
self.assertEqual(history[3].port, 9003)
@requests_mock.mock()
def test_forward_heartbeat(self, m):
# FIXME: currently not possible
pass
@requests_mock.mock()
def test_already_processed(self, m):
# Alert is not processed locally or forwarded when an Alerta server
# receives an alert which it has already processed. This is
# determined by checking to see if the BASE_URL of the server
# is already in the X-Alerta-Loop header. A 202 is returned because
# the alert was accepted, even though it wasn't processed.
ok_response = """
{"status": "ok"}
"""
m.post('http://localhost:9000/alert', text=ok_response)
m.post('http://localhost:9001/alert', text=ok_response)
m.post('http://localhost:9002/alert', text=ok_response)
m.post('http://localhost:9003/alert', text=ok_response)
headers = {
'Content-type': 'application/json',
'Origin': 'http://localhost:5000',
'X-Alerta-Loop': 'http://localhost:8080,http://localhost:5000',
}
response = self.client.post('/alert', data=json.dumps(self.major_alert), headers=headers)
self.assertEqual(response.status_code, 202)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['status'], 'ok')
self.assertEqual(data['message'], 'Alert forwarded by http://localhost:5000 already processed by http://localhost:8080')
self.assertEqual(m.called, False)
@requests_mock.mock()
def test_forward_loop(self, m):
# Alert is processed locally but not forwarded on to the remote
# because it is already in the X-Alerta-Loop header. A 201 is
# returned because the alert has been received and processed.
ok_response = """
{"status": "ok"}
"""
m.post('http://localhost:9000/alert', text=ok_response)
m.post('http://localhost:9001/alert', text=ok_response)
m.post('http://localhost:9002/alert', text=ok_response)
m.post('http://localhost:9003/alert', text=ok_response)
headers = {
'Content-type': 'application/json',
'X-Alerta-Loop': 'http://localhost:9000,http://localhost:9001,http://localhost:9002,http://localhost:9003',
}
response = self.client.post('/alert', data=json.dumps(self.warn_alert), headers=headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['status'], 'ok')
self.assertEqual(m.called, False)
def test_do_not_forward(self):
# check forwarding rule for remote
pass
def test_base_url(self):
with self.app.test_request_context('/'):
self.assertEqual(base_url(), 'http://localhost:8080')