0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-24 01:10:24 +00:00
alerta_alerta/alerta/tasks.py
Víctor M. García Muñoz 7cebc46a09
fix: Bulk operations did not work properly (#1825)
Must use Alerts queryBuilder and not generic one.
Flask login property is needed to create alert history. BG tasks
are run in a celery worker so no login property is available.
Main process must provide login when scheduling the task.
2023-03-09 21:51:56 +01:00

46 lines
1.4 KiB
Python

from typing import List, Optional
from flask import g
from alerta.app import create_celery_app
from alerta.exceptions import InvalidAction, RejectException
from alerta.models.alert import Alert
from alerta.utils.api import process_action, process_status
celery = create_celery_app()
@celery.task
def action_alerts(alerts: List[str], action: str, text: str, timeout: Optional[int], login: str) -> None:
updated = []
errors = []
for alert_id in alerts:
alert = Alert.find_by_id(alert_id)
try:
g.login = login
previous_status = alert.status
alert, action, text, timeout = process_action(alert, action, text, timeout)
alert = alert.from_action(action, text, timeout)
except RejectException as e:
errors.append(str(e))
continue
except InvalidAction as e:
errors.append(str(e))
continue
except Exception as e:
errors.append(str(e))
continue
if previous_status != alert.status:
try:
alert, status, text = process_status(alert, alert.status, text)
alert = alert.from_status(status, text, timeout)
except RejectException as e:
errors.append(str(e))
continue
except Exception as e:
errors.append(str(e))
continue
updated.append(alert.id)