mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 09:19:40 +00:00
7cebc46a09
Must use the Alerts queryBuilder and not the generic one. The Flask login property is needed to create alert history. Background tasks are run in a Celery worker, so no login property is available there; the main process must provide the login when scheduling the task.
46 lines
1.4 KiB
Python
46 lines
1.4 KiB
Python
from typing import List, Optional
|
|
|
|
from flask import g
|
|
|
|
from alerta.app import create_celery_app
|
|
from alerta.exceptions import InvalidAction, RejectException
|
|
from alerta.models.alert import Alert
|
|
from alerta.utils.api import process_action, process_status
|
|
|
|
# Module-level app object returned by alerta.app.create_celery_app(); functions
# in this module are registered as background tasks on it via ``@celery.task``.
celery = create_celery_app()
|
|
|
|
|
|
@celery.task
def action_alerts(alerts: List[str], action: str, text: str, timeout: Optional[int], login: str) -> None:
    """Apply ``action`` to every alert in ``alerts`` as a background task.

    Each alert is looked up by id and run through ``process_action`` followed
    by ``Alert.from_action``. If that changed the alert's status, the alert is
    additionally run through ``process_status`` and ``Alert.from_status``.

    A failure while handling one alert is recorded and the loop moves on to
    the next alert, so a single bad alert does not abort the whole batch.
    Recorded failures are reported once, as a warning, after the loop.

    :param alerts: ids of the alerts to act on.
    :param action: action to apply; every alert starts from this same value.
    :param text: text accompanying the action; every alert starts from this
        same value.
    :param timeout: timeout passed along with the action, or ``None``.
    :param login: login of the user on whose behalf the task runs. It is
        stored on ``flask.g`` because this code runs outside the request that
        scheduled it, so the caller has to supply it explicitly.
    :returns: ``None``; the outcome is only visible through the updated alerts
        and the warning log.
    """
    import logging  # function-scope import: the module does not import logging itself

    updated = []
    errors = []
    for alert_id in alerts:
        alert = Alert.find_by_id(alert_id)
        if alert is None:
            # Without this guard the failure was recorded as an opaque
            # "'NoneType' object has no attribute 'status'" message.
            errors.append(f'{alert_id}: alert not found')
            continue

        try:
            g.login = login
            previous_status = alert.status
            # Unpack into per-alert names. process_action may hand back a
            # rewritten action/text/timeout, and rebinding the function
            # arguments would leak that rewrite into all later alerts.
            alert, alert_action, alert_text, alert_timeout = process_action(alert, action, text, timeout)
            alert = alert.from_action(alert_action, alert_text, alert_timeout)
        except (RejectException, InvalidAction, Exception) as e:
            errors.append(f'{alert_id}: {e}')
            continue

        if previous_status != alert.status:
            try:
                # The status step continues from this alert's own text and
                # timeout as returned by the action step above.
                alert, status, status_text = process_status(alert, alert.status, alert_text)
                alert = alert.from_status(status, status_text, alert_timeout)
            except (RejectException, Exception) as e:
                errors.append(f'{alert_id}: {e}')
                continue

        updated.append(alert.id)

    if errors:
        # The task returns nothing, so logging is the only way these
        # per-alert failures become visible to an operator.
        logging.getLogger(__name__).warning(
            'action_alerts: action "%s" updated %d alert(s), failed for %d: %s',
            action, len(updated), len(errors), '; '.join(errors)
        )
|