0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-24 17:29:39 +00:00
alerta_alerta/alerta/database/backends/mongodb/base.py
Pierre-Louis BOUTRUCHE 2fdea8fecf
feat: add db schema (#1839)
Added the query to look in the schemas and not in the database.

* Replace the default value of schema by public
2023-06-22 20:56:09 +02:00

1722 lines
70 KiB
Python

from collections import defaultdict
from datetime import datetime, timedelta
from flask import current_app
from pymongo import ASCENDING, TEXT, MongoClient, ReturnDocument
from pymongo.errors import ConnectionFailure
from alerta.app import alarm_model
from alerta.database.base import Database
from alerta.exceptions import NoCustomerMatch
from alerta.models.enums import ADMIN_SCOPES
from alerta.models.heartbeat import HeartbeatStatus
from .utils import Query
# See https://github.com/MongoEngine/flask-mongoengine/blob/master/flask_mongoengine/__init__.py
# See https://github.com/dcrosta/flask-pymongo/blob/master/flask_pymongo/__init__.py
class Backend(Database):
def create_engine(self, app, uri, dbname=None, schema=None, raise_on_error=True):
self.uri = uri
self.dbname = dbname
db = self.connect()
try:
self._create_indexes(db)
except Exception as e:
if raise_on_error:
raise
app.logger.warning(e)
try:
self._update_lookups(db)
except Exception as e:
if raise_on_error:
raise
app.logger.warning(e)
def connect(self):
self.client = MongoClient(self.uri)
if self.dbname:
return self.client[self.dbname]
else:
return self.client.get_database()
@staticmethod
def _create_indexes(db):
db.alerts.create_index(
[('environment', ASCENDING), ('customer', ASCENDING), ('resource', ASCENDING), ('event', ASCENDING)],
unique=True
)
db.alerts.create_index([('$**', TEXT)])
db.customers.drop_indexes() # FIXME: should only drop customers index if it's unique (ie. the old one)
db.customers.create_index([('match', ASCENDING)])
db.heartbeats.create_index([('origin', ASCENDING), ('customer', ASCENDING)], unique=True)
db.keys.create_index([('key', ASCENDING)], unique=True)
db.perms.create_index([('match', ASCENDING)], unique=True)
db.users.drop_indexes()
db.users.create_index([('login', ASCENDING)], unique=True,
partialFilterExpression={'login': {'$type': 'string'}})
db.users.create_index([('email', ASCENDING)], unique=True,
partialFilterExpression={'email': {'$type': 'string'}})
db.groups.create_index([('name', ASCENDING)], unique=True)
db.metrics.create_index([('group', ASCENDING), ('name', ASCENDING)], unique=True)
@staticmethod
def _update_lookups(db):
for severity, code in alarm_model.Severity.items():
db.codes.update_one(
{'severity': severity},
{'$set': {'severity': severity, 'code': code}},
upsert=True
)
for status, state in alarm_model.Status.items():
db.states.update_one(
{'status': status},
{'$set': {'status': status, 'state': state}},
upsert=True
)
@property
def name(self):
return self.get_db().name
@property
def version(self):
return self.get_db().client.server_info()['version']
@property
def is_alive(self):
try:
self.get_db().client.admin.command('ismaster')
except ConnectionFailure:
return False
return True
def close(self, db):
self.client.close()
def destroy(self):
db = self.connect()
self.client.drop_database(db.name)
# ALERTS
def get_severity(self, alert):
"""
Get severity of correlated alert. Used to determine previous severity.
"""
query = {
'environment': alert.environment,
'resource': alert.resource,
'$or': [
{
'event': alert.event,
'severity': {'$ne': alert.severity}
},
{
'event': {'$ne': alert.event},
'correlate': alert.event
}],
'customer': alert.customer
}
r = self.get_db().alerts.find_one(query, projection={'severity': 1, '_id': 0})
return r['severity'] if r else None
def get_status(self, alert):
"""
Get status of correlated or duplicate alert. Used to determine previous status.
"""
query = {
'environment': alert.environment,
'resource': alert.resource,
'$or': [
{
'event': alert.event
},
{
'correlate': alert.event,
}
],
'customer': alert.customer
}
r = self.get_db().alerts.find_one(query, projection={'status': 1, '_id': 0})
return r['status'] if r else None
def is_duplicate(self, alert):
query = {
'environment': alert.environment,
'resource': alert.resource,
'event': alert.event,
'severity': alert.severity,
'customer': alert.customer
}
return self.get_db().alerts.find_one(query)
def is_correlated(self, alert):
query = {
'environment': alert.environment,
'resource': alert.resource,
'$or': [
{
'event': alert.event,
'severity': {'$ne': alert.severity}
},
{
'event': {'$ne': alert.event},
'correlate': alert.event
}],
'customer': alert.customer
}
return self.get_db().alerts.find_one(query)
def is_flapping(self, alert, window=1800, count=2):
"""
Return true if alert severity has changed more than X times in Y seconds
"""
pipeline = [
{'$match': {
'environment': alert.environment,
'resource': alert.resource,
'event': alert.event,
'customer': alert.customer
}},
{'$unwind': '$history'},
{'$match': {
'history.updateTime': {'$gt': datetime.utcnow() - timedelta(seconds=window)},
'history.type': 'severity'
}},
{'$group': {'_id': '$history.type', 'count': {'$sum': 1}}}
]
responses = self.get_db().alerts.aggregate(pipeline)
for r in responses:
if r['count'] > count:
return True
return False
def dedup_alert(self, alert, history):
"""
Update alert status, service, value, text, timeout and rawData, increment duplicate count and set
repeat=True, and keep track of last receive id and time but don't append to history unless status changes.
"""
query = {
'environment': alert.environment,
'resource': alert.resource,
'event': alert.event,
'severity': alert.severity,
'customer': alert.customer
}
now = datetime.utcnow()
update = {
'$set': {
'status': alert.status,
'service': alert.service,
'value': alert.value,
'text': alert.text,
'timeout': alert.timeout,
'rawData': alert.raw_data,
'repeat': True,
'lastReceiveId': alert.id,
'lastReceiveTime': now
},
'$addToSet': {'tags': {'$each': alert.tags}},
'$inc': {'duplicateCount': 1}
}
# only update those attributes that are specifically defined
attributes = {'attributes.' + k: v for k, v in alert.attributes.items()}
update['$set'].update(attributes)
if alert.update_time:
update['$set']['updateTime'] = alert.update_time
if history:
update['$push'] = {
'history': {
'$each': [history.serialize],
'$slice': current_app.config['HISTORY_LIMIT'],
'$position': 0
}
}
return self.get_db().alerts.find_one_and_update(
query,
update=update,
return_document=ReturnDocument.AFTER
)
def correlate_alert(self, alert, history):
"""
Update alert key attributes, reset duplicate count and set repeat=False, keep track of last
receive id and time, appending all to history. Append to history again if status changes.
"""
query = {
'environment': alert.environment,
'resource': alert.resource,
'$or': [
{
'event': alert.event,
'severity': {'$ne': alert.severity}
},
{
'event': {'$ne': alert.event},
'correlate': alert.event
}],
'customer': alert.customer
}
update = {
'$set': {
'event': alert.event,
'severity': alert.severity,
'status': alert.status,
'service': alert.service,
'value': alert.value,
'text': alert.text,
'createTime': alert.create_time,
'timeout': alert.timeout,
'rawData': alert.raw_data,
'duplicateCount': alert.duplicate_count,
'repeat': alert.repeat,
'previousSeverity': alert.previous_severity,
'trendIndication': alert.trend_indication,
'receiveTime': alert.receive_time,
'lastReceiveId': alert.last_receive_id,
'lastReceiveTime': alert.last_receive_time
},
'$addToSet': {'tags': {'$each': alert.tags}},
'$push': {
'history': {
'$each': [h.serialize for h in history],
'$slice': current_app.config['HISTORY_LIMIT'],
'$position': 0
}
}
}
# only update those attributes that are specifically defined
attributes = {'attributes.' + k: v for k, v in alert.attributes.items()}
update['$set'].update(attributes)
if alert.update_time:
update['$set']['updateTime'] = alert.update_time
return self.get_db().alerts.find_one_and_update(
query,
update=update,
return_document=ReturnDocument.AFTER
)
def create_alert(self, alert):
data = {
'_id': alert.id,
'resource': alert.resource,
'event': alert.event,
'environment': alert.environment,
'severity': alert.severity,
'correlate': alert.correlate,
'status': alert.status,
'service': alert.service,
'group': alert.group,
'value': alert.value,
'text': alert.text,
'tags': alert.tags,
'attributes': alert.attributes,
'origin': alert.origin,
'type': alert.event_type,
'createTime': alert.create_time,
'timeout': alert.timeout,
'rawData': alert.raw_data,
'customer': alert.customer,
'duplicateCount': alert.duplicate_count,
'repeat': alert.repeat,
'previousSeverity': alert.previous_severity,
'trendIndication': alert.trend_indication,
'receiveTime': alert.receive_time,
'lastReceiveId': alert.last_receive_id,
'lastReceiveTime': alert.last_receive_time,
'updateTime': alert.update_time,
'history': [h.serialize for h in alert.history]
}
if self.get_db().alerts.insert_one(data).inserted_id == alert.id:
return data
def set_alert(self, id, severity, status, tags, attributes, timeout, previous_severity, update_time, history=None):
query = {'_id': {'$regex': '^' + id}}
update = {
'$set': {
'severity': severity,
'status': status,
'attributes': attributes,
'timeout': timeout,
'previousSeverity': previous_severity,
'updateTime': update_time
},
'$addToSet': {'tags': {'$each': tags}},
'$push': {
'history': {
'$each': [h.serialize for h in history],
'$slice': current_app.config['HISTORY_LIMIT'],
'$position': 0
}
}
}
return self.get_db().alerts.find_one_and_update(
query,
update=update,
return_document=ReturnDocument.AFTER
)
def get_alert(self, id, customers=None):
if len(id) == 8:
query = {'$or': [{'_id': {'$regex': '^' + id}}, {'lastReceiveId': {'$regex': '^' + id}}]}
else:
query = {'$or': [{'_id': id}, {'lastReceiveId': id}]}
if customers:
query['customer'] = {'$in': customers}
return self.get_db().alerts.find_one(query)
# STATUS, TAGS, ATTRIBUTES
def set_status(self, id, status, timeout, update_time, history=None):
"""
Set status and update history.
"""
query = {'_id': {'$regex': '^' + id}}
update = {
'$set': {'status': status, 'timeout': timeout, 'updateTime': update_time},
'$push': {
'history': {
'$each': [history.serialize],
'$slice': current_app.config['HISTORY_LIMIT'],
'$position': 0
}
}
}
return self.get_db().alerts.find_one_and_update(
query,
update=update,
return_document=ReturnDocument.AFTER
)
def tag_alert(self, id, tags):
"""
Append tags to tag list. Don't add same tag more than once.
"""
response = self.get_db().alerts.update_one(
{'_id': {'$regex': '^' + id}}, {'$addToSet': {'tags': {'$each': tags}}})
return response.matched_count > 0
def untag_alert(self, id, tags):
"""
Remove tags from tag list.
"""
response = self.get_db().alerts.update_one({'_id': {'$regex': '^' + id}}, {'$pullAll': {'tags': tags}})
return response.matched_count > 0
def update_tags(self, id, tags):
response = self.get_db().alerts.update_one({'_id': {'$regex': '^' + id}}, update={'$set': {'tags': tags}})
return response.matched_count > 0
def update_attributes(self, id, old_attrs, new_attrs):
update = dict()
set_value = {'attributes.' + k: v for k, v in new_attrs.items() if v is not None}
if set_value:
update['$set'] = set_value
unset_value = {'attributes.' + k: v for k, v in new_attrs.items() if v is None}
if unset_value:
update['$unset'] = unset_value
if update:
return self.get_db().alerts.find_one_and_update(
{'_id': {'$regex': '^' + id}},
update=update,
return_document=ReturnDocument.AFTER
)['attributes']
return {}
def delete_alert(self, id):
response = self.get_db().alerts.delete_one({'_id': {'$regex': '^' + id}})
return True if response.deleted_count == 1 else False
# BULK
def tag_alerts(self, query=None, tags=None):
query = query or Query()
updated = list(self.get_db().alerts.find(query.where, projection={'_id': 1}))
response = self.get_db().alerts.update(query.where, {'$addToSet': {'tags': {'$each': tags}}})
return updated if response['n'] else []
def untag_alerts(self, query=None, tags=None):
query = query or Query()
updated = list(self.get_db().alerts.find(query.where, projection={'_id': 1}))
response = self.get_db().alerts.update(query.where, {'$pullAll': {'tags': tags}})
return updated if response['n'] else []
def update_attributes_by_query(self, query=None, attributes=None):
query = query or Query()
update = dict()
set_value = {'attributes.' + k: v for k, v in attributes.items() if v is not None}
if set_value:
update['$set'] = set_value
unset_value = {'attributes.' + k: v for k, v in attributes.items() if v is None}
if unset_value:
update['$unset'] = unset_value
updated = list(self.get_db().alerts.find(query.where, projection={'_id': 1}))
response = self.get_db().alerts.update_many(query.where, update=update)
return updated if response.matched_count > 0 else []
def delete_alerts(self, query=None):
query = query or Query()
deleted = list(self.get_db().alerts.find(query.where, projection={'_id': 1}))
response = self.get_db().alerts.remove(query.where)
return deleted if response['n'] else []
# SEARCH & HISTORY
def add_history(self, id, history):
query = {'_id': {'$regex': '^' + id}}
update = {
'$push': {
'history': {
'$each': [history.serialize],
'$slice': current_app.config['HISTORY_LIMIT'],
'$position': 0
}
}
}
return self.get_db().alerts.find_one_and_update(
query,
update=update,
return_document=ReturnDocument.AFTER
)
def get_alerts(self, query=None, raw_data=False, history=False, page=None, page_size=None):
query = query or Query()
fields = dict()
if not raw_data:
fields['rawData'] = 0
if not history:
fields['history'] = 0
pipeline = [
{'$lookup': {
'from': 'codes',
'localField': 'severity',
'foreignField': 'severity',
'as': 'fromCodes'
}},
{'$replaceRoot': {'newRoot': {'$mergeObjects': [{'$arrayElemAt': ['$fromCodes', 0]}, '$$ROOT']}}},
{'$project': {'fromCodes': 0}},
{'$lookup': {
'from': 'states',
'localField': 'status',
'foreignField': 'status',
'as': 'fromStates'
}},
{'$replaceRoot': {'newRoot': {'$mergeObjects': [{'$arrayElemAt': ['$fromStates', 0]}, '$$ROOT']}}},
{'$project': {'fromStates': 0}},
{'$match': query.where},
{'$project': fields},
{'$sort': {k: v for k, v in query.sort}},
{'$skip': (page - 1) * page_size},
{'$limit': page_size}
]
return self.get_db().alerts.aggregate(pipeline)
def get_alert_history(self, alert, page=None, page_size=None):
query = {
'environment': alert.environment,
'resource': alert.resource,
'$or': [
{
'event': alert.event
},
{
'correlate': alert.event,
}
],
'customer': alert.customer
}
fields = {
'resource': 1,
'event': 1,
'environment': 1,
'customer': 1,
'service': 1,
'group': 1,
'tags': 1,
'attributes': 1,
'origin': 1,
'type': 1,
'history': 1
}
pipeline = [
{'$unwind': '$history'},
{'$match': query},
{'$project': fields},
{'$sort': {'history.updateTime': -1}},
{'$skip': (page - 1) * page_size},
{'$limit': page_size},
]
responses = self.get_db().alerts.aggregate(pipeline)
history = list()
for response in responses:
history.append(
{
'id': response['history']['id'],
'resource': response['resource'],
'event': response['history'].get('event'),
'environment': response['environment'],
'severity': response['history'].get('severity'),
'service': response['service'],
'status': response['history'].get('status'),
'group': response['group'],
'value': response['history'].get('value'),
'text': response['history'].get('text'),
'tags': response['tags'],
'attributes': response['attributes'],
'origin': response['origin'],
'updateTime': response['history']['updateTime'],
'user': response['history'].get('user'),
'timeout': response['history'].get('timeout'),
'type': response['history'].get('type', 'unknown'),
'customer': response.get('customer')
}
)
return history
def get_history(self, query=None, page=None, page_size=None):
query = query or Query()
fields = {
'resource': 1,
'event': 1,
'environment': 1,
'customer': 1,
'service': 1,
'group': 1,
'tags': 1,
'attributes': 1,
'origin': 1,
'user': 1,
'timeout': 1,
'type': 1,
'history': 1
}
pipeline = [
{'$unwind': '$history'},
{'$match': query.where},
{'$project': fields},
{'$sort': {'history.updateTime': -1}},
{'$skip': (page - 1) * page_size},
{'$limit': page_size},
]
responses = self.get_db().alerts.aggregate(pipeline)
history = list()
for response in responses:
history.append(
{
'id': response['history']['id'],
'resource': response['resource'],
'event': response['history']['event'],
'environment': response['environment'],
'severity': response['history']['severity'],
'service': response['service'],
'status': response['history']['status'],
'group': response['group'],
'value': response['history']['value'],
'text': response['history']['text'],
'tags': response['tags'],
'attributes': response['attributes'],
'origin': response['origin'],
'updateTime': response['history']['updateTime'],
'user': response.get('user'),
'timeout': response.get('timeout'),
'type': response['history'].get('type', 'unknown'),
'customer': response.get('customer', None)
}
)
return history
# COUNTS
def get_count(self, query=None):
"""
Return total number of alerts that meet the query filter.
"""
query = query or Query()
return self.get_db().alerts.count_documents(query.where)
def get_counts(self, query=None, group=None):
query = query or Query()
if group is None:
raise ValueError('Must define a group')
pipeline = [
{'$match': query.where},
{'$project': {group: 1}},
{'$group': {'_id': '$' + group, 'count': {'$sum': 1}}}
]
responses = self.get_db().alerts.aggregate(pipeline)
counts = dict()
for response in responses:
counts[response['_id']] = response['count']
return counts
def get_counts_by_severity(self, query=None):
query = query or Query()
return self.get_counts(query, group='severity')
def get_counts_by_status(self, query=None):
query = query or Query()
return self.get_counts(query, group='status')
def get_topn_count(self, query=None, group='event', topn=100):
query = query or Query()
pipeline = [
{'$match': query.where},
{'$unwind': '$service'},
{
'$group': {
'_id': f'${group}',
'count': {'$sum': 1},
'duplicateCount': {'$sum': '$duplicateCount'},
'environments': {'$addToSet': '$environment'},
'services': {'$addToSet': '$service'},
'resources': {'$addToSet': {'id': '$_id', 'resource': '$resource'}}
}
},
{'$sort': {'count': -1, 'duplicateCount': -1}},
{'$limit': topn}
]
responses = self.get_db().alerts.aggregate(pipeline, allowDiskUse=True)
top = list()
for response in responses:
top.append(
{
f'{group}': response['_id'],
'environments': response['environments'],
'services': response['services'],
'resources': response['resources'],
'count': response['count'],
'duplicateCount': response['duplicateCount']
}
)
return top
def get_topn_flapping(self, query=None, group='event', topn=100):
query = query or Query()
pipeline = [
{'$match': query.where},
{'$unwind': '$service'},
{'$unwind': '$history'},
{'$match': {'history.type': 'severity'}},
{
'$group': {
'_id': f'${group}',
'count': {'$sum': 1},
'duplicateCount': {'$max': '$duplicateCount'},
'environments': {'$addToSet': '$environment'},
'services': {'$addToSet': '$service'},
'resources': {'$addToSet': {'id': '$_id', 'resource': '$resource'}}
}
},
{'$sort': {'count': -1, 'duplicateCount': -1}},
{'$limit': topn}
]
responses = self.get_db().alerts.aggregate(pipeline, allowDiskUse=True)
top = list()
for response in responses:
top.append(
{
f'{group}': response['_id'],
'environments': response['environments'],
'services': response['services'],
'resources': response['resources'],
'count': response['count'],
'duplicateCount': response['duplicateCount']
}
)
return top
def get_topn_standing(self, query=None, group='event', topn=100):
query = query or Query()
pipeline = [
{'$match': query.where},
{'$unwind': '$service'},
{
'$group': {
'_id': f'${group}',
'count': {'$sum': 1},
'duplicateCount': {'$sum': '$duplicateCount'},
'lifeTime': {'$sum': {'$subtract': ['$lastReceiveTime', '$createTime']}},
'environments': {'$addToSet': '$environment'},
'services': {'$addToSet': '$service'},
'resources': {'$addToSet': {'id': '$_id', 'resource': '$resource'}}
}
},
{'$sort': {'lifeTime': -1, 'duplicateCount': -1}},
{'$limit': topn}
]
responses = self.get_db().alerts.aggregate(pipeline, allowDiskUse=True)
top = list()
for response in responses:
top.append(
{
f'{group}': response['_id'],
'environments': response['environments'],
'services': response['services'],
'resources': response['resources'],
'count': response['count'],
'duplicateCount': response['duplicateCount']
}
)
return top
# ENVIRONMENTS
def get_environments(self, query=None, topn=1000):
query = query or Query()
def pipeline(group_by):
return [
{'$match': query.where},
{'$project': {'environment': 1, group_by: 1}},
{'$group':
{
'_id': {'environment': '$environment', group_by: '$' + group_by},
'count': {'$sum': 1}
}
},
{'$limit': topn}
]
response_severity = self.get_db().alerts.aggregate(pipeline('severity'))
severity_count = defaultdict(list)
for r in response_severity:
severity_count[r['_id']['environment']].append((r['_id']['severity'], r['count']))
response_status = self.get_db().alerts.aggregate(pipeline('status'))
status_count = defaultdict(list)
for r in response_status:
status_count[r['_id']['environment']].append((r['_id']['status'], r['count']))
environments = self.get_db().alerts.find().distinct('environment')
return [
{
'environment': env,
'severityCounts': dict(severity_count[env]),
'statusCounts': dict(status_count[env]),
'count': sum(t[1] for t in severity_count[env])
} for env in environments]
# SERVICES
def get_services(self, query=None, topn=1000):
query = query or Query()
def pipeline(group_by):
return [
{'$unwind': '$service'},
{'$match': query.where},
{'$project': {'environment': 1, 'service': 1, group_by: 1}},
{'$group':
{
'_id': {'environment': '$environment', 'service': '$service', group_by: '$' + group_by},
'count': {'$sum': 1}
}
},
{'$limit': topn}
]
response_severity = self.get_db().alerts.aggregate(pipeline('severity'))
severity_count = defaultdict(list)
for r in response_severity:
severity_count[(r['_id']['environment'], r['_id']['service'])].append((r['_id']['severity'], r['count']))
response_status = self.get_db().alerts.aggregate(pipeline('status'))
status_count = defaultdict(list)
for r in response_status:
status_count[(r['_id']['environment'], r['_id']['service'])].append((r['_id']['status'], r['count']))
pipeline = [
{'$unwind': '$service'},
{'$group': {'_id': {'environment': '$environment', 'service': '$service'}}},
{'$limit': topn}
]
services = list(self.get_db().alerts.aggregate(pipeline))
return [
{
'environment': svc['_id']['environment'],
'service': svc['_id']['service'],
'severityCounts': dict(severity_count[(svc['_id']['environment'], svc['_id']['service'])]),
'statusCounts': dict(status_count[(svc['_id']['environment'], svc['_id']['service'])]),
'count': sum(t[1] for t in severity_count[(svc['_id']['environment'], svc['_id']['service'])])
} for svc in services]
# ALERT GROUPS
def get_alert_groups(self, query=None, topn=1000):
query = query or Query()
pipeline = [
{'$match': query.where},
{'$project': {'environment': 1, 'group': 1}},
{'$limit': topn},
{'$group': {'_id': {'environment': '$environment', 'group': '$group'}, 'count': {'$sum': 1}}}
]
responses = self.get_db().alerts.aggregate(pipeline)
groups = list()
for response in responses:
groups.append(
{
'environment': response['_id']['environment'],
'group': response['_id']['group'],
'count': response['count']
}
)
return groups
# ALERT TAGS
def get_alert_tags(self, query=None, topn=1000):
query = query or Query()
pipeline = [
{'$match': query.where},
{'$unwind': '$tags'},
{'$project': {'environment': 1, 'tags': 1}},
{'$limit': topn},
{'$group': {'_id': {'environment': '$environment', 'tag': '$tags'}, 'count': {'$sum': 1}}}
]
responses = self.get_db().alerts.aggregate(pipeline)
tags = list()
for response in responses:
tags.append(
{
'environment': response['_id']['environment'],
'tag': response['_id']['tag'],
'count': response['count']
}
)
return tags
# BLACKOUTS
def create_blackout(self, blackout):
data = {
'_id': blackout.id,
'priority': blackout.priority,
'environment': blackout.environment,
'startTime': blackout.start_time,
'endTime': blackout.end_time,
'duration': blackout.duration,
'user': blackout.user,
'createTime': blackout.create_time,
'text': blackout.text,
}
if blackout.service:
data['service'] = blackout.service
if blackout.resource:
data['resource'] = blackout.resource
if blackout.event:
data['event'] = blackout.event
if blackout.group:
data['group'] = blackout.group
if blackout.tags:
data['tags'] = blackout.tags
if blackout.origin:
data['origin'] = blackout.origin
if blackout.customer:
data['customer'] = blackout.customer
if self.get_db().blackouts.insert_one(data).inserted_id == blackout.id:
return data
def get_blackout(self, id, customers=None):
query = {'_id': id}
if customers:
query['customer'] = {'$in': customers}
return self.get_db().blackouts.find_one(query)
def get_blackouts(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().blackouts.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_blackouts_count(self, query=None):
query = query or Query()
return self.get_db().blackouts.count_documents(query.where)
def is_blackout_period(self, alert):
query = dict()
query['startTime'] = {'$lte': alert.create_time}
query['endTime'] = {'$gt': alert.create_time}
query['environment'] = alert.environment
query['$and'] = [{'$or': [
{'resource': None, 'service': None, 'event': None, 'group': None, 'tags': None, 'origin': None},
{'resource': None, 'service': None, 'event': None, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': None, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': None, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': None, 'service': None, 'event': None, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': None, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': alert.event, 'group': None, 'tags': None, 'origin': None},
{'resource': None, 'service': None, 'event': alert.event, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': None, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': None, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': None, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': None, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': None, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': None, 'group': None, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': None, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': None, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': None, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': None, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': None, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': None, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': None, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': None, 'origin': alert.origin},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': None},
{'resource': alert.resource, 'service': {'$not': {'$elemMatch': {'$nin': alert.service}}}, 'event': alert.event, 'group': alert.group, 'tags': {'$not': {'$elemMatch': {'$nin': alert.tags}}}, 'origin': alert.origin},
]}]
if current_app.config['CUSTOMER_VIEWS']:
query['$and'].append({'$or': [{'customer': None}, {'customer': alert.customer}]})
if self.get_db().blackouts.find_one(query):
return True
return False
def update_blackout(self, id, **kwargs):
return self.get_db().blackouts.find_one_and_update(
{'_id': id},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
def delete_blackout(self, id):
response = self.get_db().blackouts.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
# HEARTBEATS
def upsert_heartbeat(self, heartbeat):
return self.get_db().heartbeats.find_one_and_update(
{
'origin': heartbeat.origin,
'customer': heartbeat.customer
},
{
'$setOnInsert': {
'_id': heartbeat.id
},
'$set': {
'origin': heartbeat.origin,
'tags': heartbeat.tags,
'attributes': heartbeat.attributes,
'type': heartbeat.event_type,
'createTime': heartbeat.create_time,
'timeout': heartbeat.timeout,
'receiveTime': heartbeat.receive_time,
'customer': heartbeat.customer
}
},
upsert=True,
return_document=ReturnDocument.AFTER
)
def get_heartbeat(self, id, customers=None):
if len(id) == 8:
query = {'_id': {'$regex': '^' + id}}
else:
query = {'_id': id}
if customers:
query['customer'] = {'$in': customers}
return self.get_db().heartbeats.find_one(query)
def get_heartbeats(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().heartbeats.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_heartbeats_by_status(self, status=None, query=None, page=None, page_size=None):
status = status or list()
query = query or Query()
max_latency = current_app.config['HEARTBEAT_MAX_LATENCY']
pipeline = [{'$match': query.where}]
if status:
pipeline.extend([
{'$addFields': {'timeoutInMs': {'$multiply': ['$timeout', 1000]}}},
{'$addFields': {'isExpired': {'$gt': [{'$subtract': [datetime.utcnow(), '$receiveTime']}, '$timeoutInMs']}}},
{'$addFields': {'isSlow': {'$gt': [{'$subtract': ['$receiveTime', '$createTime']}, max_latency]}}}
])
match_or = list()
if HeartbeatStatus.OK in status:
match_or.append({'isExpired': False, 'isSlow': False})
if HeartbeatStatus.Expired in status:
match_or.append({'isExpired': True})
if HeartbeatStatus.Slow in status:
match_or.append({'isExpired': False, 'isSlow': True})
pipeline.append({'$match': {'$or': match_or}})
pipeline.extend([
{'$sort': {k: v for k, v in query.sort}},
{'$skip': (page - 1) * page_size},
{'$limit': page_size}
])
return self.get_db().heartbeats.aggregate(pipeline)
def get_heartbeats_count(self, query=None):
query = query or Query()
return self.get_db().heartbeats.count_documents(query.where)
def delete_heartbeat(self, id):
response = self.get_db().heartbeats.delete_one({'_id': {'$regex': '^' + id}})
return True if response.deleted_count == 1 else False
# API KEYS
# save
def create_key(self, key):
data = {
'_id': key.id,
'key': key.key,
'user': key.user,
'scopes': key.scopes,
'text': key.text,
'expireTime': key.expire_time,
'count': key.count,
'lastUsedTime': key.last_used_time
}
if key.customer:
data['customer'] = key.customer
if self.get_db().keys.insert_one(data).inserted_id == key.id:
return data
# get
def get_key(self, key, user=None):
query = {'$or': [{'key': key}, {'_id': key}]}
if user:
query['user'] = user
return self.get_db().keys.find_one(query)
# list
def get_keys(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().keys.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_keys_by_user(self, user):
return self.get_db().keys.find({'user': user})
def get_keys_count(self, query=None):
query = query or Query()
return self.get_db().keys.count_documents(query.where)
def update_key(self, key, **kwargs):
return self.get_db().keys.find_one_and_update(
{'$or': [{'key': key}, {'_id': key}]},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
# update
def update_key_last_used(self, key):
return self.get_db().keys.update_one(
{'$or': [{'key': key}, {'_id': key}]},
{
'$set': {'lastUsedTime': datetime.utcnow()},
'$inc': {'count': 1}
}
).matched_count == 1
# delete
def delete_key(self, key):
query = {'$or': [{'key': key}, {'_id': key}]}
response = self.get_db().keys.delete_one(query)
return True if response.deleted_count == 1 else False
# USERS
def create_user(self, user):
data = {
'_id': user.id,
'name': user.name,
'login': user.login,
'password': user.password,
'email': user.email,
'status': user.status,
'roles': user.roles,
'attributes': user.attributes,
'createTime': user.create_time,
'lastLogin': user.last_login,
'text': user.text,
'updateTime': user.update_time,
'email_verified': user.email_verified
}
if self.get_db().users.insert_one(data).inserted_id == user.id:
return data
# get
def get_user(self, id):
query = {'_id': id}
return self.get_db().users.find_one(query)
# list
def get_users(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().users.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_users_count(self, query=None):
query = query or Query()
return self.get_db().users.count_documents(query.where)
def get_user_by_username(self, username):
if not username:
return
query = {'$or': [{'login': username}, {'email': username}]}
return self.get_db().users.find_one(query)
def get_user_by_email(self, email):
if not email:
return
query = {'email': email}
return self.get_db().users.find_one(query)
def get_user_by_hash(self, hash):
query = {'hash': hash}
return self.get_db().users.find_one(query)
def update_last_login(self, id):
return self.get_db().users.update_one(
{'_id': id},
update={'$set': {'lastLogin': datetime.utcnow()}}
).matched_count == 1
def update_user(self, id, **kwargs):
update = dict()
if 'attributes' in kwargs:
update['$set'] = {k: v for k, v in kwargs.items() if k != 'attributes'}
set_value = {'attributes.' + k: v for k, v in kwargs['attributes'].items() if v is not None}
if set_value:
update['$set'].update(set_value)
unset_value = {'attributes.' + k: v for k, v in kwargs['attributes'].items() if v is None}
if unset_value:
update['$unset'] = unset_value
else:
update['$set'] = kwargs
return self.get_db().users.find_one_and_update(
{'_id': {'$regex': '^' + id}}, update=update, return_document=ReturnDocument.AFTER
)
def update_user_attributes(self, id, old_attrs, new_attrs):
"""
Set all attributes and unset attributes by using a value of 'null'.
"""
from alerta.utils.collections import merge
merge(old_attrs, new_attrs)
attrs = {k: v for k, v in old_attrs.items() if v is not None}
update = {
'$set': {'attributes': attrs}
}
response = self.get_db().users.update_one({'_id': {'$regex': '^' + id}}, update=update)
return response.matched_count > 0
def delete_user(self, id):
response = self.get_db().users.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
def set_email_hash(self, id, hash):
return self.get_db().users.update_one(
{'_id': id},
update={'$set': {'hash': hash, 'updateTime': datetime.utcnow()}}
).matched_count == 1
# GROUPS
def create_group(self, group):
data = {
'_id': group.id,
'name': group.name,
'text': group.text
}
if self.get_db().groups.insert_one(data).inserted_id == group.id:
return data
def get_group(self, id):
query = {'_id': id}
return self.get_db().groups.find_one(query)
def get_groups(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().groups.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_groups_count(self, query=None):
query = query or Query()
return self.get_db().groups.count_documents(query.where)
def get_group_users(self, id):
pipeline = [
{'$match': {'_id': id}},
{'$unwind': '$users'},
{'$lookup': {
'from': 'users',
'localField': 'users',
'foreignField': '_id',
'as': 'groupUser'
}},
{'$project': {'groupUser': 1}} # u.id, u.login, u.email, u.name, u.status
]
responses = self.get_db().groups.aggregate(pipeline)
users = list()
for response in responses:
users.append(
{
'id': response['groupUser'][0]['_id'],
'login': response['groupUser'][0].get('login'),
'email': response['groupUser'][0]['email'],
'name': response['groupUser'][0]['name'],
'status': response['groupUser'][0]['status']
}
)
return users
def update_group(self, id, **kwargs):
return self.get_db().groups.find_one_and_update(
{'_id': id},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
def add_user_to_group(self, group, user):
response = self.get_db().groups.update_one(
{'_id': group}, {'$addToSet': {'users': user}})
return response.matched_count > 0
def remove_user_from_group(self, group, user):
response = self.get_db().groups.update_one({'_id': group}, {'$pullAll': {'users': [user]}})
return response.matched_count > 0
def delete_group(self, id):
response = self.get_db().groups.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
def get_groups_by_user(self, user):
return self.get_db().groups.find({'users': user})
# PERMISSIONS
def create_perm(self, perm):
data = {
'_id': perm.id,
'match': perm.match,
'scopes': perm.scopes
}
if self.get_db().perms.insert_one(data).inserted_id == perm.id:
return data
def get_perm(self, id):
query = {'_id': id}
return self.get_db().perms.find_one(query)
def get_perms(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().perms.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_perms_count(self, query=None):
query = query or Query()
return self.get_db().perms.count_documents(query.where)
def update_perm(self, id, **kwargs):
return self.get_db().perms.find_one_and_update(
{'_id': id},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
def delete_perm(self, id):
response = self.get_db().perms.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
def get_scopes_by_match(self, login, matches):
if login in current_app.config['ADMIN_USERS']:
return ADMIN_SCOPES
scopes = list()
for match in matches:
if match in current_app.config['ADMIN_ROLES']:
return ADMIN_SCOPES
if match in current_app.config['USER_ROLES']:
scopes.extend(current_app.config['USER_DEFAULT_SCOPES'])
if match in current_app.config['GUEST_ROLES']:
scopes.extend(current_app.config['GUEST_DEFAULT_SCOPES'])
response = self.get_db().perms.find_one({'match': match}, projection={'scopes': 1, '_id': 0})
if response:
scopes.extend(response['scopes'])
return sorted(set(scopes))
# CUSTOMERS
def create_customer(self, customer):
data = {
'_id': customer.id,
'match': customer.match,
'customer': customer.customer
}
if self.get_db().customers.insert_one(data).inserted_id == customer.id:
return data
def get_customer(self, id):
query = {'_id': id}
return self.get_db().customers.find_one(query)
def get_customers(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().customers.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_customers_count(self, query=None):
query = query or Query()
return self.get_db().customers.count_documents(query.where)
def update_customer(self, id, **kwargs):
return self.get_db().customers.find_one_and_update(
{'_id': id},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
def delete_customer(self, id):
response = self.get_db().customers.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
def get_customers_by_match(self, login, matches):
if login in current_app.config['ADMIN_USERS']:
return '*' # all customers
customers = []
for match in [login] + matches:
for r in self.get_db().customers.find({'match': match}):
customers.append(r['customer'])
if customers:
if '*' in customers:
return '*' # all customers
return customers
raise NoCustomerMatch(f"No customer lookup configured for user '{login}' or '{','.join(matches)}'")
# NOTES
def create_note(self, note):
data = {
'_id': note.id,
'text': note.text,
'user': note.user,
'attributes': note.attributes,
'type': note.note_type,
'createTime': note.create_time,
'updateTime': note.update_time,
'alert': note.alert
}
if note.customer:
data['customer'] = note.customer
if self.get_db().notes.insert_one(data).inserted_id == note.id:
return data
def get_note(self, id):
query = {'_id': id}
return self.get_db().notes.find_one(query)
def get_notes(self, query=None, page=None, page_size=None):
query = query or Query()
return self.get_db().notes.find(query.where, sort=query.sort).skip((page - 1) * page_size).limit(page_size)
def get_alert_notes(self, id, page=None, page_size=None):
if len(id) == 8:
query = {'alert': {'$regex': '^' + id}}
else:
query = {'alert': id}
return self.get_db().notes.find(query).skip((page - 1) * page_size).limit(page_size)
def get_customer_notes(self, customer, page=None, page_size=None):
return self.get_db().notes.find({'customer': customer}).skip((page - 1) * page_size).limit(page_size)
def update_note(self, id, **kwargs):
kwargs['updateTime'] = datetime.utcnow()
return self.get_db().notes.find_one_and_update(
{'_id': id},
update={'$set': kwargs},
return_document=ReturnDocument.AFTER
)
def delete_note(self, id):
response = self.get_db().notes.delete_one({'_id': id})
return True if response.deleted_count == 1 else False
# METRICS
def get_metrics(self, type=None):
query = {'type': type} if type else {}
return list(self.get_db().metrics.find(query, {'_id': 0}))
def set_gauge(self, gauge):
return self.get_db().metrics.find_one_and_update(
{
'group': gauge.group,
'name': gauge.name
},
{
'$set': {
'group': gauge.group,
'name': gauge.name,
'title': gauge.title,
'description': gauge.description,
'value': gauge.value,
'type': 'gauge'
}
},
upsert=True,
return_document=ReturnDocument.AFTER
)['value']
def inc_counter(self, counter):
return self.get_db().metrics.find_one_and_update(
{
'group': counter.group,
'name': counter.name
},
{
'$set': {
'group': counter.group,
'name': counter.name,
'title': counter.title,
'description': counter.description,
'type': 'counter'
},
'$inc': {'count': counter.count}
},
upsert=True,
return_document=ReturnDocument.AFTER
)['count']
def update_timer(self, timer):
return self.get_db().metrics.find_one_and_update(
{
'group': timer.group,
'name': timer.name
},
{
'$set': {
'group': timer.group,
'name': timer.name,
'title': timer.title,
'description': timer.description,
'type': 'timer'
},
'$inc': {'count': timer.count, 'totalTime': timer.total_time}
},
upsert=True,
return_document=ReturnDocument.AFTER
)
# HOUSEKEEPING
def get_expired(self, expired_threshold, info_threshold):
# delete 'closed' or 'expired' alerts older than "expired_threshold" seconds
# and 'informational' alerts older than "info_threshold" seconds
if expired_threshold:
expired_seconds_ago = datetime.utcnow() - timedelta(seconds=expired_threshold)
self.get_db().alerts.delete_many(
{'status': {'$in': ['closed', 'expired']}, 'lastReceiveTime': {'$lt': expired_seconds_ago}})
if info_threshold:
info_seconds_ago = datetime.utcnow() - timedelta(seconds=info_threshold)
self.get_db().alerts.delete_many({'severity': alarm_model.DEFAULT_INFORM_SEVERITY, 'lastReceiveTime': {'$lt': info_seconds_ago}})
# get list of alerts to be newly expired
pipeline = [
{'$match': {'status': {'$nin': ['expired']}}},
{'$addFields': {
'computedTimeout': {'$multiply': [{'$ifNull': ['$timeout', current_app.config['ALERT_TIMEOUT']]}, 1000]}
}},
{'$addFields': {
'isExpired': {'$lt': [{'$add': ['$lastReceiveTime', '$computedTimeout']}, datetime.utcnow()]}
}},
{'$match': {'isExpired': True, 'computedTimeout': {'$ne': 0}}}
]
return self.get_db().alerts.aggregate(pipeline)
def get_unshelve(self):
# get list of alerts to be unshelved
pipeline = [
{'$match': {'status': 'shelved'}},
{'$unwind': '$history'},
{'$match': {
'history.type': 'shelve',
'history.status': 'shelved'
}},
{'$sort': {'history.updateTime': -1}},
{'$group': {
'_id': '$_id',
'resource': {'$first': '$resource'},
'event': {'$first': '$event'},
'environment': {'$first': '$environment'},
'severity': {'$first': '$severity'},
'correlate': {'$first': '$correlate'},
'status': {'$first': '$status'},
'service': {'$first': '$service'},
'group': {'$first': '$group'},
'value': {'$first': '$value'},
'text': {'$first': '$text'},
'tags': {'$first': '$tags'},
'attributes': {'$first': '$attributes'},
'origin': {'$first': '$origin'},
'type': {'$first': '$type'},
'createTime': {'$first': '$createTime'},
'timeout': {'$first': '$timeout'},
'rawData': {'$first': '$rawData'},
'customer': {'$first': '$customer'},
'duplicateCount': {'$first': '$duplicateCount'},
'repeat': {'$first': '$repeat'},
'previousSeverity': {'$first': '$previousSeverity'},
'trendIndication': {'$first': '$trendIndication'},
'receiveTime': {'$first': '$receiveTime'},
'lastReceiveId': {'$first': '$lastReceiveId'},
'lastReceiveTime': {'$first': '$lastReceiveTime'},
'updateTime': {'$first': '$updateTime'},
'history': {'$first': '$history'},
}},
{'$addFields': {
'computedTimeout': {'$multiply': [{'$ifNull': ['$history.timeout', current_app.config['SHELVE_TIMEOUT']]}, 1000]}
}},
{'$addFields': {
'isExpired': {'$lt': [{'$add': ['$updateTime', '$computedTimeout']}, datetime.utcnow()]}
}},
{'$match': {'isExpired': True, 'computedTimeout': {'$ne': 0}}}
]
return self.get_db().alerts.aggregate(pipeline)
def get_unack(self):
# get list of alerts to be unack'ed
pipeline = [
{'$match': {'status': 'ack'}},
{'$unwind': '$history'},
{'$match': {
'history.type': 'ack',
'history.status': 'ack'
}},
{'$sort': {'history.updateTime': -1}},
{'$group': {
'_id': '$_id',
'resource': {'$first': '$resource'},
'event': {'$first': '$event'},
'environment': {'$first': '$environment'},
'severity': {'$first': '$severity'},
'correlate': {'$first': '$correlate'},
'status': {'$first': '$status'},
'service': {'$first': '$service'},
'group': {'$first': '$group'},
'value': {'$first': '$value'},
'text': {'$first': '$text'},
'tags': {'$first': '$tags'},
'attributes': {'$first': '$attributes'},
'origin': {'$first': '$origin'},
'type': {'$first': '$type'},
'createTime': {'$first': '$createTime'},
'timeout': {'$first': '$timeout'},
'rawData': {'$first': '$rawData'},
'customer': {'$first': '$customer'},
'duplicateCount': {'$first': '$duplicateCount'},
'repeat': {'$first': '$repeat'},
'previousSeverity': {'$first': '$previousSeverity'},
'trendIndication': {'$first': '$trendIndication'},
'receiveTime': {'$first': '$receiveTime'},
'lastReceiveId': {'$first': '$lastReceiveId'},
'lastReceiveTime': {'$first': '$lastReceiveTime'},
'updateTime': {'$first': '$updateTime'},
'history': {'$first': '$history'},
}},
{'$addFields': {
'computedTimeout': {'$multiply': [{'$ifNull': ['$history.timeout', current_app.config['ACK_TIMEOUT']]}, 1000]}
}},
{'$addFields': {
'isExpired': {'$lt': [{'$add': ['$updateTime', '$computedTimeout']}, datetime.utcnow()]}
}},
{'$match': {'isExpired': True, 'computedTimeout': {'$ne': 0}}}
]
return self.get_db().alerts.aggregate(pipeline)