mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 09:19:40 +00:00
164 lines
5.5 KiB
Python
164 lines
5.5 KiB
Python
import datetime
|
|
import json
|
|
import os
|
|
import uuid
|
|
from http.client import HTTPConnection
|
|
from urllib.parse import urlencode
|
|
|
|
import requests
|
|
from requests.auth import AuthBase, HTTPBasicAuth
|
|
from requests_hawk import HawkAuth
|
|
|
|
from alerta.utils.collections import merge
|
|
|
|
|
|
class Client:
|
|
|
|
DEFAULT_ENDPOINT = 'http://localhost:8080'
|
|
|
|
def __init__(self, endpoint=None, key=None, secret=None, token=None, username=None, password=None, timeout=5.0, ssl_verify=True, headers=None, debug=False):
|
|
self.endpoint = endpoint or os.environ.get('ALERTA_ENDPOINT', self.DEFAULT_ENDPOINT)
|
|
|
|
if debug:
|
|
HTTPConnection.debuglevel = 1
|
|
|
|
key = key or os.environ.get('ALERTA_API_KEY', '')
|
|
self.http = HTTPClient(self.endpoint, key, secret, token, username, password, timeout, ssl_verify, headers, debug)
|
|
|
|
def send_alert(self, resource, event, **kwargs):
|
|
data = {
|
|
'id': kwargs.get('id'),
|
|
'resource': resource,
|
|
'event': event,
|
|
'environment': kwargs.get('environment'),
|
|
'severity': kwargs.get('severity'),
|
|
'correlate': kwargs.get('correlate', None) or list(),
|
|
'service': kwargs.get('service', None) or list(),
|
|
'group': kwargs.get('group'),
|
|
'value': kwargs.get('value'),
|
|
'text': kwargs.get('text'),
|
|
'tags': kwargs.get('tags', None) or list(),
|
|
'attributes': kwargs.get('attributes', None) or dict(),
|
|
'origin': kwargs.get('origin'),
|
|
'type': kwargs.get('type'),
|
|
'createTime': datetime.datetime.utcnow(),
|
|
'timeout': kwargs.get('timeout'),
|
|
'rawData': kwargs.get('raw_data'),
|
|
'customer': kwargs.get('customer')
|
|
}
|
|
return self.http.post('/alert', data)
|
|
|
|
def action(self, id, action, text='', timeout=None):
|
|
data = {
|
|
'action': action,
|
|
'text': text,
|
|
'timeout': timeout
|
|
}
|
|
return self.http.put(f'/alert/{id}/action', data)
|
|
|
|
def delete_alert(self, id):
|
|
return self.http.delete(f'/alert/{id}')
|
|
|
|
|
|
class ApiKeyAuth(AuthBase):
|
|
|
|
def __init__(self, api_key=None, auth_token=None):
|
|
self.api_key = api_key
|
|
self.auth_token = auth_token
|
|
|
|
def __call__(self, r):
|
|
r.headers['Authorization'] = f'Key {self.api_key}'
|
|
return r
|
|
|
|
|
|
class TokenAuth(AuthBase):
|
|
|
|
def __init__(self, auth_token=None):
|
|
self.auth_token = auth_token
|
|
|
|
def __call__(self, r):
|
|
r.headers['Authorization'] = f'Bearer {self.auth_token}'
|
|
return r
|
|
|
|
|
|
class HTTPClient:
|
|
|
|
def __init__(self, endpoint, key=None, secret=None, token=None, username=None, password=None, timeout=30.0,
|
|
ssl_verify=True, headers=None, debug=False):
|
|
self.endpoint = endpoint
|
|
self.auth = None
|
|
|
|
if username:
|
|
self.auth = HTTPBasicAuth(username, password)
|
|
elif secret:
|
|
self.auth = HawkAuth(id=key, key=secret) # HMAC
|
|
elif key:
|
|
self.auth = ApiKeyAuth(api_key=key)
|
|
elif token:
|
|
self.auth = TokenAuth(token)
|
|
|
|
self.timeout = timeout
|
|
self.session = requests.Session()
|
|
self.session.verify = ssl_verify # or use REQUESTS_CA_BUNDLE env var
|
|
|
|
self.headers = headers or dict()
|
|
merge(self.headers, self.default_headers())
|
|
|
|
self.debug = debug
|
|
|
|
@staticmethod
|
|
def default_headers():
|
|
return {
|
|
'X-Request-ID': str(uuid.uuid4()),
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
def get(self, path, query=None, **kwargs):
|
|
query = query or []
|
|
if 'page' in kwargs:
|
|
query.append(('page', kwargs['page']))
|
|
if 'page_size' in kwargs:
|
|
query.append(('page-size', kwargs['page_size']))
|
|
|
|
url = self.endpoint + path + '?' + urlencode(query, doseq=True)
|
|
try:
|
|
response = self.session.get(url, headers=self.headers, auth=self.auth, timeout=self.timeout)
|
|
except requests.exceptions.RequestException:
|
|
raise
|
|
return response
|
|
|
|
def post(self, path, data=None):
|
|
url = self.endpoint + path
|
|
try:
|
|
response = self.session.post(url, data=json.dumps(data, cls=CustomJsonEncoder),
|
|
headers=self.headers, auth=self.auth, timeout=self.timeout)
|
|
except requests.exceptions.RequestException:
|
|
raise
|
|
return response
|
|
|
|
def put(self, path, data=None):
|
|
url = self.endpoint + path
|
|
try:
|
|
response = self.session.put(url, data=json.dumps(data, cls=CustomJsonEncoder),
|
|
headers=self.headers, auth=self.auth, timeout=self.timeout)
|
|
except requests.exceptions.RequestException:
|
|
raise
|
|
return response
|
|
|
|
def delete(self, path):
|
|
url = self.endpoint + path
|
|
try:
|
|
response = self.session.delete(url, headers=self.headers, auth=self.auth, timeout=self.timeout)
|
|
except requests.exceptions.RequestException:
|
|
raise
|
|
return response
|
|
|
|
|
|
class CustomJsonEncoder(json.JSONEncoder):
|
|
def default(self, o): # pylint: disable=method-hidden
|
|
if isinstance(o, (datetime.date, datetime.datetime)):
|
|
return o.replace(microsecond=0).strftime('%Y-%m-%dT%H:%M:%S') + f'.{int(o.microsecond // 1000):03}Z'
|
|
elif isinstance(o, datetime.timedelta):
|
|
return int(o.total_seconds())
|
|
else:
|
|
return json.JSONEncoder.default(self, o)
|