mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 17:29:39 +00:00
d50131b9e6
This reverts commit 4664cd2d88
.
1661 lines
69 KiB
Python
1661 lines
69 KiB
Python
import threading
|
|
import time
|
|
from collections import defaultdict, namedtuple
|
|
from datetime import datetime
|
|
|
|
import psycopg2
|
|
from flask import current_app
|
|
from psycopg2.extensions import AsIs, adapt, register_adapter
|
|
from psycopg2.extras import Json, NamedTupleCursor, register_composite
|
|
|
|
from alerta.app import alarm_model
|
|
from alerta.database.base import Database
|
|
from alerta.exceptions import NoCustomerMatch
|
|
from alerta.models.enums import ADMIN_SCOPES
|
|
from alerta.models.heartbeat import HeartbeatStatus
|
|
from alerta.utils.format import DateTime
|
|
from alerta.utils.response import absolute_url
|
|
|
|
from .utils import Query
|
|
|
|
MAX_RETRIES = 5
|
|
|
|
|
|
class HistoryAdapter:
|
|
def __init__(self, history):
|
|
self.history = history
|
|
self.conn = None
|
|
|
|
def prepare(self, conn):
|
|
self.conn = conn
|
|
|
|
def getquoted(self):
|
|
def quoted(o):
|
|
a = adapt(o)
|
|
if hasattr(a, 'prepare'):
|
|
a.prepare(self.conn)
|
|
return a.getquoted().decode('utf-8')
|
|
|
|
return '({}, {}, {}, {}, {}, {}, {}, {}::timestamp, {}, {})::history'.format(
|
|
quoted(self.history.id),
|
|
quoted(self.history.event),
|
|
quoted(self.history.severity),
|
|
quoted(self.history.status),
|
|
quoted(self.history.value),
|
|
quoted(self.history.text),
|
|
quoted(self.history.change_type),
|
|
quoted(self.history.update_time),
|
|
quoted(self.history.user),
|
|
quoted(self.history.timeout)
|
|
)
|
|
|
|
def __str__(self):
|
|
return str(self.getquoted())
|
|
|
|
|
|
Record = namedtuple('Record', [
|
|
'id', 'resource', 'event', 'environment', 'severity', 'status', 'service',
|
|
'group', 'value', 'text', 'tags', 'attributes', 'origin', 'update_time',
|
|
'user', 'timeout', 'type', 'customer'
|
|
])
|
|
|
|
|
|
class Backend(Database):
|
|
|
|
def create_engine(self, app, uri, dbname=None, schema='public', raise_on_error=True):
|
|
self.uri = uri
|
|
self.dbname = dbname
|
|
self.schema = schema
|
|
|
|
lock = threading.Lock()
|
|
with lock:
|
|
conn = self.connect()
|
|
|
|
with app.open_resource('sql/schema.sql') as f:
|
|
try:
|
|
conn.cursor().execute(f.read())
|
|
conn.commit()
|
|
except Exception as e:
|
|
if raise_on_error:
|
|
raise
|
|
app.logger.warning(e)
|
|
|
|
register_adapter(dict, Json)
|
|
register_adapter(datetime, self._adapt_datetime)
|
|
register_composite(
|
|
schema + '.history' if schema else 'history',
|
|
conn,
|
|
globally=True
|
|
)
|
|
from alerta.models.alert import History
|
|
register_adapter(History, HistoryAdapter)
|
|
|
|
def connect(self):
|
|
retry = 0
|
|
while True:
|
|
try:
|
|
conn = psycopg2.connect(
|
|
dsn=self.uri,
|
|
dbname=self.dbname,
|
|
cursor_factory=NamedTupleCursor
|
|
)
|
|
|
|
conn.set_client_encoding('UTF8')
|
|
break
|
|
except Exception as e:
|
|
print(e) # FIXME - should log this error instead of printing, but current_app is unavailable here
|
|
retry += 1
|
|
if retry > MAX_RETRIES:
|
|
conn = None
|
|
break
|
|
else:
|
|
backoff = 2 ** retry
|
|
print(f'Retry attempt {retry}/{MAX_RETRIES} (wait={backoff}s)...')
|
|
time.sleep(backoff)
|
|
|
|
if conn:
|
|
conn.cursor().execute('SET search_path TO {}'.format(self.schema))
|
|
conn.commit()
|
|
return conn
|
|
else:
|
|
raise RuntimeError(f'Database connect error. Failed to connect after {MAX_RETRIES} retries.')
|
|
|
|
@staticmethod
|
|
def _adapt_datetime(dt):
|
|
return AsIs(f'{adapt(DateTime.iso8601(dt))}')
|
|
|
|
@property
|
|
def name(self):
|
|
cursor = self.get_db().cursor()
|
|
cursor.execute('SELECT current_database()')
|
|
return cursor.fetchone()[0]
|
|
|
|
@property
|
|
def version(self):
|
|
cursor = self.get_db().cursor()
|
|
cursor.execute('SHOW server_version')
|
|
return cursor.fetchone()[0]
|
|
|
|
@property
|
|
def is_alive(self):
|
|
cursor = self.get_db().cursor()
|
|
cursor.execute('SELECT true')
|
|
return cursor.fetchone()
|
|
|
|
def close(self, db):
|
|
db.close()
|
|
|
|
def destroy(self):
|
|
conn = self.connect()
|
|
cursor = conn.cursor()
|
|
for table in ['alerts', 'blackouts', 'customers', 'groups', 'heartbeats', 'keys', 'metrics', 'perms', 'users']:
|
|
cursor.execute(f'DROP TABLE IF EXISTS {table}')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# ALERTS
|
|
|
|
def get_severity(self, alert):
|
|
select = """
|
|
SELECT severity FROM alerts
|
|
WHERE environment=%(environment)s AND resource=%(resource)s
|
|
AND ((event=%(event)s AND severity!=%(severity)s)
|
|
OR (event!=%(event)s AND %(event)s=ANY(correlate)))
|
|
AND {customer}
|
|
""".format(customer='customer=%(customer)s' if alert.customer else 'customer IS NULL')
|
|
return self._fetchone(select, vars(alert)).severity
|
|
|
|
def get_status(self, alert):
|
|
select = """
|
|
SELECT status FROM alerts
|
|
WHERE environment=%(environment)s AND resource=%(resource)s
|
|
AND (event=%(event)s OR %(event)s=ANY(correlate))
|
|
AND {customer}
|
|
""".format(customer='customer=%(customer)s' if alert.customer else 'customer IS NULL')
|
|
return self._fetchone(select, vars(alert)).status
|
|
|
|
def is_duplicate(self, alert):
|
|
select = """
|
|
SELECT * FROM alerts
|
|
WHERE environment=%(environment)s
|
|
AND resource=%(resource)s
|
|
AND event=%(event)s
|
|
AND severity=%(severity)s
|
|
AND {customer}
|
|
""".format(customer='customer=%(customer)s' if alert.customer else 'customer IS NULL')
|
|
return self._fetchone(select, vars(alert))
|
|
|
|
def is_correlated(self, alert):
|
|
select = """
|
|
SELECT * FROM alerts
|
|
WHERE environment=%(environment)s AND resource=%(resource)s
|
|
AND ((event=%(event)s AND severity!=%(severity)s)
|
|
OR (event!=%(event)s AND %(event)s=ANY(correlate)))
|
|
AND {customer}
|
|
""".format(customer='customer=%(customer)s' if alert.customer else 'customer IS NULL')
|
|
return self._fetchone(select, vars(alert))
|
|
|
|
def is_flapping(self, alert, window=1800, count=2):
|
|
"""
|
|
Return true if alert severity has changed more than X times in Y seconds
|
|
"""
|
|
select = """
|
|
SELECT COUNT(*)
|
|
FROM alerts, unnest(history) h
|
|
WHERE environment=%(environment)s
|
|
AND resource=%(resource)s
|
|
AND h.event=%(event)s
|
|
AND h.update_time > (NOW() at time zone 'utc' - INTERVAL '{window} seconds')
|
|
AND h.type='severity'
|
|
AND {customer}
|
|
""".format(window=window, customer='customer=%(customer)s' if alert.customer else 'customer IS NULL')
|
|
return self._fetchone(select, vars(alert)).count > count
|
|
|
|
def dedup_alert(self, alert, history):
|
|
"""
|
|
Update alert status, service, value, text, timeout and rawData, increment duplicate count and set
|
|
repeat=True, and keep track of last receive id and time but don't append to history unless status changes.
|
|
"""
|
|
alert.history = history
|
|
update = """
|
|
UPDATE alerts
|
|
SET status=%(status)s, service=%(service)s, value=%(value)s, text=%(text)s,
|
|
timeout=%(timeout)s, raw_data=%(raw_data)s, repeat=%(repeat)s,
|
|
last_receive_id=%(last_receive_id)s, last_receive_time=%(last_receive_time)s,
|
|
tags=ARRAY(SELECT DISTINCT UNNEST(tags || %(tags)s)), attributes=attributes || %(attributes)s,
|
|
duplicate_count=duplicate_count + 1, {update_time}, history=(%(history)s || history)[1:{limit}]
|
|
WHERE environment=%(environment)s
|
|
AND resource=%(resource)s
|
|
AND event=%(event)s
|
|
AND severity=%(severity)s
|
|
AND {customer}
|
|
RETURNING *
|
|
""".format(
|
|
limit=current_app.config['HISTORY_LIMIT'],
|
|
update_time='update_time=%(update_time)s' if alert.update_time else 'update_time=update_time',
|
|
customer='customer=%(customer)s' if alert.customer else 'customer IS NULL'
|
|
)
|
|
return self._updateone(update, vars(alert), returning=True)
|
|
|
|
def correlate_alert(self, alert, history):
|
|
alert.history = history
|
|
update = """
|
|
UPDATE alerts
|
|
SET event=%(event)s, severity=%(severity)s, status=%(status)s, service=%(service)s, value=%(value)s,
|
|
text=%(text)s, create_time=%(create_time)s, timeout=%(timeout)s, raw_data=%(raw_data)s,
|
|
duplicate_count=%(duplicate_count)s, repeat=%(repeat)s, previous_severity=%(previous_severity)s,
|
|
trend_indication=%(trend_indication)s, receive_time=%(receive_time)s, last_receive_id=%(last_receive_id)s,
|
|
last_receive_time=%(last_receive_time)s, tags=ARRAY(SELECT DISTINCT UNNEST(tags || %(tags)s)),
|
|
attributes=attributes || %(attributes)s, {update_time}, history=(%(history)s || history)[1:{limit}]
|
|
WHERE environment=%(environment)s
|
|
AND resource=%(resource)s
|
|
AND ((event=%(event)s AND severity!=%(severity)s) OR (event!=%(event)s AND %(event)s=ANY(correlate)))
|
|
AND {customer}
|
|
RETURNING *
|
|
""".format(
|
|
limit=current_app.config['HISTORY_LIMIT'],
|
|
update_time='update_time=%(update_time)s' if alert.update_time else 'update_time=update_time',
|
|
customer='customer=%(customer)s' if alert.customer else 'customer IS NULL'
|
|
)
|
|
return self._updateone(update, vars(alert), returning=True)
|
|
|
|
def create_alert(self, alert):
|
|
insert = """
|
|
INSERT INTO alerts (id, resource, event, environment, severity, correlate, status, service, "group",
|
|
value, text, tags, attributes, origin, type, create_time, timeout, raw_data, customer,
|
|
duplicate_count, repeat, previous_severity, trend_indication, receive_time, last_receive_id,
|
|
last_receive_time, update_time, history)
|
|
VALUES (%(id)s, %(resource)s, %(event)s, %(environment)s, %(severity)s, %(correlate)s, %(status)s,
|
|
%(service)s, %(group)s, %(value)s, %(text)s, %(tags)s, %(attributes)s, %(origin)s,
|
|
%(event_type)s, %(create_time)s, %(timeout)s, %(raw_data)s, %(customer)s, %(duplicate_count)s,
|
|
%(repeat)s, %(previous_severity)s, %(trend_indication)s, %(receive_time)s, %(last_receive_id)s,
|
|
%(last_receive_time)s, %(update_time)s, %(history)s::history[])
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(alert))
|
|
|
|
def set_alert(self, id, severity, status, tags, attributes, timeout, previous_severity, update_time, history=None):
|
|
update = """
|
|
UPDATE alerts
|
|
SET severity=%(severity)s, status=%(status)s, tags=ARRAY(SELECT DISTINCT UNNEST(tags || %(tags)s)),
|
|
attributes=%(attributes)s, timeout=%(timeout)s, previous_severity=%(previous_severity)s,
|
|
update_time=%(update_time)s, history=(%(change)s || history)[1:{limit}]
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
""".format(limit=current_app.config['HISTORY_LIMIT'])
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'severity': severity, 'status': status,
|
|
'tags': tags, 'attributes': attributes, 'timeout': timeout,
|
|
'previous_severity': previous_severity, 'update_time': update_time,
|
|
'change': history}, returning=True)
|
|
|
|
def get_alert(self, id, customers=None):
|
|
select = """
|
|
SELECT * FROM alerts
|
|
WHERE (id ~* (%(id)s) OR last_receive_id ~* (%(id)s))
|
|
AND {customer}
|
|
""".format(customer='customer=ANY(%(customers)s)' if customers else '1=1')
|
|
return self._fetchone(select, {'id': '^' + id, 'customers': customers})
|
|
|
|
# STATUS, TAGS, ATTRIBUTES
|
|
|
|
def set_status(self, id, status, timeout, update_time, history=None):
|
|
update = """
|
|
UPDATE alerts
|
|
SET status=%(status)s, timeout=%(timeout)s, update_time=%(update_time)s, history=(%(change)s || history)[1:{limit}]
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
""".format(limit=current_app.config['HISTORY_LIMIT'])
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'status': status, 'timeout': timeout, 'update_time': update_time, 'change': history}, returning=True)
|
|
|
|
def tag_alert(self, id, tags):
|
|
update = """
|
|
UPDATE alerts
|
|
SET tags=ARRAY(SELECT DISTINCT UNNEST(tags || %(tags)s))
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
"""
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'tags': tags}, returning=True)
|
|
|
|
def untag_alert(self, id, tags):
|
|
update = """
|
|
UPDATE alerts
|
|
SET tags=(select array_agg(t) FROM unnest(tags) AS t WHERE NOT t=ANY(%(tags)s) )
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
"""
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'tags': tags}, returning=True)
|
|
|
|
def update_tags(self, id, tags):
|
|
update = """
|
|
UPDATE alerts
|
|
SET tags=%(tags)s
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
"""
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'tags': tags}, returning=True)
|
|
|
|
def update_attributes(self, id, old_attrs, new_attrs):
|
|
old_attrs.update(new_attrs)
|
|
attrs = {k: v for k, v in old_attrs.items() if v is not None}
|
|
|
|
update = """
|
|
UPDATE alerts
|
|
SET attributes=%(attrs)s
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING attributes
|
|
"""
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'attrs': attrs}, returning=True).attributes
|
|
|
|
def delete_alert(self, id):
|
|
delete = """
|
|
DELETE FROM alerts
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, {'id': id, 'like_id': id + '%'}, returning=True)
|
|
|
|
# BULK
|
|
|
|
def tag_alerts(self, query=None, tags=None):
|
|
query = query or Query()
|
|
update = f"""
|
|
UPDATE alerts
|
|
SET tags=ARRAY(SELECT DISTINCT UNNEST(tags || %(_tags)s))
|
|
WHERE {query.where}
|
|
RETURNING id
|
|
"""
|
|
return [row[0] for row in self._updateall(update, {**query.vars, **{'_tags': tags}}, returning=True)]
|
|
|
|
def untag_alerts(self, query=None, tags=None):
|
|
query = query or Query()
|
|
update = """
|
|
UPDATE alerts
|
|
SET tags=(select array_agg(t) FROM unnest(tags) AS t WHERE NOT t=ANY(%(_tags)s) )
|
|
WHERE {where}
|
|
RETURNING id
|
|
""".format(where=query.where)
|
|
return [row[0] for row in self._updateall(update, {**query.vars, **{'_tags': tags}}, returning=True)]
|
|
|
|
def update_attributes_by_query(self, query=None, attributes=None):
|
|
update = f"""
|
|
UPDATE alerts
|
|
SET attributes=attributes || %(_attributes)s
|
|
WHERE {query.where}
|
|
RETURNING id
|
|
"""
|
|
return [row[0] for row in self._updateall(update, {**query.vars, **{'_attributes': attributes}}, returning=True)]
|
|
|
|
def delete_alerts(self, query=None):
|
|
query = query or Query()
|
|
delete = f"""
|
|
DELETE FROM alerts
|
|
WHERE {query.where}
|
|
RETURNING id
|
|
"""
|
|
return [row[0] for row in self._deleteall(delete, query.vars, returning=True)]
|
|
|
|
# SEARCH & HISTORY
|
|
|
|
def add_history(self, id, history):
|
|
update = """
|
|
UPDATE alerts
|
|
SET history=(%(history)s || history)[1:{limit}]
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING *
|
|
""".format(limit=current_app.config['HISTORY_LIMIT'])
|
|
return self._updateone(update, {'id': id, 'like_id': id + '%', 'history': history}, returning=True)
|
|
|
|
def get_alerts(self, query=None, raw_data=False, history=False, page=None, page_size=None):
|
|
query = query or Query()
|
|
if raw_data and history:
|
|
select = '*'
|
|
else:
|
|
select = (
|
|
'id, resource, event, environment, severity, correlate, status, service, "group", value, "text",'
|
|
+ 'tags, attributes, origin, type, create_time, timeout, {raw_data}, customer, duplicate_count, repeat,'
|
|
+ 'previous_severity, trend_indication, receive_time, last_receive_id, last_receive_time, update_time,'
|
|
+ '{history}'
|
|
).format(
|
|
raw_data='raw_data' if raw_data else 'NULL as raw_data',
|
|
history='history' if history else 'array[]::history[] as history'
|
|
)
|
|
|
|
join = ''
|
|
if 's.code' in query.sort:
|
|
join += 'JOIN (VALUES {}) AS s(sev, code) ON alerts.severity = s.sev '.format(
|
|
', '.join((f"('{k}', {v})" for k, v in alarm_model.Severity.items()))
|
|
)
|
|
if 'st.state' in query.sort:
|
|
join += 'JOIN (VALUES {}) AS st(sts, state) ON alerts.status = st.sts '.format(
|
|
', '.join((f"('{k}', '{v}')" for k, v in alarm_model.Status.items()))
|
|
)
|
|
select = f"""
|
|
SELECT {select}
|
|
FROM alerts {join}
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort or 'last_receive_time'}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_alert_history(self, alert, page=None, page_size=None):
|
|
select = """
|
|
SELECT resource, environment, service, "group", tags, attributes, origin, customer, h.*
|
|
FROM alerts, unnest(history[1:{limit}]) h
|
|
WHERE environment=%(environment)s AND resource=%(resource)s
|
|
AND (h.event=%(event)s OR %(event)s=ANY(correlate))
|
|
AND {customer}
|
|
ORDER BY update_time DESC
|
|
""".format(
|
|
customer='customer=%(customer)s' if alert.customer else 'customer IS NULL',
|
|
limit=current_app.config['HISTORY_LIMIT']
|
|
)
|
|
return [
|
|
Record(
|
|
id=h.id,
|
|
resource=h.resource,
|
|
event=h.event,
|
|
environment=h.environment,
|
|
severity=h.severity,
|
|
status=h.status,
|
|
service=h.service,
|
|
group=h.group,
|
|
value=h.value,
|
|
text=h.text,
|
|
tags=h.tags,
|
|
attributes=h.attributes,
|
|
origin=h.origin,
|
|
update_time=h.update_time,
|
|
user=getattr(h, 'user', None),
|
|
timeout=getattr(h, 'timeout', None),
|
|
type=h.type,
|
|
customer=h.customer
|
|
) for h in self._fetchall(select, vars(alert), limit=page_size, offset=(page - 1) * page_size)
|
|
]
|
|
|
|
def get_history(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
if 'id' in query.vars:
|
|
select = """
|
|
SELECT a.id
|
|
FROM alerts a, unnest(history[1:{limit}]) h
|
|
WHERE h.id LIKE %(id)s
|
|
""".format(limit=current_app.config['HISTORY_LIMIT'])
|
|
query.vars['id'] = self._fetchone(select, query.vars)
|
|
|
|
select = """
|
|
SELECT resource, environment, service, "group", tags, attributes, origin, customer, history, h.*
|
|
FROM alerts, unnest(history[1:{limit}]) h
|
|
WHERE {where}
|
|
ORDER BY update_time DESC
|
|
""".format(where=query.where, limit=current_app.config['HISTORY_LIMIT'])
|
|
|
|
return [
|
|
Record(
|
|
id=h.id,
|
|
resource=h.resource,
|
|
event=h.event,
|
|
environment=h.environment,
|
|
severity=h.severity,
|
|
status=h.status,
|
|
service=h.service,
|
|
group=h.group,
|
|
value=h.value,
|
|
text=h.text,
|
|
tags=h.tags,
|
|
attributes=h.attributes,
|
|
origin=h.origin,
|
|
update_time=h.update_time,
|
|
user=getattr(h, 'user', None),
|
|
timeout=getattr(h, 'timeout', None),
|
|
type=h.type,
|
|
customer=h.customer
|
|
) for h in self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
]
|
|
|
|
# COUNTS
|
|
|
|
def get_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM alerts
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def get_counts(self, query=None, group=None):
|
|
query = query or Query()
|
|
if group is None:
|
|
raise ValueError('Must define a group')
|
|
select = """
|
|
SELECT {group}, COUNT(*) FROM alerts
|
|
WHERE {where}
|
|
GROUP BY {group}
|
|
""".format(where=query.where, group=group)
|
|
return {s['group']: s.count for s in self._fetchall(select, query.vars)}
|
|
|
|
def get_counts_by_severity(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT severity, COUNT(*) FROM alerts
|
|
WHERE {query.where}
|
|
GROUP BY severity
|
|
"""
|
|
return {s.severity: s.count for s in self._fetchall(select, query.vars)}
|
|
|
|
def get_counts_by_status(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT status, COUNT(*) FROM alerts
|
|
WHERE {query.where}
|
|
GROUP BY status
|
|
"""
|
|
return {s.status: s.count for s in self._fetchall(select, query.vars)}
|
|
|
|
def get_topn_count(self, query=None, topn=100):
|
|
query = query or Query()
|
|
group = 'event'
|
|
if query and query.group:
|
|
group = query.group[0]
|
|
|
|
select = """
|
|
SELECT {group}, COUNT(1) as count, SUM(duplicate_count) AS duplicate_count,
|
|
array_agg(DISTINCT environment) AS environments, array_agg(DISTINCT svc) AS services,
|
|
array_agg(DISTINCT ARRAY[id, resource]) AS resources
|
|
FROM alerts, UNNEST (service) svc
|
|
WHERE {where}
|
|
GROUP BY {group}
|
|
ORDER BY count DESC
|
|
""".format(where=query.where, group=group)
|
|
return [
|
|
{
|
|
'count': t.count,
|
|
'duplicateCount': t.duplicate_count,
|
|
'environments': t.environments,
|
|
'services': t.services,
|
|
group: getattr(t, group),
|
|
'resources': [{'id': r[0], 'resource': r[1], 'href': absolute_url(f'/alert/{r[0]}')} for r in t.resources]
|
|
} for t in self._fetchall(select, query.vars, limit=topn)
|
|
]
|
|
|
|
def get_topn_flapping(self, query=None, topn=100):
|
|
query = query or Query()
|
|
group = 'event'
|
|
if query and query.group:
|
|
group = query.group[0]
|
|
select = """
|
|
WITH topn AS (SELECT * FROM alerts WHERE {where})
|
|
SELECT topn.{group}, COUNT(1) as count, SUM(duplicate_count) AS duplicate_count,
|
|
array_agg(DISTINCT environment) AS environments, array_agg(DISTINCT svc) AS services,
|
|
array_agg(DISTINCT ARRAY[topn.id, resource]) AS resources
|
|
FROM topn, UNNEST (service) svc, UNNEST (history) hist
|
|
WHERE hist.type='severity'
|
|
GROUP BY topn.{group}
|
|
ORDER BY count DESC
|
|
""".format(where=query.where, group=group)
|
|
return [
|
|
{
|
|
'count': t.count,
|
|
'duplicateCount': t.duplicate_count,
|
|
'environments': t.environments,
|
|
'services': t.services,
|
|
group: getattr(t, group),
|
|
'resources': [{'id': r[0], 'resource': r[1], 'href': absolute_url(f'/alert/{r[0]}')} for r in t.resources]
|
|
} for t in self._fetchall(select, query.vars, limit=topn)
|
|
]
|
|
|
|
def get_topn_standing(self, query=None, topn=100):
|
|
query = query or Query()
|
|
group = 'event'
|
|
if query and query.group:
|
|
group = query.group[0]
|
|
select = """
|
|
WITH topn AS (SELECT * FROM alerts WHERE {where})
|
|
SELECT topn.{group}, COUNT(1) as count, SUM(duplicate_count) AS duplicate_count,
|
|
SUM(last_receive_time - create_time) as life_time,
|
|
array_agg(DISTINCT environment) AS environments, array_agg(DISTINCT svc) AS services,
|
|
array_agg(DISTINCT ARRAY[topn.id, resource]) AS resources
|
|
FROM topn, UNNEST (service) svc, UNNEST (history) hist
|
|
WHERE hist.type='severity'
|
|
GROUP BY topn.{group}
|
|
ORDER BY life_time DESC
|
|
""".format(where=query.where, group=group)
|
|
return [
|
|
{
|
|
'count': t.count,
|
|
'duplicateCount': t.duplicate_count,
|
|
'environments': t.environments,
|
|
'services': t.services,
|
|
group: getattr(t, group),
|
|
'resources': [{'id': r[0], 'resource': r[1], 'href': absolute_url(f'/alert/{r[0]}')} for r in t.resources]
|
|
} for t in self._fetchall(select, query.vars, limit=topn)
|
|
]
|
|
|
|
# ENVIRONMENTS
|
|
|
|
def get_environments(self, query=None, topn=1000):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT environment, severity, status, count(1) FROM alerts
|
|
WHERE {query.where}
|
|
GROUP BY environment, CUBE(severity, status)
|
|
"""
|
|
result = self._fetchall(select, query.vars, limit=topn)
|
|
|
|
severity_count = defaultdict(list)
|
|
status_count = defaultdict(list)
|
|
total_count = defaultdict(int)
|
|
|
|
for row in result:
|
|
if row.severity and not row.status:
|
|
severity_count[row.environment].append((row.severity, row.count))
|
|
if not row.severity and row.status:
|
|
status_count[row.environment].append((row.status, row.count))
|
|
if not row.severity and not row.status:
|
|
total_count[row.environment] = row.count
|
|
|
|
select = """SELECT DISTINCT environment FROM alerts"""
|
|
environments = self._fetchall(select, {})
|
|
return [
|
|
{
|
|
'environment': e.environment,
|
|
'severityCounts': dict(severity_count[e.environment]),
|
|
'statusCounts': dict(status_count[e.environment]),
|
|
'count': total_count[e.environment]
|
|
} for e in environments]
|
|
|
|
# SERVICES
|
|
|
|
def get_services(self, query=None, topn=1000):
|
|
query = query or Query()
|
|
select = """
|
|
SELECT environment, svc, severity, status, count(1) FROM alerts, UNNEST(service) svc
|
|
WHERE {where}
|
|
GROUP BY environment, svc, CUBE(severity, status)
|
|
""".format(where=query.where)
|
|
result = self._fetchall(select, query.vars, limit=topn)
|
|
|
|
severity_count = defaultdict(list)
|
|
status_count = defaultdict(list)
|
|
total_count = defaultdict(int)
|
|
|
|
for row in result:
|
|
if row.severity and not row.status:
|
|
severity_count[(row.environment, row.svc)].append((row.severity, row.count))
|
|
if not row.severity and row.status:
|
|
status_count[(row.environment, row.svc)].append((row.status, row.count))
|
|
if not row.severity and not row.status:
|
|
total_count[(row.environment, row.svc)] = row.count
|
|
|
|
select = """SELECT DISTINCT environment, svc FROM alerts, UNNEST(service) svc"""
|
|
services = self._fetchall(select, {})
|
|
return [
|
|
{
|
|
'environment': s.environment,
|
|
'service': s.svc,
|
|
'severityCounts': dict(severity_count[(s.environment, s.svc)]),
|
|
'statusCounts': dict(status_count[(s.environment, s.svc)]),
|
|
'count': total_count[(s.environment, s.svc)]
|
|
} for s in services]
|
|
|
|
# ALERT GROUPS
|
|
|
|
def get_alert_groups(self, query=None, topn=1000):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT environment, "group", count(1) FROM alerts
|
|
WHERE {query.where}
|
|
GROUP BY environment, "group"
|
|
"""
|
|
return [
|
|
{
|
|
'environment': g.environment,
|
|
'group': g.group,
|
|
'count': g.count
|
|
} for g in self._fetchall(select, query.vars, limit=topn)]
|
|
|
|
# ALERT TAGS
|
|
|
|
def get_alert_tags(self, query=None, topn=1000):
|
|
query = query or Query()
|
|
select = """
|
|
SELECT environment, tag, count(1) FROM alerts, UNNEST(tags) tag
|
|
WHERE {where}
|
|
GROUP BY environment, tag
|
|
""".format(where=query.where)
|
|
return [{'environment': t.environment, 'tag': t.tag, 'count': t.count} for t in self._fetchall(select, query.vars, limit=topn)]
|
|
|
|
# BLACKOUTS
|
|
|
|
def create_blackout(self, blackout):
|
|
insert = """
|
|
INSERT INTO blackouts (id, priority, environment, service, resource, event,
|
|
"group", tags, origin, customer, start_time, end_time,
|
|
duration, "user", create_time, text)
|
|
VALUES (%(id)s, %(priority)s, %(environment)s, %(service)s, %(resource)s, %(event)s,
|
|
%(group)s, %(tags)s, %(origin)s, %(customer)s, %(start_time)s, %(end_time)s,
|
|
%(duration)s, %(user)s, %(create_time)s, %(text)s)
|
|
RETURNING *, duration AS remaining
|
|
"""
|
|
return self._insert(insert, vars(blackout))
|
|
|
|
def get_blackout(self, id, customers=None):
|
|
select = """
|
|
SELECT *, GREATEST(EXTRACT(EPOCH FROM (end_time - GREATEST(start_time, NOW() at time zone 'utc'))), 0) AS remaining
|
|
FROM blackouts
|
|
WHERE id=%(id)s
|
|
AND {customer}
|
|
""".format(customer='customer=ANY(%(customers)s)' if customers else '1=1')
|
|
return self._fetchone(select, {'id': id, 'customers': customers})
|
|
|
|
def get_blackouts(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = """
|
|
SELECT *, GREATEST(EXTRACT(EPOCH FROM (end_time - GREATEST(start_time, NOW() at time zone 'utc'))), 0) AS remaining
|
|
FROM blackouts
|
|
WHERE {where}
|
|
ORDER BY {order}
|
|
""".format(where=query.where, order=query.sort)
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_blackouts_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM blackouts
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def is_blackout_period(self, alert):
|
|
select = """
|
|
SELECT *
|
|
FROM blackouts
|
|
WHERE start_time <= %(create_time)s AND end_time > %(create_time)s
|
|
AND environment=%(environment)s
|
|
AND (
|
|
( resource IS NULL AND service='{}' AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource IS NULL AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service='{}' AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event IS NULL AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group" IS NULL AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags='{}' AND origin=%(origin)s )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin IS NULL )
|
|
OR ( resource=%(resource)s AND service <@ %(service)s AND event=%(event)s AND "group"=%(group)s AND tags <@ %(tags)s AND origin=%(origin)s )
|
|
)
|
|
"""
|
|
if current_app.config['CUSTOMER_VIEWS']:
|
|
select += ' AND (customer IS NULL OR customer=%(customer)s)'
|
|
if self._fetchone(select, vars(alert)):
|
|
return True
|
|
return False
|
|
|
|
def update_blackout(self, id, **kwargs):
|
|
update = """
|
|
UPDATE blackouts
|
|
SET
|
|
"""
|
|
if kwargs.get('environment') is not None:
|
|
update += 'environment=%(environment)s, '
|
|
if 'service' in kwargs:
|
|
update += 'service=%(service)s, '
|
|
if 'resource' in kwargs:
|
|
update += 'resource=%(resource)s, '
|
|
if 'event' in kwargs:
|
|
update += 'event=%(event)s, '
|
|
if 'group' in kwargs:
|
|
update += '"group"=%(group)s, '
|
|
if 'tags' in kwargs:
|
|
update += 'tags=%(tags)s, '
|
|
if 'origin' in kwargs:
|
|
update += 'origin=%(origin)s, '
|
|
if 'customer' in kwargs:
|
|
update += 'customer=%(customer)s, '
|
|
if kwargs.get('startTime') is not None:
|
|
update += 'start_time=%(startTime)s, '
|
|
if kwargs.get('endTime') is not None:
|
|
update += 'end_time=%(endTime)s, '
|
|
if 'duration' in kwargs:
|
|
update += 'duration=%(duration)s, '
|
|
if 'text' in kwargs:
|
|
update += 'text=%(text)s, '
|
|
update += """
|
|
"user"=COALESCE(%(user)s, "user")
|
|
WHERE id=%(id)s
|
|
RETURNING *, GREATEST(EXTRACT(EPOCH FROM (end_time - GREATEST(start_time, NOW() at time zone 'utc'))), 0) AS remaining
|
|
"""
|
|
kwargs['id'] = id
|
|
kwargs['user'] = kwargs.get('user')
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def delete_blackout(self, id):
|
|
delete = """
|
|
DELETE FROM blackouts
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
# HEARTBEATS
|
|
|
|
def upsert_heartbeat(self, heartbeat):
|
|
upsert = """
|
|
INSERT INTO heartbeats (id, origin, tags, attributes, type, create_time, timeout, receive_time, customer)
|
|
VALUES (%(id)s, %(origin)s, %(tags)s, %(attributes)s, %(event_type)s, %(create_time)s, %(timeout)s, %(receive_time)s, %(customer)s)
|
|
ON CONFLICT (origin, COALESCE(customer, '')) DO UPDATE
|
|
SET tags=%(tags)s, attributes=%(attributes)s, create_time=%(create_time)s, timeout=%(timeout)s, receive_time=%(receive_time)s
|
|
RETURNING *,
|
|
EXTRACT(EPOCH FROM (receive_time - create_time)) AS latency,
|
|
EXTRACT(EPOCH FROM (NOW() - receive_time)) AS since
|
|
"""
|
|
return self._upsert(upsert, vars(heartbeat))
|
|
|
|
def get_heartbeat(self, id, customers=None):
|
|
select = """
|
|
SELECT *,
|
|
EXTRACT(EPOCH FROM (receive_time - create_time)) AS latency,
|
|
EXTRACT(EPOCH FROM (NOW() - receive_time)) AS since
|
|
FROM heartbeats
|
|
WHERE (id=%(id)s OR id LIKE %(like_id)s)
|
|
AND {customer}
|
|
""".format(customer='customer=%(customers)s' if customers else '1=1')
|
|
return self._fetchone(select, {'id': id, 'like_id': id + '%', 'customers': customers})
|
|
|
|
def get_heartbeats(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = """
|
|
SELECT *,
|
|
EXTRACT(EPOCH FROM (receive_time - create_time)) AS latency,
|
|
EXTRACT(EPOCH FROM (NOW() - receive_time)) AS since
|
|
FROM heartbeats
|
|
WHERE {where}
|
|
ORDER BY {order}
|
|
""".format(where=query.where, order=query.sort)
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_heartbeats_by_status(self, status=None, query=None, page=None, page_size=None):
|
|
status = status or list()
|
|
query = query or Query()
|
|
|
|
swhere = ''
|
|
if status:
|
|
q = list()
|
|
if HeartbeatStatus.OK in status:
|
|
q.append(
|
|
"""
|
|
(EXTRACT(EPOCH FROM (NOW() at time zone 'utc' - receive_time)) <= timeout
|
|
AND EXTRACT(EPOCH FROM (receive_time - create_time)) * 1000 <= {max_latency})
|
|
""".format(max_latency=current_app.config['HEARTBEAT_MAX_LATENCY']))
|
|
if HeartbeatStatus.Expired in status:
|
|
q.append("(EXTRACT(EPOCH FROM (NOW() at time zone 'utc' - receive_time)) > timeout)")
|
|
if HeartbeatStatus.Slow in status:
|
|
q.append(
|
|
"""
|
|
(EXTRACT(EPOCH FROM (NOW() at time zone 'utc' - receive_time)) <= timeout
|
|
AND EXTRACT(EPOCH FROM (receive_time - create_time)) * 1000 > {max_latency})
|
|
""".format(max_latency=current_app.config['HEARTBEAT_MAX_LATENCY']))
|
|
if q:
|
|
swhere = 'AND (' + ' OR '.join(q) + ')'
|
|
|
|
select = """
|
|
SELECT *,
|
|
EXTRACT(EPOCH FROM (receive_time - create_time)) AS latency,
|
|
EXTRACT(EPOCH FROM (NOW() - receive_time)) AS since
|
|
FROM heartbeats
|
|
WHERE {where}
|
|
{swhere}
|
|
ORDER BY {order}
|
|
""".format(where=query.where, swhere=swhere, order=query.sort)
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_heartbeats_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM heartbeats
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def delete_heartbeat(self, id):
|
|
delete = """
|
|
DELETE FROM heartbeats
|
|
WHERE id=%(id)s OR id LIKE %(like_id)s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, {'id': id, 'like_id': id + '%'}, returning=True)
|
|
|
|
# API KEYS
|
|
|
|
def create_key(self, key):
|
|
insert = """
|
|
INSERT INTO keys (id, key, "user", scopes, text, expire_time, "count", last_used_time, customer)
|
|
VALUES (%(id)s, %(key)s, %(user)s, %(scopes)s, %(text)s, %(expire_time)s, %(count)s, %(last_used_time)s, %(customer)s)
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(key))
|
|
|
|
def get_key(self, key, user=None):
|
|
select = f"""
|
|
SELECT * FROM keys
|
|
WHERE (id=%(key)s OR key=%(key)s)
|
|
AND {'"user"=%(user)s' if user else '1=1'}
|
|
"""
|
|
return self._fetchone(select, {'key': key, 'user': user})
|
|
|
|
def get_keys(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT * FROM keys
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_keys_by_user(self, user):
|
|
select = """
|
|
SELECT * FROM keys
|
|
WHERE "user"=%s
|
|
"""
|
|
return self._fetchall(select, (user,))
|
|
|
|
def get_keys_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM keys
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def update_key(self, key, **kwargs):
|
|
update = """
|
|
UPDATE keys
|
|
SET
|
|
"""
|
|
if 'user' in kwargs:
|
|
update += '"user"=%(user)s, '
|
|
if 'scopes' in kwargs:
|
|
update += 'scopes=%(scopes)s, '
|
|
if 'text' in kwargs:
|
|
update += 'text=%(text)s, '
|
|
if 'expireTime' in kwargs:
|
|
update += 'expire_time=%(expireTime)s, '
|
|
if 'customer' in kwargs:
|
|
update += 'customer=%(customer)s, '
|
|
update += """
|
|
id=id
|
|
WHERE (id=%(key)s OR key=%(key)s)
|
|
RETURNING *
|
|
"""
|
|
kwargs['key'] = key
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def update_key_last_used(self, key):
|
|
update = """
|
|
UPDATE keys
|
|
SET last_used_time=NOW() at time zone 'utc', count=count + 1
|
|
WHERE id=%s OR key=%s
|
|
"""
|
|
return self._updateone(update, (key, key))
|
|
|
|
def delete_key(self, key):
|
|
delete = """
|
|
DELETE FROM keys
|
|
WHERE id=%s OR key=%s
|
|
RETURNING key
|
|
"""
|
|
return self._deleteone(delete, (key, key), returning=True)
|
|
|
|
# USERS
|
|
|
|
def create_user(self, user):
|
|
insert = """
|
|
INSERT INTO users (id, name, login, password, email, status, roles, attributes,
|
|
create_time, last_login, text, update_time, email_verified)
|
|
VALUES (%(id)s, %(name)s, %(login)s, %(password)s, %(email)s, %(status)s, %(roles)s, %(attributes)s, %(create_time)s,
|
|
%(last_login)s, %(text)s, %(update_time)s, %(email_verified)s)
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(user))
|
|
|
|
def get_user(self, id):
|
|
select = """SELECT * FROM users WHERE id=%s"""
|
|
return self._fetchone(select, (id,))
|
|
|
|
def get_users(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT * FROM users
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_users_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM users
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def get_user_by_username(self, username):
|
|
select = """SELECT * FROM users WHERE login=%s OR email=%s"""
|
|
return self._fetchone(select, (username, username))
|
|
|
|
def get_user_by_email(self, email):
|
|
select = """SELECT * FROM users WHERE email=%s"""
|
|
return self._fetchone(select, (email,))
|
|
|
|
def get_user_by_hash(self, hash):
|
|
select = """SELECT * FROM users WHERE hash=%s"""
|
|
return self._fetchone(select, (hash,))
|
|
|
|
def update_last_login(self, id):
|
|
update = """
|
|
UPDATE users
|
|
SET last_login=NOW() at time zone 'utc'
|
|
WHERE id=%s
|
|
"""
|
|
return self._updateone(update, (id,))
|
|
|
|
def update_user(self, id, **kwargs):
|
|
update = """
|
|
UPDATE users
|
|
SET
|
|
"""
|
|
if kwargs.get('name', None) is not None:
|
|
update += 'name=%(name)s, '
|
|
if kwargs.get('login', None) is not None:
|
|
update += 'login=%(login)s, '
|
|
if kwargs.get('password', None) is not None:
|
|
update += 'password=%(password)s, '
|
|
if kwargs.get('email', None) is not None:
|
|
update += 'email=%(email)s, '
|
|
if kwargs.get('status', None) is not None:
|
|
update += 'status=%(status)s, '
|
|
if kwargs.get('roles', None) is not None:
|
|
update += 'roles=%(roles)s, '
|
|
if kwargs.get('attributes', None) is not None:
|
|
update += 'attributes=attributes || %(attributes)s, '
|
|
if kwargs.get('text', None) is not None:
|
|
update += 'text=%(text)s, '
|
|
if kwargs.get('email_verified', None) is not None:
|
|
update += 'email_verified=%(email_verified)s, '
|
|
update += """
|
|
update_time=NOW() at time zone 'utc'
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
kwargs['id'] = id
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def update_user_attributes(self, id, old_attrs, new_attrs):
|
|
from alerta.utils.collections import merge
|
|
merge(old_attrs, new_attrs)
|
|
attrs = {k: v for k, v in old_attrs.items() if v is not None}
|
|
update = """
|
|
UPDATE users
|
|
SET attributes=%(attrs)s, update_time=NOW() at time zone 'utc'
|
|
WHERE id=%(id)s
|
|
RETURNING id
|
|
"""
|
|
return bool(self._updateone(update, {'id': id, 'attrs': attrs}, returning=True))
|
|
|
|
def delete_user(self, id):
|
|
delete = """
|
|
DELETE FROM users
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
def set_email_hash(self, id, hash):
|
|
update = """
|
|
UPDATE users
|
|
SET hash=%s, update_time=NOW() at time zone 'utc'
|
|
WHERE id=%s
|
|
"""
|
|
return self._updateone(update, (hash, id))
|
|
|
|
# GROUPS
|
|
|
|
def create_group(self, group):
|
|
insert = """
|
|
INSERT INTO groups (id, name, text)
|
|
VALUES (%(id)s, %(name)s, %(text)s)
|
|
RETURNING *, 0 AS count
|
|
"""
|
|
return self._insert(insert, vars(group))
|
|
|
|
def get_group(self, id):
|
|
select = """SELECT *, COALESCE(CARDINALITY(users), 0) AS count FROM groups WHERE id=%s"""
|
|
return self._fetchone(select, (id,))
|
|
|
|
def get_groups(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = """
|
|
SELECT *, COALESCE(CARDINALITY(users), 0) AS count FROM groups
|
|
WHERE {where}
|
|
ORDER BY {order}
|
|
""".format(where=query.where, order=query.sort)
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_groups_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM groups
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def get_group_users(self, id):
|
|
select = """
|
|
SELECT u.id, u.login, u.email, u.name, u.status
|
|
FROM (SELECT id, UNNEST(users) as uid FROM groups) g
|
|
INNER JOIN users u on g.uid = u.id
|
|
WHERE g.id = %s
|
|
"""
|
|
return self._fetchall(select, (id,))
|
|
|
|
def update_group(self, id, **kwargs):
|
|
update = """
|
|
UPDATE groups
|
|
SET
|
|
"""
|
|
if kwargs.get('name', None) is not None:
|
|
update += 'name=%(name)s, '
|
|
if kwargs.get('text', None) is not None:
|
|
update += 'text=%(text)s, '
|
|
update += """
|
|
update_time=NOW() at time zone 'utc'
|
|
WHERE id=%(id)s
|
|
RETURNING *, COALESCE(CARDINALITY(users), 0) AS count
|
|
"""
|
|
kwargs['id'] = id
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def add_user_to_group(self, group, user):
|
|
update = """
|
|
UPDATE groups
|
|
SET users=ARRAY(SELECT DISTINCT UNNEST(users || %(users)s))
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
return self._updateone(update, {'id': group, 'users': [user]}, returning=True)
|
|
|
|
def remove_user_from_group(self, group, user):
|
|
update = """
|
|
UPDATE groups
|
|
SET users=(select array_agg(u) FROM unnest(users) AS u WHERE NOT u=%(user)s )
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
return self._updateone(update, {'id': group, 'user': user}, returning=True)
|
|
|
|
def delete_group(self, id):
|
|
delete = """
|
|
DELETE FROM groups
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
def get_groups_by_user(self, user):
|
|
select = """
|
|
SELECT *, COALESCE(CARDINALITY(users), 0) AS count
|
|
FROM groups
|
|
WHERE %s=ANY(users)
|
|
"""
|
|
return self._fetchall(select, (user,))
|
|
|
|
# PERMISSIONS
|
|
|
|
def create_perm(self, perm):
|
|
insert = """
|
|
INSERT INTO perms (id, match, scopes)
|
|
VALUES (%(id)s, %(match)s, %(scopes)s)
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(perm))
|
|
|
|
def get_perm(self, id):
|
|
select = """SELECT * FROM perms WHERE id=%s"""
|
|
return self._fetchone(select, (id,))
|
|
|
|
def get_perms(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT * FROM perms
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_perms_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM perms
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def update_perm(self, id, **kwargs):
|
|
update = """
|
|
UPDATE perms
|
|
SET
|
|
"""
|
|
if 'match' in kwargs:
|
|
update += 'match=%(match)s, '
|
|
if 'scopes' in kwargs:
|
|
update += 'scopes=%(scopes)s, '
|
|
update += """
|
|
id=%(id)s
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
kwargs['id'] = id
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def delete_perm(self, id):
|
|
delete = """
|
|
DELETE FROM perms
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
def get_scopes_by_match(self, login, matches):
|
|
if login in current_app.config['ADMIN_USERS']:
|
|
return ADMIN_SCOPES
|
|
|
|
scopes = list()
|
|
for match in matches:
|
|
if match in current_app.config['ADMIN_ROLES']:
|
|
return ADMIN_SCOPES
|
|
if match in current_app.config['USER_ROLES']:
|
|
scopes.extend(current_app.config['USER_DEFAULT_SCOPES'])
|
|
if match in current_app.config['GUEST_ROLES']:
|
|
scopes.extend(current_app.config['GUEST_DEFAULT_SCOPES'])
|
|
select = """SELECT scopes FROM perms WHERE match=%s"""
|
|
response = self._fetchone(select, (match,))
|
|
if response:
|
|
scopes.extend(response.scopes)
|
|
return sorted(set(scopes))
|
|
|
|
# CUSTOMERS
|
|
|
|
def create_customer(self, customer):
|
|
insert = """
|
|
INSERT INTO customers (id, match, customer)
|
|
VALUES (%(id)s, %(match)s, %(customer)s)
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(customer))
|
|
|
|
def get_customer(self, id):
|
|
select = """SELECT * FROM customers WHERE id=%s"""
|
|
return self._fetchone(select, (id,))
|
|
|
|
def get_customers(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT * FROM customers
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_customers_count(self, query=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT COUNT(1) FROM customers
|
|
WHERE {query.where}
|
|
"""
|
|
return self._fetchone(select, query.vars).count
|
|
|
|
def update_customer(self, id, **kwargs):
|
|
update = """
|
|
UPDATE customers
|
|
SET
|
|
"""
|
|
if 'match' in kwargs:
|
|
update += 'match=%(match)s, '
|
|
if 'customer' in kwargs:
|
|
update += 'customer=%(customer)s, '
|
|
update += """
|
|
id=%(id)s
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
kwargs['id'] = id
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def delete_customer(self, id):
|
|
delete = """
|
|
DELETE FROM customers
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
def get_customers_by_match(self, login, matches):
|
|
if login in current_app.config['ADMIN_USERS']:
|
|
return '*' # all customers
|
|
|
|
customers = []
|
|
for match in [login] + matches:
|
|
select = """SELECT customer FROM customers WHERE match=%s"""
|
|
response = self._fetchall(select, (match,))
|
|
if response:
|
|
customers.extend([r.customer for r in response])
|
|
|
|
if customers:
|
|
if '*' in customers:
|
|
return '*' # all customers
|
|
return customers
|
|
|
|
raise NoCustomerMatch(f"No customer lookup configured for user '{login}' or '{','.join(matches)}'")
|
|
|
|
# NOTES
|
|
|
|
def create_note(self, note):
|
|
insert = """
|
|
INSERT INTO notes (id, text, "user", attributes, type,
|
|
create_time, update_time, alert, customer)
|
|
VALUES (%(id)s, %(text)s, %(user)s, %(attributes)s, %(note_type)s,
|
|
%(create_time)s, %(update_time)s, %(alert)s, %(customer)s)
|
|
RETURNING *
|
|
"""
|
|
return self._insert(insert, vars(note))
|
|
|
|
def get_note(self, id):
|
|
select = """
|
|
SELECT * FROM notes
|
|
WHERE id=%s
|
|
"""
|
|
return self._fetchone(select, (id,))
|
|
|
|
def get_notes(self, query=None, page=None, page_size=None):
|
|
query = query or Query()
|
|
select = f"""
|
|
SELECT * FROM notes
|
|
WHERE {query.where}
|
|
ORDER BY {query.sort or 'create_time'}
|
|
"""
|
|
return self._fetchall(select, query.vars, limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_alert_notes(self, id, page=None, page_size=None):
|
|
select = """
|
|
SELECT * FROM notes
|
|
WHERE alert ~* (%s)
|
|
"""
|
|
return self._fetchall(select, (id,), limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def get_customer_notes(self, customer, page=None, page_size=None):
|
|
select = """
|
|
SELECT * FROM notes
|
|
WHERE customer=%s
|
|
"""
|
|
return self._fetchall(select, (customer,), limit=page_size, offset=(page - 1) * page_size)
|
|
|
|
def update_note(self, id, **kwargs):
|
|
update = """
|
|
UPDATE notes
|
|
SET
|
|
"""
|
|
if kwargs.get('text', None) is not None:
|
|
update += 'text=%(text)s, '
|
|
if kwargs.get('attributes', None) is not None:
|
|
update += 'attributes=attributes || %(attributes)s, '
|
|
update += """
|
|
"user"=COALESCE(%(user)s, "user"),
|
|
update_time=NOW() at time zone 'utc'
|
|
WHERE id=%(id)s
|
|
RETURNING *
|
|
"""
|
|
kwargs['id'] = id
|
|
kwargs['user'] = kwargs.get('user')
|
|
return self._updateone(update, kwargs, returning=True)
|
|
|
|
def delete_note(self, id):
|
|
delete = """
|
|
DELETE FROM notes
|
|
WHERE id=%s
|
|
RETURNING id
|
|
"""
|
|
return self._deleteone(delete, (id,), returning=True)
|
|
|
|
# METRICS
|
|
|
|
def get_metrics(self, type=None):
|
|
select = """SELECT * FROM metrics"""
|
|
if type:
|
|
select += ' WHERE type=%s'
|
|
return self._fetchall(select, (type,))
|
|
|
|
def set_gauge(self, gauge):
|
|
upsert = """
|
|
INSERT INTO metrics ("group", name, title, description, value, type)
|
|
VALUES (%(group)s, %(name)s, %(title)s, %(description)s, %(value)s, %(type)s)
|
|
ON CONFLICT ("group", name, type) DO UPDATE
|
|
SET value=%(value)s
|
|
RETURNING *
|
|
"""
|
|
return self._upsert(upsert, vars(gauge))
|
|
|
|
def inc_counter(self, counter):
|
|
upsert = """
|
|
INSERT INTO metrics ("group", name, title, description, count, type)
|
|
VALUES (%(group)s, %(name)s, %(title)s, %(description)s, %(count)s, %(type)s)
|
|
ON CONFLICT ("group", name, type) DO UPDATE
|
|
SET count=metrics.count + %(count)s
|
|
RETURNING *
|
|
"""
|
|
return self._upsert(upsert, vars(counter))
|
|
|
|
def update_timer(self, timer):
|
|
upsert = """
|
|
INSERT INTO metrics ("group", name, title, description, count, total_time, type)
|
|
VALUES (%(group)s, %(name)s, %(title)s, %(description)s, %(count)s, %(total_time)s, %(type)s)
|
|
ON CONFLICT ("group", name, type) DO UPDATE
|
|
SET count=metrics.count + %(count)s, total_time=metrics.total_time + %(total_time)s
|
|
RETURNING *
|
|
"""
|
|
return self._upsert(upsert, vars(timer))
|
|
|
|
# HOUSEKEEPING
|
|
|
|
def get_expired(self, expired_threshold, info_threshold):
|
|
# delete 'closed' or 'expired' alerts older than "expired_threshold" seconds
|
|
# and 'informational' alerts older than "info_threshold" seconds
|
|
|
|
if expired_threshold:
|
|
delete = """
|
|
DELETE FROM alerts
|
|
WHERE (status IN ('closed', 'expired')
|
|
AND last_receive_time < (NOW() at time zone 'utc' - INTERVAL '%(expired_threshold)s seconds'))
|
|
"""
|
|
self._deleteall(delete, {'expired_threshold': expired_threshold})
|
|
|
|
if info_threshold:
|
|
delete = """
|
|
DELETE FROM alerts
|
|
WHERE (severity=%(inform_severity)s
|
|
AND last_receive_time < (NOW() at time zone 'utc' - INTERVAL '%(info_threshold)s seconds'))
|
|
"""
|
|
self._deleteall(delete, {'inform_severity': alarm_model.DEFAULT_INFORM_SEVERITY, 'info_threshold': info_threshold})
|
|
|
|
# get list of alerts to be newly expired
|
|
select = """
|
|
SELECT *
|
|
FROM alerts
|
|
WHERE status NOT IN ('expired') AND COALESCE(timeout, {timeout})!=0
|
|
AND (last_receive_time + INTERVAL '1 second' * timeout) < NOW() at time zone 'utc'
|
|
""".format(timeout=current_app.config['ALERT_TIMEOUT'])
|
|
|
|
return self._fetchall(select, {})
|
|
|
|
def get_unshelve(self):
|
|
# get list of alerts to be unshelved
|
|
select = """
|
|
SELECT DISTINCT ON (a.id) a.*
|
|
FROM alerts a, UNNEST(history) h
|
|
WHERE a.status='shelved'
|
|
AND h.type='shelve'
|
|
AND h.status='shelved'
|
|
AND COALESCE(h.timeout, {timeout})!=0
|
|
AND (a.update_time + INTERVAL '1 second' * h.timeout) < NOW() at time zone 'utc'
|
|
ORDER BY a.id, a.update_time DESC
|
|
""".format(timeout=current_app.config['SHELVE_TIMEOUT'])
|
|
return self._fetchall(select, {})
|
|
|
|
def get_unack(self):
|
|
# get list of alerts to be unack'ed
|
|
select = """
|
|
SELECT DISTINCT ON (a.id) a.*
|
|
FROM alerts a, UNNEST(history) h
|
|
WHERE a.status='ack'
|
|
AND h.type='ack'
|
|
AND h.status='ack'
|
|
AND COALESCE(h.timeout, {timeout})!=0
|
|
AND (a.update_time + INTERVAL '1 second' * h.timeout) < NOW() at time zone 'utc'
|
|
ORDER BY a.id, a.update_time DESC
|
|
""".format(timeout=current_app.config['ACK_TIMEOUT'])
|
|
return self._fetchall(select, {})
|
|
|
|
# SQL HELPERS
|
|
|
|
def _insert(self, query, vars):
|
|
"""
|
|
Insert, with return.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
self.get_db().commit()
|
|
return cursor.fetchone()
|
|
|
|
def _fetchone(self, query, vars):
|
|
"""
|
|
Return none or one row.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
return cursor.fetchone()
|
|
|
|
def _fetchall(self, query, vars, limit=None, offset=0):
|
|
"""
|
|
Return multiple rows.
|
|
"""
|
|
if limit is None:
|
|
limit = current_app.config['DEFAULT_PAGE_SIZE']
|
|
query += f' LIMIT {limit} OFFSET {offset}'
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
return cursor.fetchall()
|
|
|
|
def _updateone(self, query, vars, returning=False):
|
|
"""
|
|
Update, with optional return.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
self.get_db().commit()
|
|
return cursor.fetchone() if returning else None
|
|
|
|
def _updateall(self, query, vars, returning=False):
|
|
"""
|
|
Update, with optional return.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
self.get_db().commit()
|
|
return cursor.fetchall() if returning else None
|
|
|
|
def _upsert(self, query, vars):
|
|
"""
|
|
Insert or update, with return.
|
|
"""
|
|
return self._insert(query, vars)
|
|
|
|
def _deleteone(self, query, vars, returning=False):
|
|
"""
|
|
Delete, with optional return.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
self.get_db().commit()
|
|
return cursor.fetchone() if returning else None
|
|
|
|
def _deleteall(self, query, vars, returning=False):
|
|
"""
|
|
Delete multiple rows, with optional return.
|
|
"""
|
|
cursor = self.get_db().cursor()
|
|
self._log(cursor, query, vars)
|
|
cursor.execute(query, vars)
|
|
self.get_db().commit()
|
|
return cursor.fetchall() if returning else None
|
|
|
|
def _log(self, cursor, query, vars):
|
|
current_app.logger.debug('{stars}\n{query}\n{stars}'.format(
|
|
stars='*' * 40, query=cursor.mogrify(query, vars).decode('utf-8')))
|