healthchecks_healthchecks/hc/api/models.py
2024-11-14 13:33:21 +02:00

1325 lines
45 KiB
Python

from __future__ import annotations
import hashlib
import json
import socket
import uuid
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta as td
from datetime import timezone
from typing import Any, TypedDict
from urllib.parse import urlencode
from zoneinfo import ZoneInfo
from cronsim import CronSim
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.core.mail import mail_admins
from django.core.signing import TimestampSigner
from django.db import models, transaction
from django.db.models import F, QuerySet
from django.http import HttpRequest
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.timezone import now
from oncalendar import OnCalendar
from pydantic import BaseModel, Field
from hc.accounts.models import Project
from hc.api import transports
from hc.lib import emails
from hc.lib.date import month_boundaries, seconds_in_month
from hc.lib.s3 import get_object, put_object, remove_objects
# (value, label) pairs used as `choices` for status fields.
STATUSES = (
    ("up", "Up"),
    ("down", "Down"),
    ("new", "New"),
    ("paused", "Paused"),
)

# Defaults for Check.timeout and Check.grace.
DEFAULT_TIMEOUT = td(days=1)
DEFAULT_GRACE = td(hours=1)

# Far-future sentinel used in place of None in "next expected ping" math.
NEVER = datetime(3000, 1, 1, tzinfo=timezone.utc)

# (value, label) pairs used as `choices` for Check.kind.
CHECK_KINDS = (
    ("simple", "Simple"),
    ("cron", "Cron"),
    ("oncalendar", "OnCalendar"),
)

# max time between start and ping where we will consider both events related:
MAX_DURATION = td(hours=72)

# (value, label) pairs used as `choices` for Flip.reason.
REASONS = (
    ("", "Unknown"),
    ("timeout", "Timeout"),
    ("fail", "Fail signal"),
)
# Registry of notification channel kinds: kind -> (display label, transport class).
TRANSPORTS: dict[str, tuple[str, type[transports.Transport]]] = {
    "apprise": ("Apprise", transports.Apprise),
    "call": ("Phone Call", transports.Call),
    "discord": ("Discord", transports.Discord),
    "email": ("Email", transports.Email),
    "gotify": ("Gotify", transports.Gotify),
    "group": ("Group", transports.Group),
    "linenotify": ("LINE Notify", transports.LineNotify),
    "matrix": ("Matrix", transports.Matrix),
    "mattermost": ("Mattermost", transports.Mattermost),
    "msteams": ("MS Teams Connector (stops working Jan 2025)", transports.MsTeams),
    "msteamsw": ("Microsoft Teams", transports.MsTeamsWorkflow),
    "ntfy": ("ntfy", transports.Ntfy),
    "opsgenie": ("Opsgenie", transports.Opsgenie),
    "pagertree": ("PagerTree", transports.PagerTree),
    "pd": ("PagerDuty", transports.PagerDuty),
    "po": ("Pushover", transports.Pushover),
    "pushbullet": ("Pushbullet", transports.Pushbullet),
    "rocketchat": ("Rocket.Chat", transports.RocketChat),
    "shell": ("Shell Command", transports.Shell),
    "signal": ("Signal", transports.Signal),
    "slack": ("Slack", transports.Slack),
    "sms": ("SMS", transports.Sms),
    "spike": ("Spike", transports.Spike),
    "telegram": ("Telegram", transports.Telegram),
    "trello": ("Trello", transports.Trello),
    "victorops": ("Splunk On-Call", transports.VictorOps),
    "webhook": ("Webhook", transports.Webhook),
    "whatsapp": ("WhatsApp", transports.WhatsApp),
    "zulip": ("Zulip", transports.Zulip),
}

# (kind, label) pairs derived from TRANSPORTS; used as `choices` for Channel.kind.
CHANNEL_KINDS = [(kind, label) for kind, (label, _cls) in TRANSPORTS.items()]
# Pushover priority number -> name (looked up by Channel.po_priority).
PO_PRIORITIES = {
    -3: "disabled",
    -2: "lowest",
    -1: "low",
    0: "normal",
    1: "high",
    2: "emergency",
}

# ntfy priority number -> name (looked up by NtfyConf.priority_display).
NTFY_PRIORITIES = {
    5: "max",
    4: "high",
    3: "default",
    2: "low",
    1: "min",
    0: "disabled",
}
def isostring(dt: datetime | None) -> str | None:
    """Format `dt` as an ISO 8601 string with microseconds dropped.

    Returns None when `dt` is None.
    """
    if not dt:
        return None
    truncated = dt.replace(microsecond=0)
    return truncated.isoformat()
class CheckDict(TypedDict, total=False):
    """Shape of the dictionary produced by Check.to_dict().

    All keys are optional (total=False): which ones are present depends on
    the `readonly` flag and on the check's kind.
    """

    # Identifier, present in read-write representations only
    uuid: str | None

    # Always present
    name: str
    slug: str
    tags: str
    desc: str
    grace: int
    n_pings: int
    status: str
    started: bool
    last_ping: str | None
    next_ping: str | None
    manual_resume: bool
    methods: str
    subject: str
    subject_fail: str
    start_kw: str
    success_kw: str
    failure_kw: str
    filter_subject: bool
    filter_body: bool

    # Present only when the check has a recorded last duration
    last_duration: int

    # Present in read-only representations only
    unique_key: str

    # Present in read-write representations only
    ping_url: str
    update_url: str
    pause_url: str
    resume_url: str
    channels: str

    # Present for "simple" checks
    timeout: int

    # Present for "cron" and "oncalendar" checks
    schedule: str
    tz: str
@dataclass
class DowntimeRecord:
    """Downtime totals for a single time interval."""

    boundary: datetime  # The start of this time interval (timezone-aware)
    tz: str  # For calculating total seconds in a month
    no_data: bool  # True if the check did not yet exist in this time interval
    duration: td  # Total downtime in this time interval
    count: int  # The number of downtime events in this time interval

    def monthly_uptime(self) -> float:
        """Return the uptime in this interval as a fraction of the month's length.

        NB: this method assumes monthly boundaries.
        It will yield incorrect results for weekly boundaries.
        """
        month_seconds = seconds_in_month(self.boundary.date(), self.tz)
        down_seconds = self.duration.total_seconds()
        return (month_seconds - down_seconds) / month_seconds
class DowntimeRecorder:
    """Accumulates downtime durations and counts into per-interval records."""

    def __init__(self, boundaries: list[datetime], tz: str, created: datetime) -> None:
        """Create one empty DowntimeRecord per boundary.

        `boundaries` is a list of timezone-aware datetimes of the starts of time
        intervals (months or weeks), and should be pre-sorted in descending order.
        `created` is the creation time of the check the records describe.
        """
        self.records: list[DowntimeRecord] = []
        for idx, boundary in enumerate(boundaries):
            # The list is in descending order, so the preceding element is the
            # start of the *following* time interval. If the check was created
            # after that point, it did not yet exist during this interval.
            following = boundaries[idx - 1] if idx else None
            no_data = following is not None and created > following
            self.records.append(DowntimeRecord(boundary, tz, no_data, td(), 0))

    def add(self, when: datetime, duration: td) -> None:
        """Add one downtime event to the first record whose boundary is <= `when`.

        Events older than every boundary are ignored.
        """
        target = next((r for r in self.records if when >= r.boundary), None)
        if target is not None:
            target.duration += duration
            target.count += 1
class Check(models.Model):
    # Descriptive metadata. `tags` is a space-separated list (see tags_list()).
    name = models.CharField(max_length=100, blank=True)
    slug = models.CharField(max_length=100, blank=True)
    tags = models.CharField(max_length=500, blank=True)
    # Unique UUID of the check; used in ping URLs and API URLs.
    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    desc = models.TextField(blank=True)
    project = models.ForeignKey(Project, models.CASCADE)
    created = models.DateTimeField(default=now)
    # Schedule settings: `timeout` is used by "simple" checks,
    # `schedule` and `tz` by "cron" and "oncalendar" checks (see get_grace_start()).
    kind = models.CharField(max_length=10, default="simple", choices=CHECK_KINDS)
    timeout = models.DurationField(default=DEFAULT_TIMEOUT)
    grace = models.DurationField(default=DEFAULT_GRACE)
    schedule = models.CharField(max_length=100, default="* * * * *")
    tz = models.CharField(max_length=36, default="UTC")
    # Keyword filtering settings.
    filter_subject = models.BooleanField(default=False)
    filter_body = models.BooleanField(default=False)
    start_kw = models.CharField(max_length=200, blank=True)
    success_kw = models.CharField(max_length=200, blank=True)
    failure_kw = models.CharField(max_length=200, blank=True)
    methods = models.CharField(max_length=30, blank=True)
    # When set, pings received while the check is paused are recorded as
    # ignored (see ping()).
    manual_resume = models.BooleanField(default=False)
    # Generated lazily by prepare_badge_key().
    badge_key = models.UUIDField(null=True, unique=True)
    # Ping bookkeeping, maintained by ping().
    n_pings = models.IntegerField(default=0)
    last_ping = models.DateTimeField(null=True, blank=True)
    last_start = models.DateTimeField(null=True, blank=True)
    last_start_rid = models.UUIDField(null=True)
    last_duration = models.DurationField(null=True, blank=True)
    has_confirmation_link = models.BooleanField(default=False)
    # Recomputed on every ping from going_down_after().
    alert_after = models.DateTimeField(null=True, blank=True, editable=False)
    status = models.CharField(max_length=6, choices=STATUSES, default="new")

    class Meta:
        indexes = [
            # Index for the alert_after field. Exclude rows with status=down.
            # Used in the sendalerts management command.
            models.Index(
                fields=["alert_after"],
                name="api_check_aa_not_down",
                condition=~models.Q(status="down"),
            ),
            models.Index(fields=["project_id", "slug"], name="api_check_project_slug"),
        ]
def __str__(self) -> str:
    """Return "<name or code> (<id>)".

    The id is formatted with %s rather than %d so that an unsaved instance
    (whose id is None) can still be converted to a string instead of raising
    TypeError. For integer ids the output is unchanged.
    """
    return "%s (%s)" % (self.name or self.code, self.id)
def name_then_code(self) -> str:
    """Return the check's name, or its code as a string if the name is empty."""
    return self.name or str(self.code)
def url(self) -> str | None:
    """Return check's ping url in user's preferred style.

    Slug-style URLs are used when the project has `show_slugs` set; in that
    case None is returned if the check has no slug. Otherwise the URL is
    built from the check's code.

    Note: this method reads self.project. If project is not loaded already,
    this causes a SQL query.
    """
    uses_slugs = self.project_id and self.project.show_slugs
    if not uses_slugs:
        return settings.PING_ENDPOINT + str(self.code)

    if not self.slug:
        return None

    # If ping_key is not set, use dummy placeholder
    ping_key = self.project.ping_key or "{ping_key}"
    return settings.PING_ENDPOINT + ping_key + "/" + self.slug
def details_url(self, full: bool = True) -> str:
    """Return the URL of the check's details page.

    With `full` set the result is prefixed with settings.SITE_ROOT,
    otherwise only the path is returned.
    """
    path = reverse("hc-details", args=[self.code])
    if full:
        return settings.SITE_ROOT + path
    return path
def get_absolute_url(self) -> str:
    """Return the path of the details page (without the site root)."""
    relative_url = self.details_url(full=False)
    return relative_url
def cloaked_url(self) -> str:
    """Return the full "hc-uncloak" URL, which identifies the check by unique_key."""
    path = reverse("hc-uncloak", args=[self.unique_key])
    return settings.SITE_ROOT + path
def email(self) -> str:
    """Return the check's email address: "<code>@<PING_EMAIL_DOMAIN>"."""
    return f"{self.code}@{settings.PING_EMAIL_DOMAIN}"
def clamped_last_duration(self) -> td | None:
    """Return last_duration if it is set and shorter than MAX_DURATION, else None."""
    duration = self.last_duration
    if not duration or duration >= MAX_DURATION:
        return None
    return duration
def get_grace_start(self, *, with_started: bool = True) -> datetime | None:
    """Return the datetime when the grace period starts.

    For an "up" check this is the next expected ping time, derived from the
    timeout (simple checks) or from the schedule (cron and oncalendar checks).
    If `with_started` is set and the check has a start time recorded and is
    not down, the earlier of that value and `last_start` is used.

    If the check is currently new, paused or down, return None.
    """
    # NEVER is a constant sentinel value (year 3000).
    # Using None instead would make the min() logic clunky.
    grace_start = NEVER

    if self.status == "up":
        if self.kind == "simple":
            assert self.last_ping is not None
            grace_start = self.last_ping + self.timeout
        elif self.kind == "cron":
            assert self.last_ping is not None
            # The complex case, next ping is expected based on cron schedule.
            # Don't convert to naive datetimes (and so avoid ambiguities around
            # DST transitions). cronsim will handle the timezone-aware datetimes.
            local_last_ping = self.last_ping.astimezone(ZoneInfo(self.tz))
            next_run = next(CronSim(self.schedule, local_last_ping))
            # Important: convert from the local timezone back to UTC.
            # If the result is kept in the local timezone, adding
            # a timedelta to it later (in `going_down_after` and in `get_status`)
            # may yield incorrect results during DST transitions.
            grace_start = next_run.astimezone(timezone.utc)
        elif self.kind == "oncalendar":
            assert self.last_ping is not None
            local_last_ping = self.last_ping.astimezone(ZoneInfo(self.tz))
            try:
                next_run = next(OnCalendar(self.schedule, local_last_ping))
            except StopIteration:
                # The schedule has no further elapse times.
                grace_start = NEVER
            else:
                # Same as for cron, convert back to UTC:
                grace_start = next_run.astimezone(timezone.utc)

    if with_started and self.last_start and self.status != "down":
        grace_start = min(grace_start, self.last_start)

    return None if grace_start == NEVER else grace_start
def going_down_after(self) -> datetime | None:
    """Return the datetime when the check goes down.

    This is the grace period's start plus the grace time.

    If the check is new or paused, and not currently running, return None.
    If the check is already down, also return None.
    """
    grace_start = self.get_grace_start()
    return None if grace_start is None else grace_start + self.grace
@cached_property
def cached_status(self) -> str:
    """Return the result of get_status(), wrapped in cached_property."""
    status = self.get_status()
    return status
def get_status(self, *, with_started: bool = False) -> str:
    """Return current status for display.

    Possible results are "new", "paused", "up", "grace", "down" and, only
    when `with_started` is set, "started".
    """
    current_time = now()

    if self.last_start:
        # A started run that has outlived the grace time counts as down.
        if current_time >= self.last_start + self.grace:
            return "down"
        if with_started:
            return "started"

    if self.status in ("new", "paused", "down"):
        return self.status

    grace_start = self.get_grace_start(with_started=False)
    if grace_start is None:
        # next elapse is "never", so this check will stay up indefinitely
        return "up"

    if current_time >= grace_start + self.grace:
        return "down"

    return "grace" if current_time >= grace_start else "up"
def lock_and_delete(self) -> None:
    """Acquire a DB lock for this check, then delete the check.

    Without the lock the delete can fail, if the check gets pinged while it is
    in the process of deletion.
    """
    with transaction.atomic():
        locked = Check.objects.select_for_update().filter(id=self.id)
        locked.delete()
def assign_all_channels(self) -> None:
    """Set this check's channels to all channels of its project."""
    project_channels = Channel.objects.filter(project=self.project)
    self.channel_set.set(project_channels)
def tags_list(self) -> list[str]:
    """Split `tags` on single spaces and return the non-empty, stripped pieces."""
    stripped = (piece.strip() for piece in self.tags.split(" "))
    return [tag for tag in stripped if tag]
def matches_tag_set(self, tag_set: set[str]) -> bool:
    """Return True if every tag in `tag_set` is one of this check's tags."""
    own_tags = set(self.tags_list())
    return tag_set <= own_tags
def channels_str(self) -> str:
    """Return a comma-separated string of assigned channel codes.

    The codes are sorted. An unsaved instance yields an empty string.
    """
    # Is this an unsaved instance?
    if not self.id:
        return ""

    # self.channel_set may already be prefetched.
    # Sort in python to make sure we don't run additional queries
    sorted_codes = sorted(str(ch.code) for ch in self.channel_set.all())
    return ",".join(sorted_codes)
@property
def unique_key(self) -> str:
    """Return the SHA-1 hex digest of the first 16 hex characters of the code."""
    prefix = self.code.hex[:16].encode()
    digest = hashlib.sha1(prefix)
    return digest.hexdigest()
def prepare_badge_key(self) -> uuid.UUID:
    """Return the check's badge key, generating and storing one if missing.

    A newly generated key is written with a queryset update(), so no other
    field of the check is saved.
    """
    if self.badge_key:
        return self.badge_key

    self.badge_key = uuid.uuid4()
    Check.objects.filter(id=self.id).update(badge_key=self.badge_key)
    return self.badge_key
def to_dict(self, *, readonly: bool = False, v: int = 3) -> CheckDict:
    """Return a dictionary representation of the check.

    `readonly` selects between the read-only form (which identifies the check
    by `unique_key`) and the read-write form (which includes the uuid, the
    ping/update/pause/resume URLs and the assigned channels).
    `v` is the API version: it is embedded in the API URLs, and with v=1
    the status may be reported as "started".
    """
    result: CheckDict = {
        "name": self.name,
        "slug": self.slug,
        "tags": self.tags,
        "desc": self.desc,
        "grace": int(self.grace.total_seconds()),
        "n_pings": self.n_pings,
        "status": self.get_status(with_started=(v == 1)),
        "started": self.last_start is not None,
        "last_ping": isostring(self.last_ping),
        "next_ping": isostring(self.get_grace_start()),
        "manual_resume": self.manual_resume,
        "methods": self.methods,
        "subject": self.success_kw if self.filter_subject else "",
        "subject_fail": self.failure_kw if self.filter_subject else "",
        "start_kw": self.start_kw,
        "success_kw": self.success_kw,
        "failure_kw": self.failure_kw,
        "filter_subject": self.filter_subject,
        "filter_body": self.filter_body,
    }

    if self.last_duration:
        result["last_duration"] = int(self.last_duration.total_seconds())

    if readonly:
        result["unique_key"] = self.unique_key
    else:
        code = str(self.code)
        # Optimization: construct API URLs manually instead of using reverse().
        # This is significantly quicker when returning hundreds of checks.
        api_url = f"{settings.SITE_ROOT}/api/v{v}/checks/{code}"
        result["uuid"] = code
        result["ping_url"] = settings.PING_ENDPOINT + code
        result["update_url"] = api_url
        result["pause_url"] = api_url + "/pause"
        result["resume_url"] = api_url + "/resume"
        result["channels"] = self.channels_str()

    if self.kind == "simple":
        result["timeout"] = int(self.timeout.total_seconds())
    elif self.kind in ("cron", "oncalendar"):
        result["schedule"] = self.schedule
        result["tz"] = self.tz

    return result
def ping(
    self,
    remote_addr: str,
    scheme: str,
    method: str,
    ua: str,
    body: bytes,
    action: str,
    rid: uuid.UUID | None,
    exitstatus: int | None = None,
) -> None:
    """Record a received ping: update the check's state and create a Ping object.

    `remote_addr`, `scheme`, `method`, `ua` (truncated to 200 characters),
    `rid` and `exitstatus` are stored on the Ping object as-is.

    `action` selects how the check is updated:
    - "start" records the start time and run id, leaving last_ping untouched,
    - "ign" and "log" do not change timestamps or status,
    - anything else is a success or "fail" event: it updates last_ping,
      last_duration and last_start, and flips the status to "up" or "down"
      if it has changed.

    If the check is paused and has manual_resume set, the action is
    overridden to "ign".

    `body` is stored in the database, or uploaded with put_object() when it is
    longer than 100 bytes and settings.S3_BUCKET is set.
    """
    # The following block updates a Check object, then creates a Ping object.
    # There's a possible race condition where the "sendalerts" command sees
    # the updated Check object before the Ping object is created.
    # To avoid this, put both operations inside a transaction:
    with transaction.atomic():
        # Acquire a lock. Without locking, on MariaDB, concurrent pings can
        # lead to a deadlock
        self = Check.objects.select_for_update().get(id=self.id)
        frozen_now = now()
        if self.status == "paused" and self.manual_resume:
            action = "ign"
        if action == "start":
            self.last_start = frozen_now
            self.last_start_rid = rid
            # Don't update "last_ping" field.
        elif action == "ign":
            pass
        elif action == "log":
            pass
        else:
            self.last_ping = frozen_now
            self.last_duration = None
            if self.last_start:
                if self.last_start_rid == rid:
                    # rid matches: calculate last_duration, clear last_start
                    self.last_duration = self.last_ping - self.last_start
                    self.last_start = None
                elif action == "fail" or rid is None:
                    # clear last_start (exit the "running" state) on:
                    # - "success" event with no rid
                    # - "fail" event, regardless of rid mismatch
                    self.last_start = None
            new_status = "down" if action == "fail" else "up"
            if self.status != new_status:
                reason = "fail" if action == "fail" else ""
                self.create_flip(new_status, reason=reason)
                self.status = new_status
        self.alert_after = self.going_down_after()
        # Increment in the database via an F() expression, then reload
        # the instance below to read the resulting value back.
        self.n_pings = models.F("n_pings") + 1
        body_lowercase = body.decode(errors="replace").lower()
        self.has_confirmation_link = "confirm" in body_lowercase
        self.save()
        self.refresh_from_db()
        ping = Ping(owner=self)
        ping.n = self.n_pings
        ping.created = frozen_now
        # For success pings `kind` is left unset.
        if action in ("start", "fail", "ign", "log"):
            ping.kind = action
        ping.remote_addr = remote_addr
        ping.scheme = scheme
        ping.method = method
        # If User-Agent is longer than 200 characters, truncate it:
        ping.ua = ua[:200]
        if len(body) > 100 and settings.S3_BUCKET:
            # Only the size is stored here; the body itself is uploaded below.
            ping.object_size = len(body)
        else:
            ping.body_raw = body
        ping.rid = rid
        ping.exitstatus = exitstatus
        ping.save()
    # Upload ping body to S3 outside the DB transaction, because this operation
    # can potentially take a long time:
    if ping.object_size:
        put_object(self.code, ping.n, body)
    # Every 100 received pings, prune old pings and notifications:
    if self.n_pings % 100 == 0:
        self.prune()
def prune(self, wait: bool = False) -> None:
    """Remove old pings, ping bodies, notifications and flips.

    Pings numbered at or below `n_pings - ping_log_limit` are removed, where
    the limit comes from the project owner's profile. `wait` is passed
    through to remove_objects().
    """
    ping_limit = self.project.owner_profile.ping_log_limit
    cutoff_n = self.n_pings - ping_limit

    # Remove ping bodies from object storage
    if settings.S3_BUCKET:
        remove_objects(str(self.code), cutoff_n, wait=wait)

    # Remove ping objects from db
    self.ping_set.filter(n__lte=cutoff_n).delete()

    try:
        # Important: sort by "created", not by "id". Sorting by id
        # may cause Postgres to use the "api_ping_pkey" index, and scan
        # a huge number of rows.
        oldest = self.ping_set.earliest("created")
    except Ping.DoesNotExist:
        # No pings are left, so there is no reference point for further cleanup.
        return

    # Delete notifications older than the oldest retained ping
    self.notification_set.filter(created__lt=oldest.created).delete()

    # Delete flips older than the oldest retained ping *and*
    # older than 93 days. We need ~3 months of flips for calculating
    # downtime statistics. The precise requirement is
    # "we need the current month and full two previous months of data".
    # We could calculate this precisely, but 3*31 is close enough and
    # much simpler.
    flip_cutoff = min(oldest.created, now() - td(days=93))
    self.flip_set.filter(created__lt=flip_cutoff).delete()
@property
def visible_pings(self) -> QuerySet[Ping]:
    """Return this check's pings numbered above `n_pings - ping_log_limit`."""
    ping_limit = self.project.owner_profile.ping_log_limit
    lowest_hidden_n = self.n_pings - ping_limit
    return self.ping_set.filter(n__gt=lowest_hidden_n)
def downtimes_by_boundary(
    self, boundaries: list[datetime], tz: str
) -> list[DowntimeRecord]:
    """Calculate downtime counts and durations for the given time intervals.

    Returns a list of DowntimeRecord instances in descending datetime order.

    `boundaries` are timezone-aware datetimes of the first days of time intervals
    (months or weeks), and should be pre-sorted in descending order.
    An empty `boundaries` list yields an empty result.
    """
    # With no intervals there is nothing to report (and min() below
    # would raise ValueError on an empty list).
    if not boundaries:
        return []

    summary = DowntimeRecorder(boundaries, tz, self.created)

    # A list of flips and time interval boundaries. Boundaries are marked
    # with the "---" pseudo-status to tell them apart from flips.
    events = [(b, "---") for b in boundaries]
    flips = self.flip_set.filter(created__gt=min(boundaries))
    events.extend(flips.values_list("created", "old_status"))

    # Walk backwards in time from now through flips and boundaries.
    # `status` is the check's status during the period between the event
    # being visited and the previously visited (later) event; for each such
    # "down" period increase the counters in `summary`.
    dt, status = now(), self.status
    for prev_dt, prev_status in sorted(events, reverse=True):
        if status == "down":
            # Before subtracting datetimes convert them to UTC.
            # Otherwise we will get incorrect results around DST transitions:
            delta = dt.astimezone(timezone.utc) - prev_dt.astimezone(timezone.utc)
            summary.add(prev_dt, delta)

        dt = prev_dt
        # Crossing a boundary does not change the status, crossing a flip does.
        if prev_status != "---":
            status = prev_status

    return summary.records
def downtimes(self, months: int, tz: str) -> list[DowntimeRecord]:
    """Return downtime records for the boundaries month_boundaries(months, tz) yields."""
    return self.downtimes_by_boundary(month_boundaries(months, tz), tz)
def create_flip(
    self, new_status: str, reason: str = "", mark_as_processed: bool = False
) -> None:
    """Create a Flip object for this check.

    The flip records a transition from the check's current status to
    `new_status`. With `mark_as_processed` set, the flip's `processed`
    timestamp is set to its creation time.

    Flip objects record check status changes, and have two uses:
    - for sending notifications asynchronously (create a flip object in
      www process, a separate "sendalerts" process picks it up and processes it)
    - for downtime statistics calculation. The Check.downtimes() method
      analyzes the flips and calculates downtime counts and durations per
      month.
    """
    created = now()
    flip = Flip(
        owner=self,
        created=created,
        processed=created if mark_as_processed else None,
        old_status=self.status,
        new_status=new_status,
        reason=reason,
    )
    flip.save()
class PingDict(TypedDict, total=False):
    """Shape of the dictionary produced by Ping.to_dict().

    All keys are optional (total=False); `duration` is only present when
    a duration could be determined for the ping.
    """

    type: str
    date: str
    n: int | None
    scheme: str
    remote_addr: str | None
    method: str
    ua: str
    rid: uuid.UUID | None
    duration: float
    body_url: str | None
class Ping(models.Model):
    id = models.BigAutoField(primary_key=True)
    # Sequence number of the ping; Check.ping() copies it from Check.n_pings.
    n = models.IntegerField(null=True)
    owner = models.ForeignKey(Check, models.CASCADE)
    created = models.DateTimeField(default=now)
    # "start", "fail", "ign" or "log". Left empty for success pings.
    kind = models.CharField(max_length=6, blank=True, null=True)
    scheme = models.CharField(max_length=10, default="http")
    remote_addr = models.GenericIPAddressField(blank=True, null=True)
    method = models.CharField(max_length=10, blank=True)
    ua = models.CharField(max_length=200, blank=True)
    # The request body, when it is stored in the database.
    body_raw = models.BinaryField(null=True)
    # Size of the request body, when it is stored in object storage instead.
    object_size = models.IntegerField(null=True)
    exitstatus = models.SmallIntegerField(null=True)
    rid = models.UUIDField(null=True)
def to_dict(self) -> PingDict:
    """Return a dictionary representation of the ping.

    `body_url` is None for pings without a body. `duration` (in seconds)
    is included only when self.duration is not None.
    """
    body_url = None
    if self.has_body():
        path = reverse("hc-api-ping-body", args=[self.owner.code, self.n])
        body_url = settings.SITE_ROOT + path

    result: PingDict = {
        "type": self.kind or "success",
        "date": self.created.isoformat(),
        "n": self.n,
        "scheme": self.scheme,
        "remote_addr": self.remote_addr,
        "method": self.method,
        "ua": self.ua,
        "rid": self.rid,
        "body_url": body_url,
    }

    elapsed = self.duration
    if elapsed is not None:
        result["duration"] = elapsed.total_seconds()

    return result
def has_body(self) -> bool:
    """Return True if the ping has a body, stored either inline or externally."""
    return bool(self.body_raw or self.object_size)
def get_body_bytes(self) -> bytes | None:
    """Return the ping body as bytes, or None if there is no body.

    When `object_size` and `n` are both set, the body is fetched with
    get_object(); otherwise the inline `body_raw` value is used.
    """
    stored_externally = self.object_size and self.n
    if stored_externally:
        return get_object(str(self.owner.code), self.n)

    return self.body_raw or None
def get_body(self) -> str | None:
    """Return the ping body decoded to text, or None if there is no body.

    Undecodable bytes are replaced rather than raising an error.
    """
    raw = self.get_body_bytes()
    if not raw:
        return None
    return bytes(raw).decode(errors="replace")
def get_body_size(self) -> int:
    """Return the body size in bytes, or 0 if the ping has no body.

    The length of the inline body takes precedence over `object_size`.
    """
    if self.body_raw:
        return len(self.body_raw)
    return self.object_size or 0
def get_kind_display(self) -> str:
    """Return a human-readable label for the ping's kind.

    Failure pings with a non-zero exit status are labelled with that status.
    Any kind without a label of its own is shown as "Success".
    """
    if self.kind == "fail":
        return f"Exit status {self.exitstatus}" if self.exitstatus else "Failure"

    labels = {"ign": "Ignored", "start": "Start", "log": "Log"}
    return labels.get(self.kind, "Success")
@cached_property
def duration(self) -> td | None:
    """Return the time elapsed since the matching "start" ping, if there is one.

    Only success and failure pings can have a duration. The matching start
    ping must have the same `rid`, be at most MAX_DURATION older, and must
    not be followed by another success/failure ping with the same `rid`.
    """
    end_kinds = (None, "", "fail")

    # Return early if this is the very first ping,
    # or if this is not a success or failure ping:
    if self.n == 1 or self.kind not in end_kinds:
        return None

    # only look backwards but don't look further than MAX_DURATION in the past
    earlier = Ping.objects.filter(owner=self.owner_id).filter(
        id__lt=self.id, created__gte=self.created - MAX_DURATION
    )

    # Look for a "start" event, with no success/fail event in between:
    for prior in earlier.order_by("-id").only("created", "kind", "rid"):
        if prior.rid != self.rid:
            continue
        if prior.kind == "start":
            return self.created - prior.created
        if prior.kind in end_kinds:
            return None

    return None
def formatted_kind_created(self) -> str:
    """Return a string in "Success, 10 minutes" form."""
    # naturaltime() may emit non-breaking spaces (\xa0), we want regular spaces
    when = naturaltime(self.created).replace("\xa0", " ")
    kind_label = self.get_kind_display()
    return f"{kind_label}, {when}"
class WebhookSpec(BaseModel):
    """Parameters of one webhook request, as built by Channel.webhook_spec()."""

    method: str
    url: str
    body: str
    headers: dict[str, str]
class TelegramConf(BaseModel):
    """Configuration of a "telegram" channel, parsed from Channel.value."""

    id: int
    thread_id: int | None = None
    type: str | None = None
    name: str | None = None
class ShellConf(BaseModel):
    """Configuration of a "shell" channel, parsed from Channel.value."""

    cmd_down: str
    cmd_up: str
class PdConf(BaseModel):
    """Configuration of a "pd" (PagerDuty) channel."""

    service_key: str
    account: str | None = None

    @classmethod
    def load(cls, data: Any) -> PdConf:
        """Build a PdConf from a stored channel value.

        `data` is either a JSON document or a plain service_key value.
        """
        if data.startswith("{"):
            return super().model_validate_json(data)

        # Anything not starting with "{" is taken to be a plain service_key.
        return cls.model_validate({"service_key": data})
class PhoneConf(BaseModel):
    """Configuration of a phone-based channel (see Channel.phone).

    The notify flags are read from the "up" and "down" keys and are optional.
    """

    value: str
    notify_up: bool | None = Field(None, alias="up")
    notify_down: bool | None = Field(None, alias="down")
class EmailConf(BaseModel):
    """Configuration of an "email" channel.

    The notify flags are read from the "up" and "down" keys.
    """

    value: str
    notify_up: bool = Field(alias="up")
    notify_down: bool = Field(alias="down")

    @classmethod
    def load(cls, data: Any) -> EmailConf:
        """Build an EmailConf from a stored channel value.

        `data` is either a JSON document or a plain email address. A plain
        address gets both "up" and "down" notifications enabled.
        """
        if data.startswith("{"):
            return super().model_validate_json(data)

        # Anything not starting with "{" is taken to be a plain email address.
        return cls.model_validate({"value": data, "up": True, "down": True})
class OpsgenieConf(BaseModel):
    """Configuration of an "opsgenie" channel, parsed from Channel.value."""

    key: str
    region: str
class ZulipConf(BaseModel):
    """Configuration of a "zulip" channel, parsed from Channel.value."""

    bot_email: str
    api_key: str
    mtype: str
    to: str
    site: str = ""
    topic: str = ""

    def model_post_init(self, context: Any) -> None:
        """Fill in `site` from the bot's email domain when it is empty."""
        if self.site != "":
            return

        # Fallback if we don't have the site value:
        # derive it from bot's email
        _, domain = self.bot_email.split("@")
        self.site = f"https://{domain}"
class NtfyConf(BaseModel):
    """Configuration of an "ntfy" channel, parsed from Channel.value."""

    topic: str
    url: str
    priority: int
    priority_up: int
    token: str = ""

    @property
    def priority_display(self) -> str:
        """Return the name of `priority`, looked up in NTFY_PRIORITIES."""
        label = NTFY_PRIORITIES[self.priority]
        return label
class TrelloConf(BaseModel):
    """Configuration of a "trello" channel, parsed from Channel.value."""

    token: str
    list_id: str
    board_name: str
    list_name: str
class GotifyConf(BaseModel):
    """Configuration of a "gotify" channel, parsed from Channel.value."""

    url: str
    token: str
class Channel(models.Model):
    name = models.CharField(max_length=100, blank=True)
    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    project = models.ForeignKey(Project, models.CASCADE)
    created = models.DateTimeField(default=now)
    # One of the keys of TRANSPORTS.
    kind = models.CharField(max_length=20, choices=CHANNEL_KINDS)
    # Kind-specific configuration: a plain string or a JSON document,
    # parsed by the kind-specific properties of this class.
    value = models.TextField(blank=True)
    email_verified = models.BooleanField(default=False)
    # The fields below are updated by notify() after each delivery attempt.
    disabled = models.BooleanField(default=False)
    last_notify = models.DateTimeField(null=True, blank=True)
    last_notify_duration = models.DurationField(null=True, blank=True)
    last_error = models.CharField(max_length=200, blank=True)
    # Checks this channel is assigned to (reverse accessor: Check.channel_set).
    checks = models.ManyToManyField(Check)
def __str__(self) -> str:
    """Return the channel's name, or a description derived from its kind.

    Kinds without a specific description fall back to the kind's display label.
    """
    if self.name:
        return self.name

    kind = self.kind
    if kind == "email":
        return f"Email to {self.email.value}"
    if kind == "sms":
        return f"SMS to {self.phone.value}"
    if kind == "slack":
        return f"Slack {self.slack_channel}"
    if kind == "telegram":
        return f"Telegram {self.telegram.name}"
    if kind == "zulip":
        zulip = self.zulip
        if zulip.mtype == "stream":
            return f"Zulip stream {zulip.to}"
        if zulip.mtype == "private":
            return f"Zulip user {zulip.to}"

    return self.get_kind_display()
def to_dict(self) -> dict[str, str]:
    """Return the channel's code (as "id"), name and kind as a dictionary."""
    return dict(id=str(self.code), name=self.name, kind=self.kind)
def is_editable(self) -> bool:
    """Return True if the channel's kind is one of the editable kinds."""
    editable_kinds = {
        "email",
        "webhook",
        "sms",
        "signal",
        "whatsapp",
        "ntfy",
        "group",
    }
    return self.kind in editable_kinds
def assign_all_checks(self) -> None:
    """Add all checks of the channel's project to this channel."""
    project_checks = Check.objects.filter(project=self.project)
    self.checks.add(*project_checks)
def make_token(self) -> str:
    """Return the SHA-1 hex digest of the channel's code joined with SECRET_KEY."""
    seed = f"{self.code}{settings.SECRET_KEY}"
    return hashlib.sha1(seed.encode()).hexdigest()
def send_verify_link(self) -> None:
    """Email a verification link to the channel's email address."""
    path = reverse("hc-verify-email", args=[self.code, self.make_token()])
    ctx = {"verify_link": settings.SITE_ROOT + path}
    emails.verify_email(self.email.value, ctx)
def get_unsub_link(self) -> str:
    """Return a full unsubscribe URL carrying a timestamp-signed channel token."""
    signed_token = TimestampSigner(salt="alerts").sign(self.make_token())
    path = reverse("hc-unsubscribe-alerts", args=[self.code, signed_token])
    return settings.SITE_ROOT + path
def send_signal_captcha_alert(self, challenge: str, raw: str) -> None:
    """Email the site admins that a Signal CAPTCHA needs solving.

    `challenge` is the challenge token and `raw` is the raw message received
    from Signal. The email carries a plain-text part with the token, and an
    HTML part with instructions, a link to the CAPTCHA submission form and
    the raw message.

    All interpolated values are HTML-escaped: `challenge` and `raw` come
    from outside this application, and unescaped markup characters in them
    would break (or inject into) the HTML body.
    """
    from html import escape

    subject = "Signal CAPTCHA proof required"
    message = f"Challenge token: {challenge}"
    hostname = socket.gethostname()
    submit_url = settings.SITE_ROOT + reverse("hc-signal-captcha")
    submit_url += "?" + urlencode({"host": hostname, "challenge": challenge})
    html_message = f"""
On host <b>{escape(hostname)}</b>, run:<br>
<pre>manage.py submitchallenge {escape(challenge)} CAPTCHA-SOLUTION-HERE</pre><br>
<br>
Alternatively, <a href="{escape(submit_url)}">submit CAPTCHA solution here</a>.<br>
<br>
Message from Signal:<br>
<pre>{escape(raw)}</pre>
"""
    mail_admins(subject, message, html_message=html_message)
def send_signal_rate_limited_notice(self, message: str, plaintext: str) -> None:
    """Email the project owner a "Signal rate limited" notice.

    The first line of `plaintext` is used as the subject in the email context.
    """
    first_line, _, _ = plaintext.partition("\n")
    ctx = {
        "recipient": self.phone.value,
        "subject": first_line,
        "message": message,
        "plaintext": plaintext,
    }
    emails.signal_rate_limited(self.project.owner.email, ctx)
@property
def transport(self) -> transports.Transport:
    """Return a new transport instance for this channel's kind.

    Raises NotImplementedError if the kind is not registered in TRANSPORTS.
    """
    entry = TRANSPORTS.get(self.kind)
    if entry is None:
        raise NotImplementedError(f"Unknown channel kind: {self.kind}")

    transport_cls = entry[1]
    return transport_cls(self)
def notify(self, flip: Flip, is_test: bool = False) -> str:
    """Send a notification about `flip` through this channel.

    Returns "no-op" if the transport does nothing for the flip's new status.
    Otherwise returns the error message, which is an empty string on success.

    A Notification object is stored before sending, and its error field is
    updated afterwards. The channel's last_notify, last_notify_duration,
    last_error and disabled fields are updated as well; a permanent transport
    error disables the channel.
    """
    if self.transport.is_noop(flip.new_status):
        return "no-op"

    notification = Notification(channel=self)
    # When sending a test notification we leave the owner field null.
    # (the passed check is a dummy, unsaved Check instance)
    if not is_test:
        notification.owner = flip.owner

    notification.check_status = flip.new_status
    notification.error = "Sending"
    notification.save()

    started = now()
    error = ""
    disabled = self.disabled
    try:
        self.transport.notify(flip, notification=notification)
    except transports.TransportError as exc:
        if exc.permanent:
            disabled = True
        error = exc.message

    Notification.objects.filter(id=notification.id).update(error=error)
    Channel.objects.filter(id=self.id).update(
        last_notify=started,
        last_notify_duration=now() - started,
        last_error=error,
        disabled=disabled,
    )

    return error
def icon_path(self) -> str:
    """Return the path of the integration icon for this channel's kind."""
    return "img/integrations/" + f"{self.kind}.png"
@property
def json(self) -> Any:
    """Return `value` parsed as JSON."""
    parsed = json.loads(self.value)
    return parsed
@property
def po_priority(self) -> str:
    """Return the name of the priority stored in a "po" channel's value.

    The priority number is the second "|"-separated part of `value`.
    """
    assert self.kind == "po"
    priority = int(self.value.split("|")[1])
    return PO_PRIORITIES[priority]
def webhook_spec(self, status: str) -> WebhookSpec:
    """Return the webhook request parameters for `status` ("up" or "down").

    The parameters are read from the "<field>_<status>" keys of the JSON
    document in `value`.
    """
    assert self.kind == "webhook"
    assert status in ("up", "down")

    doc = json.loads(self.value)
    params = {
        field: doc[f"{field}_{status}"]
        for field in ("method", "url", "body", "headers")
    }
    return WebhookSpec(**params)
@property
def down_webhook_spec(self) -> WebhookSpec:
    """Return the webhook spec for the "down" status."""
    spec = self.webhook_spec("down")
    return spec
@property
def up_webhook_spec(self) -> WebhookSpec:
    """Return the webhook spec for the "up" status."""
    spec = self.webhook_spec("up")
    return spec
@property
def shell(self) -> ShellConf:
    """Return `value` parsed as a ShellConf. Only valid for "shell" channels."""
    assert self.kind == "shell"
    conf = ShellConf.model_validate_json(self.value)
    return conf
@property
def slack_team(self) -> str | None:
    """Return the Slack team name, or None if it is not available.

    The name is read from the "team_name" key or, failing that, from
    the "name" key of the "team" object. Values that are not JSON documents
    have no team name.
    """
    assert self.kind == "slack"
    if not self.value.startswith("{"):
        return None

    doc = json.loads(self.value)
    if "team_name" in doc:
        team_name = doc["team_name"]
    elif "team" in doc:
        team_name = doc["team"]["name"]
    else:
        return None

    assert isinstance(team_name, str)
    return team_name
@property
def slack_channel(self) -> str | None:
    """Return the Slack channel name from the incoming webhook configuration.

    Returns None when `value` is not a JSON document.
    """
    assert self.kind == "slack"
    if not self.value.startswith("{"):
        return None

    channel_name = json.loads(self.value)["incoming_webhook"]["channel"]
    assert isinstance(channel_name, str)
    return channel_name
@property
def slack_webhook_url(self) -> str:
    """Return the incoming webhook URL of a "slack" or "mattermost" channel.

    `value` is either the URL itself or a JSON document that holds the URL
    under incoming_webhook.url.
    """
    assert self.kind in ("slack", "mattermost")
    if not self.value.startswith("{"):
        return self.value

    webhook_url = json.loads(self.value)["incoming_webhook"]["url"]
    assert isinstance(webhook_url, str)
    return webhook_url
@property
def discord_webhook_url(self) -> str:
    """Return the Discord webhook URL, rewritten to the discord.com domain."""
    assert self.kind == "discord"
    url = self.json["webhook"]["url"]
    assert isinstance(url, str)

    # Discord migrated to discord.com,
    # and is dropping support for discordapp.com on 7 November 2020
    legacy_prefix = "https://discordapp.com/"
    if url.startswith(legacy_prefix):
        return "https://discord.com/" + url[len(legacy_prefix) :]

    return url
@property
def telegram(self) -> TelegramConf:
    """Return `value` parsed as a TelegramConf. Only valid for "telegram" channels."""
    assert self.kind == "telegram"
    conf = TelegramConf.model_validate_json(self.value)
    return conf
def update_telegram_id(self, new_chat_id: int) -> None:
    """Replace the "id" key in the JSON `value` with `new_chat_id` and save."""
    updated = {**json.loads(self.value), "id": new_chat_id}
    self.value = json.dumps(updated)
    self.save()
@property
def pd(self) -> PdConf:
    """Return `value` loaded as a PdConf. Only valid for "pd" channels."""
    assert self.kind == "pd"
    conf = PdConf.load(self.value)
    return conf
@property
def phone(self) -> PhoneConf:
    """Return `value` parsed as a PhoneConf.

    Only valid for "call", "sms", "whatsapp" and "signal" channels.
    """
    assert self.kind in ("call", "sms", "whatsapp", "signal")
    conf = PhoneConf.model_validate_json(self.value)
    return conf
@property
def trello(self) -> TrelloConf:
    """Return `value` parsed (in strict mode) as a TrelloConf.

    Only valid for "trello" channels.
    """
    assert self.kind == "trello"
    conf = TrelloConf.model_validate_json(self.value, strict=True)
    return conf
@property
def email(self) -> EmailConf:
    """Return `value` loaded as an EmailConf."""
    conf = EmailConf.load(self.value)
    return conf
@property
def opsgenie(self) -> OpsgenieConf:
    """Return `value` parsed as an OpsgenieConf."""
    conf = OpsgenieConf.model_validate_json(self.value)
    return conf
@property
def zulip(self) -> ZulipConf:
    """Return `value` parsed as a ZulipConf."""
    conf = ZulipConf.model_validate_json(self.value)
    return conf
@property
def linenotify_token(self) -> str:
    """Return the token of a "linenotify" channel, which is `value` itself."""
    assert self.kind == "linenotify"
    token = self.value
    return token
@property
def gotify(self) -> GotifyConf:
    """Return `value` parsed (in strict mode) as a GotifyConf.

    Only valid for "gotify" channels.
    """
    assert self.kind == "gotify"
    conf = GotifyConf.model_validate_json(self.value, strict=True)
    return conf
@property
def group_channels(self) -> QuerySet[Channel]:
    """Return the member channels of a "group" channel.

    `value` holds a comma-separated list of channel codes. Only channels
    from the same project are returned.
    """
    assert self.kind == "group"
    member_codes = self.value.split(",")
    return Channel.objects.filter(project=self.project, code__in=member_codes)
@property
def ntfy(self) -> NtfyConf:
    """Return `value` parsed (in strict mode) as an NtfyConf.

    Only valid for "ntfy" channels.
    """
    assert self.kind == "ntfy"
    conf = NtfyConf.model_validate_json(self.value, strict=True)
    return conf
class Notification(models.Model):
    """A record of a notification sent (or attempted) through a channel."""

    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    # owner is null for test notifications, produced by the "Test!" button
    # in the Integrations page
    owner = models.ForeignKey(Check, models.CASCADE, null=True)
    check_status = models.CharField(max_length=6)
    channel = models.ForeignKey(Channel, models.CASCADE)
    created = models.DateTimeField(default=now)
    error = models.CharField(max_length=200, blank=True)

    class Meta:
        get_latest_by = "created"

    def status_url(self) -> str:
        """Return the absolute URL of this notification's status endpoint.

        The URL is ``settings.SITE_ROOT`` followed by the reversed
        "hc-api-notification-status" route for this notification's code.
        """
        relative_url = reverse("hc-api-notification-status", args=[self.code])
        return settings.SITE_ROOT + relative_url
class FlipDict(TypedDict):
    """Dictionary form of a Flip, as built by Flip.to_dict()."""

    # Creation time of the flip in ISO 8601 format, microseconds zeroed out.
    timestamp: str
    # 1 if the flip's new status is "up", 0 otherwise.
    up: int
class Flip(models.Model):
    """A status transition of a check, from ``old_status`` to ``new_status``."""

    owner = models.ForeignKey(Check, models.CASCADE)
    created = models.DateTimeField()
    processed = models.DateTimeField(null=True, blank=True)
    old_status = models.CharField(max_length=8, choices=STATUSES)
    new_status = models.CharField(max_length=8, choices=STATUSES)
    reason = models.CharField(max_length=8, choices=REASONS, default="")

    class Meta:
        indexes = [
            # For quickly looking up unprocessed flips.
            # Used in the sendalerts management command.
            models.Index(
                fields=["processed"],
                name="api_flip_not_processed",
                condition=models.Q(processed=None),
            )
        ]

    def to_dict(self) -> FlipDict:
        """Return this flip as a dict with "timestamp" and "up" keys.

        "timestamp" is the creation time in ISO 8601 format with
        microseconds zeroed out; "up" is 1 when the new status is "up"
        and 0 otherwise.
        """
        created_to_second = self.created.replace(microsecond=0)
        return {
            "timestamp": created_to_second.isoformat(),
            "up": int(self.new_status == "up"),
        }

    def select_channels(self) -> list[Channel]:
        """Return the owner's channels that should be notified of this flip.

        * new->up and paused->up transitions notify nobody.
        * Disabled channels are left out.
        * Channels whose transport.is_noop(status) returns True are left out.
        * The result is ordered by last_notify_duration, shortest first,
          with channels lacking a duration at the end.

        Raises NotImplementedError if the new status is neither "up"
        nor "down".
        """
        status = self.new_status

        # Don't send alerts on new->up and paused->up transitions
        if status == "up" and self.old_status in ("new", "paused"):
            return []

        if status not in ("up", "down"):
            raise NotImplementedError(f"Unexpected status: {status}")

        candidates = self.owner.channel_set.exclude(disabled=True).order_by(
            F("last_notify_duration").asc(nulls_last=True)
        )

        selected = []
        for channel in candidates:
            if channel.transport.is_noop(status):
                continue
            selected.append(channel)
        return selected
class TokenBucket(models.Model):
    """Database-backed token bucket used for rate limiting.

    Each row tracks one rate-limited key (``value``). ``tokens`` is the
    remaining fraction of the bucket (a full bucket is 1.0) and ``updated``
    is the time of the last successful authorization.
    """

    value = models.CharField(max_length=80, unique=True)
    tokens = models.FloatField(default=1.0)
    updated = models.DateTimeField(default=now)

    @staticmethod
    def authorize(value: str, capacity: int, refill_time_secs: int) -> bool:
        """Try to take one token from the bucket identified by `value`.

        A full bucket allows `capacity` calls; each call costs
        1 / capacity of the bucket. An empty bucket refills linearly and
        is full again after `refill_time_secs` seconds.

        Returns True if the call is allowed, False if the bucket does not
        hold enough tokens. A denied call does not save the bucket, so it
        neither consumes tokens nor resets the refill clock.
        """
        frozen_now = now()
        obj, created = TokenBucket.objects.get_or_create(value=value)

        if not created:
            # Top up the bucket:
            duration_secs = (frozen_now - obj.updated).total_seconds()
            obj.tokens = min(1.0, obj.tokens + duration_secs / refill_time_secs)

        obj.tokens -= 1.0 / capacity
        if obj.tokens < 0:
            # Not enough tokens
            return False

        # Race condition: two concurrent authorize calls can overwrite each
        # other's changes. It's OK to be a little inexact here for the sake
        # of simplicity.
        obj.updated = frozen_now
        obj.save()

        return True

    @staticmethod
    def authorize_auth_ip(request: HttpRequest) -> bool:
        """Rate limit signup/login attempts by client IP address.

        The address is the first entry of X-Forwarded-For if that header
        is present, and REMOTE_ADDR otherwise.
        """
        headers = request.META
        remote_addr = headers.get("HTTP_X_FORWARDED_FOR", headers["REMOTE_ADDR"])
        remote_addr = remote_addr.split(",")[0]
        if "." in remote_addr and remote_addr.count(":") == 1:
            # If remote_addr is in a ipv4address:port format
            # (like in Azure App Service), remove the port.
            # Requiring exactly one colon leaves IPv4-mapped IPv6 addresses
            # (e.g. "::ffff:192.0.2.1") intact; splitting those on ":" would
            # give an empty string and put all such clients in one bucket.
            remote_addr = remote_addr.split(":")[0]

        value = f"auth-ip-{remote_addr}"
        # 20 signup/login attempts for a single IP per hour:
        return TokenBucket.authorize(value, 20, 3600)

    @staticmethod
    def authorize_login_email(email: str) -> bool:
        """Rate limit login attempts by (normalized) email address.

        Dots and "+alias" suffixes in the mailbox part are ignored, so
        variations of the same address share one bucket.

        Raises ValueError if `email` contains no "@".
        """
        # Split on the last "@" only: a quoted local part may itself
        # contain "@", and the domain never does.
        mailbox, domain = email.rsplit("@", 1)
        # remove dots and alias:
        mailbox = mailbox.replace(".", "")
        mailbox = mailbox.split("+")[0]
        email = mailbox + "@" + domain

        salted_encoded = (email + settings.SECRET_KEY).encode()
        value = "em-%s" % hashlib.sha1(salted_encoded).hexdigest()

        # 20 login attempts for a single email per hour:
        return TokenBucket.authorize(value, 20, 3600)

    @staticmethod
    def authorize_invite(user: User) -> bool:
        """Rate limit the invites sent by `user`."""
        value = "invite-%d" % user.id

        # 20 invites per day
        return TokenBucket.authorize(value, 20, 3600 * 24)

    @staticmethod
    def authorize_login_password(email: str) -> bool:
        """Rate limit password login attempts for `email`."""
        salted_encoded = (email + settings.SECRET_KEY).encode()
        value = "pw-%s" % hashlib.sha1(salted_encoded).hexdigest()

        # 20 password attempts per day
        return TokenBucket.authorize(value, 20, 3600 * 24)

    @staticmethod
    def authorize_telegram(telegram_id: int) -> bool:
        """Rate limit messages sent to a single Telegram chat."""
        value = "tg-%s" % telegram_id

        # 6 messages for a single chat per minute:
        return TokenBucket.authorize(value, 6, 60)

    @staticmethod
    def authorize_signal(phone: str) -> bool:
        """Rate limit Signal messages sent to a single recipient."""
        salted_encoded = (phone + settings.SECRET_KEY).encode()
        value = "signal-%s" % hashlib.sha1(salted_encoded).hexdigest()

        # 6 messages for a single recipient per minute:
        return TokenBucket.authorize(value, 6, 60)

    @staticmethod
    def authorize_signal_verification(user: User) -> bool:
        """Rate limit Signal recipient verifications started by `user`."""
        value = "signal-verify-%d" % user.id

        # 50 signal recipient verifications per day
        return TokenBucket.authorize(value, 50, 3600 * 24)

    @staticmethod
    def authorize_pushover(user_key: str) -> bool:
        """Rate limit Pushover messages sent to a single user key."""
        salted_encoded = (user_key + settings.SECRET_KEY).encode()
        value = "po-%s" % hashlib.sha1(salted_encoded).hexdigest()

        # 6 messages for a single user key per minute:
        return TokenBucket.authorize(value, 6, 60)

    @staticmethod
    def authorize_sudo_code(user: User) -> bool:
        """Rate limit sudo attempts by `user`."""
        value = "sudo-%d" % user.id

        # 10 sudo attempts per day
        return TokenBucket.authorize(value, 10, 3600 * 24)

    @staticmethod
    def authorize_totp_attempt(user: User) -> bool:
        """Rate limit TOTP attempts by `user`."""
        value = "totp-%d" % user.id

        # 96 attempts per user per 24 hours
        # (or, on average, one attempt per 15 minutes)
        return TokenBucket.authorize(value, 96, 3600 * 24)

    @staticmethod
    def authorize_totp_code(user: User, code: str) -> bool:
        """Allow a given TOTP code to be used only once per validity period."""
        value = "totpc-%d-%s" % (user.id, code)

        # A code has a validity period of 3 * 30 = 90 seconds.
        # During that period, allow the code to only be used once,
        # so an eavesdropping attacker cannot reuse a code.
        return TokenBucket.authorize(value, 1, 90)