mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 17:29:39 +00:00
f3c51f360d
* Allow unack when status set to ack in plugin * Fix race condition for blackout test
695 lines
25 KiB
Python
695 lines
25 KiB
Python
import os
|
|
import platform
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Optional # noqa
|
|
from typing import Any, Dict, List, Tuple, Union
|
|
from uuid import uuid4
|
|
|
|
from flask import current_app, g
|
|
|
|
from alerta.app import alarm_model, db
|
|
from alerta.database.base import Query
|
|
from alerta.models.enums import ChangeType
|
|
from alerta.models.history import History, RichHistory
|
|
from alerta.models.note import Note
|
|
from alerta.utils.format import DateTime
|
|
from alerta.utils.hooks import status_change_hook
|
|
from alerta.utils.response import absolute_url
|
|
|
|
# Alias for a decoded JSON object: string keys mapped to arbitrary values.
JSON = Dict[str, Any]
# The type of ``None``, for use in ``isinstance`` checks on optional values.
NoneType = type(None)
|
|
|
|
|
|
class Alert:
|
|
|
|
def __init__(self, resource: str, event: str, **kwargs) -> None:
|
|
|
|
if not resource:
|
|
raise ValueError('Missing mandatory value for "resource"')
|
|
if not event:
|
|
raise ValueError('Missing mandatory value for "event"')
|
|
if any(['.' in key for key in kwargs.get('attributes', dict()).keys()]) \
|
|
or any(['$' in key for key in kwargs.get('attributes', dict()).keys()]):
|
|
raise ValueError('Attribute keys must not contain "." or "$"')
|
|
if isinstance(kwargs.get('value', None), int):
|
|
kwargs['value'] = str(kwargs['value'])
|
|
for attr in ['create_time', 'receive_time', 'last_receive_time']:
|
|
if not isinstance(kwargs.get(attr), (datetime, NoneType)): # type: ignore
|
|
raise ValueError(f"Attribute '{attr}' must be datetime type")
|
|
|
|
timeout = kwargs.get('timeout') if kwargs.get('timeout') is not None else current_app.config['ALERT_TIMEOUT']
|
|
try:
|
|
timeout = int(timeout) # type: ignore
|
|
except ValueError:
|
|
raise ValueError(f"Could not convert 'timeout' value of '{timeout}' to an integer")
|
|
if timeout < 0:
|
|
raise ValueError(f"Invalid negative 'timeout' value ({timeout})")
|
|
|
|
self.id = kwargs.get('id') or str(uuid4())
|
|
self.resource = resource
|
|
self.event = event
|
|
self.environment = kwargs.get('environment', None) or ''
|
|
self.severity = kwargs.get('severity', None) or alarm_model.DEFAULT_NORMAL_SEVERITY
|
|
self.correlate = kwargs.get('correlate', None) or list()
|
|
if self.correlate and event not in self.correlate:
|
|
self.correlate.append(event)
|
|
self.status = kwargs.get('status', None) or alarm_model.DEFAULT_STATUS
|
|
self.service = kwargs.get('service', None) or list()
|
|
self.group = kwargs.get('group', None) or 'Misc'
|
|
self.value = kwargs.get('value', None)
|
|
self.text = kwargs.get('text', None) or ''
|
|
self.tags = kwargs.get('tags', None) or list()
|
|
self.attributes = kwargs.get('attributes', None) or dict()
|
|
self.origin = kwargs.get('origin', None) or f'{os.path.basename(sys.argv[0])}/{platform.uname()[1]}'
|
|
self.event_type = kwargs.get('event_type', kwargs.get('type', None)) or 'exceptionAlert'
|
|
self.create_time = kwargs.get('create_time', None) or datetime.utcnow()
|
|
self.timeout = timeout
|
|
self.raw_data = kwargs.get('raw_data', None)
|
|
self.customer = kwargs.get('customer', None)
|
|
|
|
self.duplicate_count = kwargs.get('duplicate_count', None)
|
|
self.repeat = kwargs.get('repeat', None)
|
|
self.previous_severity = kwargs.get('previous_severity', None)
|
|
self.trend_indication = kwargs.get('trend_indication', None)
|
|
self.receive_time = kwargs.get('receive_time', None) or datetime.utcnow()
|
|
self.last_receive_id = kwargs.get('last_receive_id', None)
|
|
self.last_receive_time = kwargs.get('last_receive_time', None)
|
|
self.update_time = kwargs.get('update_time', None)
|
|
self.history = kwargs.get('history', None) or list()
|
|
|
|
@classmethod
|
|
def parse(cls, json: JSON) -> 'Alert':
|
|
if not isinstance(json.get('correlate', []), list):
|
|
raise ValueError('correlate must be a list')
|
|
if not isinstance(json.get('service', []), list):
|
|
raise ValueError('service must be a list')
|
|
if not isinstance(json.get('tags', []), list):
|
|
raise ValueError('tags must be a list')
|
|
if not isinstance(json.get('attributes', {}), dict):
|
|
raise ValueError('attributes must be a JSON object')
|
|
if not isinstance(json.get('timeout') if json.get('timeout', None) is not None else 0, int):
|
|
raise ValueError('timeout must be an integer')
|
|
if json.get('customer', None) == '':
|
|
raise ValueError('customer must not be an empty string')
|
|
|
|
return Alert(
|
|
id=json.get('id', None),
|
|
resource=json.get('resource', None),
|
|
event=json.get('event', None),
|
|
environment=json.get('environment', None),
|
|
severity=json.get('severity', None),
|
|
correlate=json.get('correlate', list()),
|
|
status=json.get('status', None),
|
|
service=json.get('service', list()),
|
|
group=json.get('group', None),
|
|
value=json.get('value', None),
|
|
text=json.get('text', None),
|
|
tags=json.get('tags', list()),
|
|
attributes=json.get('attributes', dict()),
|
|
origin=json.get('origin', None),
|
|
event_type=json.get('type', None),
|
|
create_time=DateTime.parse(json['createTime']) if 'createTime' in json else None,
|
|
timeout=json.get('timeout', None),
|
|
raw_data=json.get('rawData', None),
|
|
customer=json.get('customer', None)
|
|
)
|
|
|
|
@property
def serialize(self) -> Dict[str, Any]:
    """Return the alert as a dict keyed by the camelCase API field names.

    History entries are serialized in ascending ``update_time`` order.
    Time fields are returned as stored on the alert, without formatting.
    """
    href = absolute_url('/alert/' + self.id)
    ordered_history = sorted(self.history, key=lambda entry: entry.update_time)
    return {
        'id': self.id,
        'href': href,
        'resource': self.resource,
        'event': self.event,
        'environment': self.environment,
        'severity': self.severity,
        'correlate': self.correlate,
        'status': self.status,
        'service': self.service,
        'group': self.group,
        'value': self.value,
        'text': self.text,
        'tags': self.tags,
        'attributes': self.attributes,
        'origin': self.origin,
        'type': self.event_type,
        'createTime': self.create_time,
        'timeout': self.timeout,
        'rawData': self.raw_data,
        'customer': self.customer,
        'duplicateCount': self.duplicate_count,
        'repeat': self.repeat,
        'previousSeverity': self.previous_severity,
        'trendIndication': self.trend_indication,
        'receiveTime': self.receive_time,
        'lastReceiveId': self.last_receive_id,
        'lastReceiveTime': self.last_receive_time,
        'updateTime': self.update_time,
        'history': [entry.serialize for entry in ordered_history]
    }
|
|
|
|
def get_id(self, short: bool = False) -> str:
|
|
return self.id[:8] if short else self.id
|
|
|
|
def get_body(self, history: bool = True) -> Dict[str, Any]:
    """Return the serialized alert with its time fields formatted.

    Each non-empty time field is passed through ``DateTime.iso8601``. When
    ``history`` is false the history list is replaced by an empty list.
    """
    body = self.serialize
    for time_key in ('createTime', 'lastReceiveTime', 'receiveTime', 'updateTime'):
        if body[time_key]:
            body[time_key] = DateTime.iso8601(body[time_key])
    if not history:
        body['history'] = []
    return body
|
|
|
|
def __repr__(self) -> str:
|
|
return 'Alert(id={!r}, environment={!r}, resource={!r}, event={!r}, severity={!r}, status={!r}, customer={!r})'.format(
|
|
self.id, self.environment, self.resource, self.event, self.severity, self.status, self.customer
|
|
)
|
|
|
|
@classmethod
def from_document(cls, doc: Dict[str, Any]) -> 'Alert':
    """Build an alert from a dict-shaped database row.

    Fields are read using the camelCase document keys. The id falls back to
    ``_id`` when ``id`` is missing or falsy, and each history entry is
    converted with ``History.from_db``.
    """
    fetch = doc.get
    return Alert(
        id=fetch('id', None) or fetch('_id'),
        resource=fetch('resource', None),
        event=fetch('event', None),
        environment=fetch('environment', None),
        severity=fetch('severity', None),
        correlate=fetch('correlate', list()),
        status=fetch('status', None),
        service=fetch('service', list()),
        group=fetch('group', None),
        value=fetch('value', None),
        text=fetch('text', None),
        tags=fetch('tags', list()),
        attributes=fetch('attributes', dict()),
        origin=fetch('origin', None),
        event_type=fetch('type', None),
        create_time=fetch('createTime', None),
        timeout=fetch('timeout', None),
        raw_data=fetch('rawData', None),
        customer=fetch('customer', None),
        duplicate_count=fetch('duplicateCount', None),
        repeat=fetch('repeat', None),
        previous_severity=fetch('previousSeverity', None),
        trend_indication=fetch('trendIndication', None),
        receive_time=fetch('receiveTime', None),
        last_receive_id=fetch('lastReceiveId', None),
        last_receive_time=fetch('lastReceiveTime', None),
        update_time=fetch('updateTime', None),
        history=[History.from_db(entry) for entry in fetch('history', list())]
    )
|
|
|
|
@classmethod
def from_record(cls, rec) -> 'Alert':
    """Build an alert from a row object exposing its columns as attributes.

    ``rec.attributes`` is copied into a new dict and each history entry is
    converted with ``History.from_db``.
    """
    fields = dict(
        id=rec.id,
        resource=rec.resource,
        event=rec.event,
        environment=rec.environment,
        severity=rec.severity,
        correlate=rec.correlate,
        status=rec.status,
        service=rec.service,
        group=rec.group,
        value=rec.value,
        text=rec.text,
        tags=rec.tags,
        attributes=dict(rec.attributes),
        origin=rec.origin,
        event_type=rec.type,
        create_time=rec.create_time,
        timeout=rec.timeout,
        raw_data=rec.raw_data,
        customer=rec.customer,
        duplicate_count=rec.duplicate_count,
        repeat=rec.repeat,
        previous_severity=rec.previous_severity,
        trend_indication=rec.trend_indication,
        receive_time=rec.receive_time,
        last_receive_id=rec.last_receive_id,
        last_receive_time=rec.last_receive_time,
        update_time=rec.update_time,
        history=[History.from_db(entry) for entry in rec.history],
    )
    return Alert(**fields)
|
|
|
|
@classmethod
|
|
def from_db(cls, r: Union[Dict, Tuple]) -> 'Alert':
|
|
if isinstance(r, dict):
|
|
return cls.from_document(r)
|
|
elif isinstance(r, tuple):
|
|
return cls.from_record(r)
|
|
|
|
def is_duplicate(self) -> Optional['Alert']:
    """Look up a duplicate of this alert in the database.

    Returns the result of ``db.is_duplicate`` converted with ``from_db``,
    which is ``None`` when that result is neither a dict nor a tuple.
    """
    match = db.is_duplicate(self)
    return Alert.from_db(match)
|
|
|
|
def is_correlated(self) -> Optional['Alert']:
    """Look up an alert correlated with this one in the database.

    Returns the result of ``db.is_correlated`` converted with ``from_db``,
    which is ``None`` when that result is neither a dict nor a tuple.
    """
    match = db.is_correlated(self)
    return Alert.from_db(match)
|
|
|
|
def is_flapping(self, window: int = 1800, count: int = 2) -> bool:
    """Ask the database whether this alert is flapping.

    ``window`` and ``count`` are passed to ``db.is_flapping`` unchanged.
    """
    flapping = db.is_flapping(self, window, count)
    return flapping
|
|
|
|
def get_status_and_value(self):
    """Return ``(status, value)`` pairs from the first page of 10 history entries.

    Entries whose status is falsy are skipped.
    """
    recent = self.get_alert_history(self, page=1, page_size=10)
    pairs = []
    for entry in recent:
        if entry.status:
            pairs.append((entry.status, entry.value))
    return pairs
|
|
|
|
def _get_hist_info(self, action=None):
|
|
h_loop = self.get_alert_history(alert=self)
|
|
if not h_loop:
|
|
return None, None, None, None
|
|
|
|
current_status = h_loop[0].status
|
|
current_value = h_loop[0].value
|
|
|
|
if len(h_loop) == 1:
|
|
return current_status, current_value, None, None
|
|
if action == ChangeType.unack:
|
|
find = ChangeType.ack
|
|
elif action == ChangeType.unshelve:
|
|
find = ChangeType.shelve
|
|
else:
|
|
find = None
|
|
|
|
if find:
|
|
for h, h_next in zip(h_loop, h_loop[1:]):
|
|
if h.change_type == find:
|
|
return current_status, current_value, h_next.status, h_next.timeout
|
|
|
|
return current_status, current_value, h_loop[1].status, h_loop[1].timeout
|
|
|
|
# de-duplicate an alert
def deduplicate(self, duplicate_of) -> 'Alert':
    """Record this alert as a repeat and persist it via ``db.dedup_alert``.

    The status transition is computed from the history. If the status
    changes, ``status_change_hook`` is sent with ``duplicate_of`` as sender
    and the status and text from its first response are recorded in a
    status history entry. Otherwise a value history entry is recorded when
    ``HISTORY_ON_VALUE_CHANGE`` is set and the value differs from the
    latest history value. If neither applies, no history entry is written.
    """
    received_at = datetime.utcnow()

    current_status, last_value, prior_status, _ = self._get_hist_info()
    _, next_status = alarm_model.transition(
        alert=self,
        current_status=current_status,
        previous_status=prior_status
    )

    self.repeat = True
    self.last_receive_id = self.id
    self.last_receive_time = received_at

    def build_entry(entry_status, entry_text, change_type):
        # Both kinds of history entry share every field except these three.
        return History(
            id=self.id,
            event=self.event,
            severity=self.severity,
            status=entry_status,
            value=self.value,
            text=entry_text,
            change_type=change_type,
            update_time=self.create_time,
            user=g.login,
            timeout=self.timeout,
        )

    entry = None  # type: Optional[History]
    if next_status != current_status:
        responses = status_change_hook.send(duplicate_of, status=next_status, text=self.text)
        # The first response carries the (possibly modified) status and text.
        _, (_, next_status, hook_text) = responses[0]
        self.update_time = received_at
        entry = build_entry(next_status, hook_text, ChangeType.status)
    elif current_app.config['HISTORY_ON_VALUE_CHANGE'] and self.value != last_value:
        entry = build_entry(current_status, self.text, ChangeType.value)

    self.status = next_status
    return Alert.from_db(db.dedup_alert(self, entry))
|
|
|
|
# correlate an alert
def update(self, correlate_with) -> 'Alert':
    """Apply this alert as a correlated update and persist it via ``db.correlate_alert``.

    The previous severity is read from the database and the trend derived
    from it. Duplicate tracking is reset and a severity history entry is
    always written. If the status changes, ``status_change_hook`` is sent
    with ``correlate_with`` as sender and the status and text from its
    first response are used.
    """
    received_at = datetime.utcnow()

    self.previous_severity = db.get_severity(self)
    self.trend_indication = alarm_model.trend(self.previous_severity, self.severity)

    current_status, _, prior_status, _ = self._get_hist_info()
    _, next_status = alarm_model.transition(
        alert=self,
        current_status=current_status,
        previous_status=prior_status
    )

    self.duplicate_count = 0
    self.repeat = False
    self.receive_time = received_at
    self.last_receive_id = self.id
    self.last_receive_time = received_at

    entry_text = self.text
    if next_status != current_status:
        responses = status_change_hook.send(correlate_with, status=next_status, text=self.text)
        # The first response carries the (possibly modified) status and text.
        _, (_, next_status, entry_text) = responses[0]
        self.update_time = received_at

    entry = History(
        id=self.id,
        event=self.event,
        severity=self.severity,
        status=next_status,
        value=self.value,
        text=entry_text,
        change_type=ChangeType.severity,
        update_time=self.create_time,
        user=g.login,
        timeout=self.timeout
    )

    self.status = next_status
    return Alert.from_db(db.correlate_alert(self, [entry]))
|
|
|
|
# create an alert
def create(self) -> 'Alert':
    """Initialise tracking fields for a new alert and persist it via ``db.create_alert``.

    The status comes from ``alarm_model.transition`` and the trend is
    computed against the alarm model's default previous severity. The
    history is replaced by a single ``new`` entry.
    """
    created_at = datetime.utcnow()

    baseline_severity = alarm_model.DEFAULT_PREVIOUS_SEVERITY
    trend = alarm_model.trend(baseline_severity, self.severity)

    _, initial_status = alarm_model.transition(alert=self)
    self.status = initial_status

    self.duplicate_count = 0
    self.repeat = False
    self.previous_severity = alarm_model.DEFAULT_PREVIOUS_SEVERITY
    self.trend_indication = trend
    self.receive_time = created_at
    self.last_receive_id = self.id
    self.last_receive_time = created_at
    self.update_time = created_at

    first_entry = History(
        id=self.id,
        event=self.event,
        severity=self.severity,
        status=self.status,
        value=self.value,
        text=self.text,
        change_type=ChangeType.new,
        update_time=self.create_time,
        user=g.login,
        timeout=self.timeout
    )
    self.history = [first_entry]

    return Alert.from_db(db.create_alert(self))
|
|
|
|
# retrieve an alert
@staticmethod
def find_by_id(id: str, customers: List[str] = None) -> 'Alert':
    """Fetch one alert with ``db.get_alert`` and convert it with ``from_db``.

    ``customers`` is passed to the database call unchanged.
    """
    row = db.get_alert(id, customers)
    return Alert.from_db(row)
|
|
|
|
def is_blackout(self) -> bool:
    """Report whether the database considers this alert to be in a blackout period.

    When ``NOTIFICATION_BLACKOUT`` is disabled, alerts whose severity is
    listed in ``BLACKOUT_ACCEPT`` are never treated as blacked out.
    """
    config = current_app.config
    exempt = not config['NOTIFICATION_BLACKOUT'] and self.severity in config['BLACKOUT_ACCEPT']
    if exempt:
        return False
    return db.is_blackout_period(self)
|
|
|
|
@property
def is_suppressed(self) -> bool:
    """Whether the alarm model reports this alert as suppressed."""
    suppressed = alarm_model.is_suppressed(self)
    return suppressed
|
|
|
|
# set alert status
def set_status(self, status: str, text: str = '', timeout: int = None) -> 'Alert':
    """Set the alert's status in the database and record a status history entry.

    A falsy ``timeout`` is replaced by the configured ``ALERT_TIMEOUT``
    before being passed to ``db.set_status``.
    """
    # NOTE(review): the value returned by db.set_status is passed through
    # without Alert.from_db, so the 'Alert' annotation may not hold - confirm.
    changed_at = datetime.utcnow()
    effective_timeout = timeout or current_app.config['ALERT_TIMEOUT']

    # NOTE(review): the history entry records self.timeout, not the
    # effective timeout computed above - confirm this is intended.
    entry = History(
        id=self.id,
        event=self.event,
        severity=self.severity,
        status=status,
        value=self.value,
        text=text,
        change_type=ChangeType.status,
        update_time=changed_at,
        user=g.login,
        timeout=self.timeout
    )
    return db.set_status(self.id, status, effective_timeout, update_time=changed_at, history=entry)
|
|
|
|
# tag an alert
def tag(self, tags: List[str]) -> bool:
    """Pass ``tags`` to ``db.tag_alert`` for this alert and return its result."""
    outcome = db.tag_alert(self.id, tags)
    return outcome
|
|
|
|
# untag an alert
def untag(self, tags: List[str]) -> bool:
    """Pass ``tags`` to ``db.untag_alert`` for this alert and return its result."""
    outcome = db.untag_alert(self.id, tags)
    return outcome
|
|
|
|
# update alert tags
def update_tags(self, tags: List[str]) -> bool:
    """Pass the de-duplicated ``tags`` to ``db.update_tags`` for this alert.

    Duplicates are removed through a set, so the order of the resulting
    list is not guaranteed to match the input.
    """
    unique_tags = set(tags)
    return db.update_tags(self.id, list(unique_tags))
|
|
|
|
# update alert attributes
def update_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Pass the current and new attributes to ``db.update_attributes``.

    Returns whatever the database call returns.
    """
    current = self.attributes
    return db.update_attributes(self.id, current, attributes)
|
|
|
|
# delete an alert
def delete(self) -> bool:
    """Delete this alert via ``db.delete_alert`` and return its result."""
    outcome = db.delete_alert(self.id)
    return outcome
|
|
|
|
# bulk tag
@staticmethod
def tag_find_all(query, tags):
    """Pass ``query`` and ``tags`` to ``db.tag_alerts`` and return its result."""
    outcome = db.tag_alerts(query, tags)
    return outcome
|
|
|
|
# bulk untag
@staticmethod
def untag_find_all(query, tags):
    """Pass ``query`` and ``tags`` to ``db.untag_alerts`` and return its result."""
    outcome = db.untag_alerts(query, tags)
    return outcome
|
|
|
|
# bulk update attributes
@staticmethod
def update_attributes_find_all(query, attributes):
    """Pass ``query`` and ``attributes`` to ``db.update_attributes_by_query`` and return its result."""
    outcome = db.update_attributes_by_query(query, attributes)
    return outcome
|
|
|
|
# bulk delete
@staticmethod
def delete_find_all(query=None):
    """Pass ``query`` to ``db.delete_alerts`` and return its result."""
    outcome = db.delete_alerts(query)
    return outcome
|
|
|
|
# search alerts
@staticmethod
def find_all(query: Query = None, raw_data: bool = False, history: bool = False, page: int = 1, page_size: int = 1000) -> List['Alert']:
    """Return the alerts produced by ``db.get_alerts``, each converted with ``from_db``.

    All arguments are passed to the database call unchanged.
    """
    rows = db.get_alerts(query, raw_data, history, page, page_size)
    return [Alert.from_db(row) for row in rows]
|
|
|
|
@staticmethod
def get_alert_history(alert, page=1, page_size=100):
    """Return one page of history for ``alert`` as ``RichHistory`` objects."""
    rows = db.get_alert_history(alert, page, page_size)
    return [RichHistory.from_db(row) for row in rows]
|
|
|
|
# list alert history
@staticmethod
def get_history(query: Query = None, page=1, page_size=1000) -> List[RichHistory]:
    """Return one page of history matching ``query`` as ``RichHistory`` objects."""
    rows = db.get_history(query, page, page_size)
    return [RichHistory.from_db(row) for row in rows]
|
|
|
|
# get total count
@staticmethod
def get_count(query: Query = None) -> Dict[str, Any]:
    """Return the result of ``db.get_count`` for ``query``."""
    total = db.get_count(query)
    return total
|
|
|
|
# get severity counts
@staticmethod
def get_counts_by_severity(query: Query = None) -> Dict[str, Any]:
    """Return the result of ``db.get_counts_by_severity`` for ``query``."""
    counts = db.get_counts_by_severity(query)
    return counts
|
|
|
|
# get status counts
@staticmethod
def get_counts_by_status(query: Query = None) -> Dict[str, Any]:
    """Return the result of ``db.get_counts_by_status`` for ``query``."""
    counts = db.get_counts_by_status(query)
    return counts
|
|
|
|
# top 10 alerts
@staticmethod
def get_top10_count(query: Query = None) -> List[Dict[str, Any]]:
    """Shorthand for ``get_topn_count`` with ``topn`` fixed at 10."""
    top_ten = Alert.get_topn_count(query, topn=10)
    return top_ten
|
|
|
|
@staticmethod
def get_topn_count(query: Query = None, topn: int = 10) -> List[Dict[str, Any]]:
    """Return the result of ``db.get_topn_count`` for ``query`` and ``topn``."""
    ranking = db.get_topn_count(query, topn=topn)
    return ranking
|
|
|
|
# top 10 flapping
|
|
@staticmethod
|
|
def get_top10_flapping(query: Query = None) -> List[Dict[str, Any]]:
|
|
return Alert.get_topn_flapping(topn=10)
|
|
|
|
@staticmethod
def get_topn_flapping(query: Query = None, topn: int = 10) -> List[Dict[str, Any]]:
    """Return the result of ``db.get_topn_flapping`` for ``query`` and ``topn``."""
    ranking = db.get_topn_flapping(query, topn=topn)
    return ranking
|
|
|
|
# top 10 standing
|
|
@staticmethod
|
|
def get_top10_standing(query: Query = None) -> List[Dict[str, Any]]:
|
|
return Alert.get_topn_standing(topn=10)
|
|
|
|
@staticmethod
def get_topn_standing(query: Query = None, topn: int = 10) -> List[Dict[str, Any]]:
    """Return the result of ``db.get_topn_standing`` for ``query`` and ``topn``."""
    ranking = db.get_topn_standing(query, topn=topn)
    return ranking
|
|
|
|
# get environments
@staticmethod
def get_environments(query: Query = None) -> List[str]:
    """Return the result of ``db.get_environments`` for ``query``."""
    environments = db.get_environments(query)
    return environments
|
|
|
|
# get services
@staticmethod
def get_services(query: Query = None) -> List[str]:
    """Return the result of ``db.get_services`` for ``query``."""
    services = db.get_services(query)
    return services
|
|
|
|
# get groups
@staticmethod
def get_groups(query: Query = None) -> List[str]:
    """Return the result of ``db.get_alert_groups`` for ``query``."""
    groups = db.get_alert_groups(query)
    return groups
|
|
|
|
# get tags
@staticmethod
def get_tags(query: Query = None) -> List[str]:
    """Return the result of ``db.get_alert_tags`` for ``query``."""
    tags = db.get_alert_tags(query)
    return tags
|
|
|
|
# add note
def add_note(self, text: str) -> Note:
    """Create a note for this alert and record it in the alert's history.

    The history entry is keyed by the note's id and carries the alert's
    current event, severity, status and value.
    """
    created = Note.from_alert(self, text)
    entry_fields = dict(
        id=created.id,
        event=self.event,
        severity=self.severity,
        status=self.status,
        value=self.value,
        text=text,
        change_type=ChangeType.note,
        update_time=datetime.utcnow(),
        user=g.login,
    )
    db.add_history(self.id, History(**entry_fields))
    return created
|
|
|
|
# get notes for alert
def get_alert_notes(self, page: int = 1, page_size: int = 100) -> List['Note']:
    """Return one page of this alert's notes, each converted with ``Note.from_db``."""
    rows = db.get_alert_notes(self.id, page, page_size)
    converted = []
    for row in rows:
        converted.append(Note.from_db(row))
    return converted
|
|
|
|
def delete_note(self, note_id):
    """Record a ``dismiss`` history entry for the note, then delete it.

    Returns the result of ``Note.delete_by_id``.
    """
    entry_fields = dict(
        id=note_id,
        event=self.event,
        severity=self.severity,
        status=self.status,
        value=self.value,
        text='note dismissed',
        change_type=ChangeType.dismiss,
        update_time=datetime.utcnow(),
        user=g.login,
    )
    # The history entry is written before the note itself is removed.
    db.add_history(self.id, History(**entry_fields))
    return Note.delete_by_id(note_id)
|
|
|
|
@staticmethod
def housekeeping(expired_threshold: int, info_threshold: int) -> Tuple[List['Alert'], List['Alert'], List['Alert']]:
    """Collect the alerts returned by the database's housekeeping queries.

    Returns a 3-tuple of alert lists built from ``db.get_expired``,
    ``db.get_unshelve`` and ``db.get_unack``, in that order.
    """
    expired = [Alert.from_db(row) for row in db.get_expired(expired_threshold, info_threshold)]
    to_unshelve = [Alert.from_db(row) for row in db.get_unshelve()]
    to_unack = [Alert.from_db(row) for row in db.get_unack()]
    return expired, to_unshelve, to_unack
|
|
|
|
def from_status(self, status: str, text: str = '', timeout: int = None) -> 'Alert':
    """Persist a direct status change via ``db.set_alert`` with a status history entry.

    ``self.timeout`` is set to ``timeout``, or to the configured
    ``ALERT_TIMEOUT`` when ``timeout`` is falsy.
    """
    changed_at = datetime.utcnow()

    self.timeout = timeout or current_app.config['ALERT_TIMEOUT']
    entry = History(
        id=self.id,
        event=self.event,
        severity=self.severity,
        status=status,
        value=self.value,
        text=text,
        change_type=ChangeType.status,
        update_time=changed_at,
        user=g.login,
        timeout=self.timeout
    )
    # NOTE(review): the raw ``timeout`` argument (possibly None) is passed
    # to the database, not the defaulted self.timeout - confirm intended.
    updated = db.set_alert(
        id=self.id,
        severity=self.severity,
        status=status,
        tags=self.tags,
        attributes=self.attributes,
        timeout=timeout,
        previous_severity=self.previous_severity,
        update_time=changed_at,
        history=[entry]
    )
    return Alert.from_db(updated)
|
|
|
|
def from_action(self, action: str, text: str = '', timeout: int = None) -> 'Alert':
    """Apply an operator action to the alert and persist the result via ``db.set_alert``.

    The new severity and status come from ``alarm_model.transition``; the
    status and text may then be altered by the first ``status_change_hook``
    response. The history entry's change type is ``ChangeType(action)``,
    or ``ChangeType.action`` when ``action`` is not a known change type.
    """
    acted_at = datetime.utcnow()

    current_status, _, prior_status, prior_timeout = self._get_hist_info(action)

    # Undo-style actions and timeouts prefer the timeout found in history.
    effective_timeout = timeout
    if action in [ChangeType.unack, ChangeType.unshelve, ChangeType.timeout]:
        effective_timeout = effective_timeout or prior_timeout

    if action in [ChangeType.ack, ChangeType.unack]:
        effective_timeout = effective_timeout or current_app.config['ACK_TIMEOUT']
    elif action in [ChangeType.shelve, ChangeType.unshelve]:
        effective_timeout = effective_timeout or current_app.config['SHELVE_TIMEOUT']
    else:
        effective_timeout = effective_timeout or self.timeout or current_app.config['ALERT_TIMEOUT']

    next_severity, next_status = alarm_model.transition(
        alert=self,
        current_status=current_status,
        previous_status=prior_status,
        action=action
    )

    responses = status_change_hook.send(self, status=next_status, text=text)
    # The first response carries the (possibly modified) status and text.
    _, (_, next_status, text) = responses[0]

    try:
        kind = ChangeType(action)
    except ValueError:
        kind = ChangeType.action

    entry = History(
        id=self.id,
        event=self.event,
        severity=next_severity,
        status=next_status,
        value=self.value,
        text=text,
        change_type=kind,
        update_time=acted_at,
        user=g.login,
        timeout=effective_timeout
    )

    # The previous severity only moves when the severity actually changes.
    if next_severity != self.severity:
        previous_severity = self.severity
    else:
        previous_severity = self.previous_severity

    # NOTE(review): the alert row keeps self.timeout while the history entry
    # records the effective timeout - confirm this asymmetry is intended.
    updated = db.set_alert(
        id=self.id,
        severity=next_severity,
        status=next_status,
        tags=self.tags,
        attributes=self.attributes,
        timeout=self.timeout,
        previous_severity=previous_severity,
        update_time=acted_at,
        history=[entry]
    )
    return Alert.from_db(updated)
|
|
|
|
def from_expired(self, text: str = '', timeout: int = None):
|
|
return self.from_action(action='expired', text=text, timeout=timeout)
|
|
|
|
def from_timeout(self, text: str = '', timeout: int = None):
|
|
return self.from_action(action='timeout', text=text, timeout=timeout)
|