108 lines
3.2 KiB
Python
108 lines
3.2 KiB
Python
import logging
|
|
import os
|
|
from datetime import datetime
|
|
|
|
from alerta.plugins import PluginBase
|
|
from influxdb import InfluxDBClient
|
|
|
|
try:
|
|
from alerta.plugins import app # alerta >= 5.0
|
|
except ImportError:
|
|
from alerta.app import app # alerta < 5.0
|
|
|
|
|
|
LOG = logging.getLogger('alerta.plugins.influxdb')
|
|
|
|
# 'influxdb://username:password@localhost:8086/databasename'
|
|
DEFAULT_INFLUXDB_DSN = 'influxdb://user:pass@localhost:8086/alerta'
|
|
|
|
INFLUXDB_DSN = os.environ.get('INFLUXDB_DSN') or app.config.get(
|
|
'INFLUXDB_DSN', DEFAULT_INFLUXDB_DSN)
|
|
INFLUXDB_DATABASE = os.environ.get(
|
|
'INFLUXDB_DATABASE') or app.config.get('INFLUXDB_DATABASE', None)
|
|
|
|
# Specify the name of a measurement to which all alerts will be logged
|
|
INFLUXDB_MEASUREMENT = os.environ.get(
|
|
'INFLUXDB_MEASUREMENT') or app.config.get('INFLUXDB_MEASUREMENT', 'event')
|
|
|
|
|
|
class InfluxDBWrite(PluginBase):
|
|
|
|
def __init__(self, name=None):
|
|
|
|
self.client = InfluxDBClient.from_dsn(INFLUXDB_DSN, timeout=2)
|
|
|
|
dbname = INFLUXDB_DATABASE or self.client._database
|
|
try:
|
|
if dbname:
|
|
self.client.switch_database(dbname)
|
|
self.client.create_database(dbname)
|
|
except Exception as e:
|
|
LOG.error('InfluxDB: ERROR - %s' % e)
|
|
|
|
super().__init__(name)
|
|
|
|
def pre_receive(self, alert):
|
|
return alert
|
|
|
|
def _influxdb_prepare_point(self, alert, status=None, text=None):
|
|
tags = {}
|
|
|
|
for tag in alert.tags:
|
|
try:
|
|
k, v = tag.split('=', 1)
|
|
tags[k] = v
|
|
except ValueError:
|
|
pass
|
|
|
|
tags.update(
|
|
event=alert.event,
|
|
resource=alert.resource,
|
|
environment=alert.environment,
|
|
severity=alert.severity,
|
|
status=status if status else alert.status,
|
|
service=','.join(alert.service)
|
|
)
|
|
if alert.customer:
|
|
tags.update(customer=alert.customer)
|
|
|
|
# event data
|
|
point = {
|
|
'measurement': INFLUXDB_MEASUREMENT,
|
|
'time': datetime.utcnow() if status else alert.create_time,
|
|
'tags': tags,
|
|
'fields': {}
|
|
}
|
|
|
|
# make sure we store the value in its original format
|
|
if isinstance(alert.value, float) or isinstance(alert.value, int):
|
|
point['fields']['value'] = alert.value
|
|
else:
|
|
point['fields']['value'] = str(alert.value)
|
|
|
|
if text:
|
|
point['fields']['text'] = text
|
|
|
|
return point
|
|
|
|
def post_receive(self, alert):
|
|
point = self._influxdb_prepare_point(alert)
|
|
LOG.debug('InfluxDB: point=%s', point)
|
|
|
|
try:
|
|
self.client.write_points([point], time_precision='ms')
|
|
except Exception as e:
|
|
raise RuntimeError('InfluxDB: ERROR - %s' % e)
|
|
|
|
def status_change(self, alert, status, text):
|
|
if status not in ['ack', 'assign']:
|
|
return
|
|
|
|
point = self._influxdb_prepare_point(alert, status, text)
|
|
LOG.debug('InfluxDB: point=%s', point)
|
|
|
|
try:
|
|
self.client.write_points([point], time_precision='ms')
|
|
except Exception as e:
|
|
raise RuntimeError('InfluxDB: ERROR - %s' % e)
|