mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 17:29:39 +00:00
7cebc46a09
Must use Alerts queryBuilder and not generic one. Flask login property is needed to create alert history. BG tasks are run in a celery worker so no login property is available. Main process must provide login when scheduling the task.
150 lines
4.4 KiB
Python
150 lines
4.4 KiB
Python
from flask import g, jsonify, request
|
|
from flask_cors import cross_origin
|
|
|
|
from alerta.app import qb
|
|
from alerta.auth.decorators import permission
|
|
from alerta.exceptions import ApiError, RejectException
|
|
from alerta.models.alert import Alert
|
|
from alerta.models.enums import Scope
|
|
from alerta.models.metrics import timer
|
|
from alerta.utils.api import process_status
|
|
from alerta.utils.response import absolute_url, jsonp
|
|
from alerta.views.alerts import (attrs_timer, delete_timer, status_timer,
|
|
tag_timer)
|
|
|
|
from . import api
|
|
|
|
|
|
@api.route('/_bulk/task/<task_id>', methods=['OPTIONS', 'GET'])
@cross_origin()
@permission(Scope.read_alerts)
@timer(status_timer)
@jsonp
def task_status(task_id):
    """Report the state of a previously queued bulk task.

    Builds the ``AsyncResult`` handle for ``task_id`` on the ``action_alerts``
    task and responds with a JSON object holding the task's status
    (lower-cased) and its id.
    """
    # Imported at call time rather than at module level, same as the view
    # that queues the task.
    from alerta.tasks import action_alerts as bulk_task

    result = bulk_task.AsyncResult(task_id)
    payload = {'status': result.status.lower(), 'id': result.id}
    return jsonify(**payload)
|
|
|
|
|
|
@api.route('/_bulk/alerts/status', methods=['OPTIONS', 'PUT'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(status_timer)
@jsonp
def bulk_set_status():
    """Set the status of every alert matching the request's query parameters.

    JSON body fields:
        status:  required; the status to apply.
        text:    optional; defaults to 'bulk status update'.
        timeout: optional; passed through to ``Alert.set_status``.

    On success responds with the ids of the alerts for which ``set_status``
    returned a truthy value, plus their count.

    Raises:
        ApiError(400): no 'status' was supplied.
        ApiError(404): the query matched no alerts.
        ApiError(500): ``process_status`` raised for at least one alert. The
            collected messages are attached as ``errors``. Alerts that did
            not fail have already been updated by the time this is raised.
    """
    status = request.json.get('status', None)
    text = request.json.get('text', 'bulk status update')
    timeout = request.json.get('timeout', None)

    if not status:
        raise ApiError("must supply 'status' as json data", 400)

    query = qb.alerts.from_params(request.args)
    alerts = Alert.find_all(query)

    if not alerts:
        raise ApiError('not found', 404)

    updated = []
    errors = []
    for alert in alerts:
        try:
            # Bind the results to per-alert names: whatever process_status()
            # returns for this alert must not replace the requested
            # status/text that is handed to the next alert.
            alert, alert_status, alert_text = process_status(alert, status, text)
        except (RejectException, Exception) as e:
            # Rejections and unexpected failures are recorded the same way;
            # keep going so the remaining alerts are still processed.
            errors.append(str(e))
            continue

        if alert.set_status(alert_status, alert_text, timeout):
            updated.append(alert.id)

    if errors:
        raise ApiError('failed to bulk set alert status', 500, errors=errors)

    return jsonify(status='ok', updated=updated, count=len(updated))
|
|
|
|
|
|
@api.route('/_bulk/alerts/action', methods=['OPTIONS', 'PUT'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(status_timer)
@jsonp
def bulk_action_alert():
    """Queue an action to run against every alert matching the query.

    JSON body fields:
        action:  required; the action to apply.
        text:    optional; defaults to 'bulk status update'.
        timeout: optional; forwarded to the task unchanged.

    The work itself is handed to the ``action_alerts`` task. The response is
    a 202 whose ``Location`` header points at the ``/_bulk/task/<id>``
    endpoint for the queued task.

    Raises:
        ApiError(400): no 'action' was supplied.
        ApiError(404): the query matched no alerts.
    """
    from alerta.tasks import action_alerts

    payload = request.json
    action = payload.get('action', None)
    text = payload.get('text', 'bulk status update')
    timeout = payload.get('timeout', None)

    if not action:
        raise ApiError("must supply 'action' as json data", 400)

    matching = Alert.find_all(qb.alerts.from_params(request.args))
    alert_ids = [found.id for found in matching]

    if not alert_ids:
        raise ApiError('not found', 404)

    # The task runs outside this request, so the current login is passed
    # along explicitly instead of being read from ``g`` by the task.
    task = action_alerts.delay(alert_ids, action, text, timeout, g.login)

    body = jsonify(status='ok', message=f'{len(alert_ids)} alerts queued for action')
    headers = {'Location': absolute_url('/_bulk/task/' + task.id)}
    return body, 202, headers
|
|
|
|
|
|
@api.route('/_bulk/alerts/tag', methods=['OPTIONS', 'PUT'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(tag_timer)
@jsonp
def bulk_tag_alert():
    """Add tags to every alert matching the request's query parameters.

    JSON body fields:
        tags: required, non-empty; passed to ``Alert.tag_find_all``.

    Responds with whatever ``Alert.tag_find_all`` returns as ``updated``,
    plus its length as ``count``.

    Raises:
        ApiError(400): 'tags' is missing or empty.
    """
    tags = request.json.get('tags', None)
    if not tags:
        # A missing field is a client error; pass 400 explicitly, as the
        # other bulk endpoints do for their required fields.
        raise ApiError("must supply 'tags' as json list", 400)

    query = qb.alerts.from_params(request.args)
    updated = Alert.tag_find_all(query, tags=tags)

    return jsonify(status='ok', updated=updated, count=len(updated))
|
|
|
|
|
|
@api.route('/_bulk/alerts/untag', methods=['OPTIONS', 'PUT'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(tag_timer)
@jsonp
def bulk_untag_alert():
    """Remove tags from every alert matching the request's query parameters.

    JSON body fields:
        tags: required, non-empty; passed to ``Alert.untag_find_all``.

    Responds with whatever ``Alert.untag_find_all`` returns as ``updated``,
    plus its length as ``count``.

    Raises:
        ApiError(400): 'tags' is missing or empty.
    """
    tags = request.json.get('tags', None)
    if not tags:
        # A missing field is a client error; pass 400 explicitly, as the
        # other bulk endpoints do for their required fields.
        raise ApiError("must supply 'tags' as json list", 400)

    query = qb.alerts.from_params(request.args)
    updated = Alert.untag_find_all(query, tags=tags)

    return jsonify(status='ok', updated=updated, count=len(updated))
|
|
|
|
|
|
@api.route('/_bulk/alerts/attributes', methods=['OPTIONS', 'PUT'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(attrs_timer)
@jsonp
def bulk_update_attributes():
    """Update attributes on every alert matching the request's query parameters.

    JSON body fields:
        attributes: required, non-empty; passed to
            ``Alert.update_attributes_find_all``.

    Responds with the value returned by that call as ``updated``, plus its
    length as ``count``.

    Raises:
        ApiError(400): 'attributes' is missing or empty.
    """
    attributes = request.json.get('attributes', None)
    if not attributes:
        raise ApiError("must supply 'attributes' as json data", 400)

    alert_query = qb.alerts.from_params(request.args)
    updated = Alert.update_attributes_find_all(alert_query, attributes)

    return jsonify(status='ok', updated=updated, count=len(updated))
|
|
|
|
|
|
@api.route('/_bulk/alerts', methods=['OPTIONS', 'DELETE'])
@cross_origin()
@permission(Scope.write_alerts)
@timer(delete_timer)
@jsonp
def bulk_delete_alert():
    """Delete every alert matching the request's query parameters.

    Responds with the value returned by ``Alert.delete_find_all`` as
    ``deleted``, plus its length as ``count``.
    """
    alert_query = qb.alerts.from_params(request.args)
    removed = Alert.delete_find_all(alert_query)

    return jsonify(status='ok', deleted=removed, count=len(removed))
|