Mirror of https://github.com/healthchecks/healthchecks.git, synced 2025-04-14 08:58:29 +00:00

The mitigation is to not attempt GetObject calls if there have been more than 3 S3 errors in the past minute. The implementation uses the TokenBucket class that we normally use for rate-limiting.

An example scenario this is trying to avoid:

* The S3 service becomes unavailable for 10 straight minutes. Each S3 request hangs until we hit the configured timeout (settings.S3_TIMEOUT).
* A client is frequently requesting the "Get ping's logged body" API call. Each call causes one webserver process to become busy for S3_TIMEOUT seconds.
* All workers become busy, the request backlog fills up, and our service starts returning 5xx errors.

With the mitigation, during an S3 outage only the calls that retrieve a ping's logged body will return 503; the rest of the service will (hopefully) work normally.

Fixes: #1114
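
For context, here is a minimal sketch of how the guard behaves at the request-handler level. It relies only on methods defined in the file below (TokenBucket.s3_is_healthy(), TokenBucket.record_s3_get_object_error(), Ping.get_body_bytes()); the view function itself is hypothetical, not the actual hc.api.views code:

from django.http import HttpResponse

def ping_body_view(request, ping):
    # Hypothetical view. get_body_bytes() raises Ping.GetBodyError when
    # TokenBucket.s3_is_healthy() reports too many recent S3 errors, or when
    # the GetObject call itself fails (which also records the error via
    # TokenBucket.record_s3_get_object_error()).
    try:
        body = ping.get_body_bytes()
    except ping.GetBodyError:
        # Fail fast with 503 instead of tying up a worker
        # for settings.S3_TIMEOUT seconds:
        return HttpResponse(status=503)
    return HttpResponse(body or b"", content_type="text/plain")
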
1374 lines
47 KiB
Python
from __future__ import annotations

import hashlib
import json
import socket
import uuid
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta as td
from datetime import timezone
from typing import Any, TypedDict
from urllib.parse import urlencode
from zoneinfo import ZoneInfo

from cronsim import CronSim
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.core.mail import mail_admins
from django.core.signing import TimestampSigner
from django.db import models, transaction
from django.db.models import F, QuerySet
from django.http import HttpRequest
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.timezone import now
from oncalendar import OnCalendar
from pydantic import BaseModel, Field

from hc.accounts.models import Project
from hc.api import transports
from hc.lib import emails
from hc.lib.date import month_boundaries, seconds_in_month
from hc.lib.s3 import GetObjectError, get_object, put_object, remove_objects
from hc.lib.urls import absolute_reverse

STATUSES = (("up", "Up"), ("down", "Down"), ("new", "New"), ("paused", "Paused"))
DEFAULT_TIMEOUT = td(days=1)
DEFAULT_GRACE = td(hours=1)
NEVER = datetime(3000, 1, 1, tzinfo=timezone.utc)
CHECK_KINDS = (("simple", "Simple"), ("cron", "Cron"), ("oncalendar", "OnCalendar"))
# max time between start and ping where we will consider both events related:
MAX_DURATION = td(hours=72)
REASONS = (("", "Unknown"), ("timeout", "Timeout"), ("fail", "Fail signal"))


TRANSPORTS: dict[str, tuple[str, type[transports.Transport]]] = {
    "apprise": ("Apprise", transports.Apprise),
    "call": ("Phone Call", transports.Call),
    "discord": ("Discord", transports.Discord),
    "email": ("Email", transports.Email),
    "gotify": ("Gotify", transports.Gotify),
    "group": ("Group", transports.Group),
    "linenotify": ("LINE Notify", transports.LineNotify),
    "matrix": ("Matrix", transports.Matrix),
    "mattermost": ("Mattermost", transports.Mattermost),
    "msteams": ("MS Teams Connector (stops working Jan 2025)", transports.MsTeams),
    "msteamsw": ("Microsoft Teams", transports.MsTeamsWorkflow),
    "ntfy": ("ntfy", transports.Ntfy),
    "opsgenie": ("Opsgenie", transports.Opsgenie),
    "pagertree": ("PagerTree", transports.PagerTree),
    "pd": ("PagerDuty", transports.PagerDuty),
    "po": ("Pushover", transports.Pushover),
    "pushbullet": ("Pushbullet", transports.Pushbullet),
    "rocketchat": ("Rocket.Chat", transports.RocketChat),
    "shell": ("Shell Command", transports.Shell),
    "signal": ("Signal", transports.Signal),
    "slack": ("Slack", transports.Slack),
    "sms": ("SMS", transports.Sms),
    "spike": ("Spike", transports.Spike),
    "telegram": ("Telegram", transports.Telegram),
    "trello": ("Trello", transports.Trello),
    "victorops": ("Splunk On-Call", transports.VictorOps),
    "webhook": ("Webhook", transports.Webhook),
    "whatsapp": ("WhatsApp", transports.WhatsApp),
    "zulip": ("Zulip", transports.Zulip),
}


CHANNEL_KINDS = [(kind, label_cls[0]) for kind, label_cls in TRANSPORTS.items()]

PO_PRIORITIES = {
    -3: "disabled",
    -2: "lowest",
    -1: "low",
    0: "normal",
    1: "high",
    2: "emergency",
}

NTFY_PRIORITIES = {
    5: "max",
    4: "high",
    3: "default",
    2: "low",
    1: "min",
    0: "disabled",
}


def isostring(dt: datetime | None) -> str | None:
    """Convert the datetime to ISO 8601 format with no microseconds."""
    return dt.replace(microsecond=0).isoformat() if dt else None


class CheckDict(TypedDict, total=False):
    uuid: str | None
    name: str
    slug: str
    tags: str
    desc: str
    grace: int
    n_pings: int
    status: str
    started: bool
    last_ping: str | None
    next_ping: str | None
    manual_resume: bool
    methods: str
    subject: str
    subject_fail: str
    start_kw: str
    success_kw: str
    failure_kw: str
    filter_subject: bool
    filter_body: bool
    badge_url: str
    last_duration: int
    unique_key: str
    ping_url: str
    update_url: str
    pause_url: str
    resume_url: str
    channels: str
    timeout: int
    schedule: str
    tz: str


@dataclass
class DowntimeRecord:
    boundary: datetime  # The start of this time interval (timezone-aware)
    tz: str  # For calculating total seconds in a month
    no_data: bool  # True if the check did not yet exist in this time interval
    duration: td  # Total downtime in this time interval
    count: int  # The number of downtime events in this time interval

    def monthly_uptime(self) -> float:
        # NB: this method assumes monthly boundaries.
        # It will yield incorrect results for weekly boundaries.
        max_seconds = seconds_in_month(self.boundary.date(), self.tz)
        up_seconds = max_seconds - self.duration.total_seconds()
        return up_seconds / max_seconds


class DowntimeRecorder:
    def __init__(self, boundaries: list[datetime], tz: str, created: datetime) -> None:
        """
        `boundaries` is a list of timezone-aware datetimes of the starts of time
        intervals (months or weeks), and should be pre-sorted in descending order.
        """
        self.records = []
        prev_boundary = None
        for b in boundaries:
            # If the check was created *after* the start of the previous time
            # interval then the check did not yet exist during this time interval:
            no_data = prev_boundary is not None and created > prev_boundary
            self.records.append(DowntimeRecord(b, tz, no_data, td(), 0))
            prev_boundary = b

    def add(self, when: datetime, duration: td) -> None:
        for record in self.records:
            if when >= record.boundary:
                record.duration += duration
                record.count += 1
                return


class Check(models.Model):
    name = models.CharField(max_length=100, blank=True)
    slug = models.CharField(max_length=100, blank=True)
    tags = models.CharField(max_length=500, blank=True)
    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    desc = models.TextField(blank=True)
    project = models.ForeignKey(Project, models.CASCADE)
    created = models.DateTimeField(default=now)
    kind = models.CharField(max_length=10, default="simple", choices=CHECK_KINDS)
    timeout = models.DurationField(default=DEFAULT_TIMEOUT)
    grace = models.DurationField(default=DEFAULT_GRACE)
    schedule = models.CharField(max_length=100, default="* * * * *")
    tz = models.CharField(max_length=36, default="UTC")
    filter_subject = models.BooleanField(default=False)
    filter_body = models.BooleanField(default=False)
    start_kw = models.CharField(max_length=200, blank=True)
    success_kw = models.CharField(max_length=200, blank=True)
    failure_kw = models.CharField(max_length=200, blank=True)
    methods = models.CharField(max_length=30, blank=True)
    manual_resume = models.BooleanField(default=False)
    badge_key = models.UUIDField(default=uuid.uuid4, unique=True)

    n_pings = models.IntegerField(default=0)
    last_ping = models.DateTimeField(null=True, blank=True)
    last_start = models.DateTimeField(null=True, blank=True)
    last_start_rid = models.UUIDField(null=True)
    last_duration = models.DurationField(null=True, blank=True)
    has_confirmation_link = models.BooleanField(default=False)
    alert_after = models.DateTimeField(null=True, blank=True, editable=False)
    status = models.CharField(max_length=6, choices=STATUSES, default="new")

    class Meta:
        indexes = [
            # Index for the alert_after field. Exclude rows with status=down.
            # Used in the sendalerts management command.
            models.Index(
                fields=["alert_after"],
                name="api_check_aa_not_down",
                condition=~models.Q(status="down"),
            ),
            models.Index(fields=["project_id", "slug"], name="api_check_project_slug"),
        ]

    def __str__(self) -> str:
        return "%s (%d)" % (self.name or self.code, self.id)

    def name_then_code(self) -> str:
        if self.name:
            return self.name

        return str(self.code)

    def url(self) -> str | None:
        """Return check's ping url in user's preferred style.

        Note: this method reads self.project. If project is not loaded already,
        this causes a SQL query.

        """

        if self.project_id and self.project.show_slugs:
            if not self.slug:
                return None

            # If ping_key is not set, use dummy placeholder
            key = self.project.ping_key or "{ping_key}"
            return settings.PING_ENDPOINT + key + "/" + self.slug

        return settings.PING_ENDPOINT + str(self.code)

    def details_url(self, full: bool = True) -> str:
        if not full:
            return reverse("hc-details", args=[self.code])
        return absolute_reverse("hc-details", args=[self.code])

    def get_absolute_url(self) -> str:
        return self.details_url(full=False)

    def cloaked_url(self) -> str:
        return absolute_reverse("hc-uncloak", args=[self.unique_key])

    def email(self) -> str:
        return "%s@%s" % (self.code, settings.PING_EMAIL_DOMAIN)

    def clamped_last_duration(self) -> td | None:
        if self.last_duration and self.last_duration < MAX_DURATION:
            return self.last_duration
        return None

    def get_grace_start(self, *, with_started: bool = True) -> datetime | None:
        """Return the datetime when the grace period starts.

        If the check is currently new, paused or down, return None.
        """
        # NEVER is a constant sentinel value (year 3000).
        # Using None instead would make the min() logic clunky.
        result = NEVER

        if self.kind == "simple" and self.status == "up":
            assert self.last_ping is not None
            result = self.last_ping + self.timeout
        elif self.kind == "cron" and self.status == "up":
            assert self.last_ping is not None
            # The complex case, next ping is expected based on cron schedule.
            # Don't convert to naive datetimes (and so avoid ambiguities around
            # DST transitions). cronsim will handle the timezone-aware datetimes.
            last_local = self.last_ping.astimezone(ZoneInfo(self.tz))
            result = next(CronSim(self.schedule, last_local))
            # Important: convert from the local timezone back to UTC.
            # If the result is kept in the local timezone, adding
            # a timedelta to it later (in `going_down_after` and in `get_status`)
            # may yield incorrect results during DST transitions.
            result = result.astimezone(timezone.utc)
        elif self.kind == "oncalendar" and self.status == "up":
            assert self.last_ping is not None
            last_local = self.last_ping.astimezone(ZoneInfo(self.tz))
            try:
                result = next(OnCalendar(self.schedule, last_local))
                # Same as for cron, convert back to UTC:
                result = result.astimezone(timezone.utc)
            except StopIteration:
                result = NEVER

        if with_started and self.last_start and self.status != "down":
            result = min(result, self.last_start)

        return result if result != NEVER else None

    def going_down_after(self) -> datetime | None:
        """Return the datetime when the check goes down.

        If the check is new or paused, and not currently running, return None.
        If the check is already down, also return None.
        """
        grace_start = self.get_grace_start()
        if grace_start is not None:
            return grace_start + self.grace

        return None

    @cached_property
    def cached_status(self) -> str:
        return self.get_status()

    def get_status(self, *, with_started: bool = False) -> str:
        """Return current status for display."""
        frozen_now = now()

        if self.last_start:
            if frozen_now >= self.last_start + self.grace:
                return "down"
            elif with_started:
                return "started"

        if self.status in ("new", "paused", "down"):
            return self.status

        grace_start = self.get_grace_start(with_started=False)
        if grace_start is None:
            # next elapse is "never", so this check will stay up indefinitely
            return "up"

        grace_end = grace_start + self.grace
        if frozen_now >= grace_end:
            return "down"

        if frozen_now >= grace_start:
            return "grace"

        return "up"

    def lock_and_delete(self) -> None:
        """Acquire a DB lock for this check, then delete the check.

        Without the lock the delete can fail if the check gets pinged while it is
        in the process of deletion.
        """
        with transaction.atomic():
            Check.objects.select_for_update().filter(id=self.id).delete()

    def assign_all_channels(self) -> None:
        channels = Channel.objects.filter(project=self.project)
        self.channel_set.set(channels)

    def tags_list(self) -> list[str]:
        return [t.strip() for t in self.tags.split(" ") if t.strip()]

    def matches_tag_set(self, tag_set: set[str]) -> bool:
        return tag_set.issubset(self.tags_list())

    def channels_str(self) -> str:
        """Return a comma-separated string of assigned channel codes."""

        # Is this an unsaved instance?
        if not self.id:
            return ""

        # self.channel_set may already be prefetched.
        # Sort in Python to make sure we don't run additional queries
        codes = [str(channel.code) for channel in self.channel_set.all()]
        return ",".join(sorted(codes))

    @property
    def unique_key(self) -> str:
        code_half = self.code.hex[:16]
        return hashlib.sha1(code_half.encode()).hexdigest()

    def to_dict(self, *, readonly: bool = False, v: int = 3) -> CheckDict:
        with_started = v == 1
        result: CheckDict = {
            "name": self.name,
            "slug": self.slug,
            "tags": self.tags,
            "desc": self.desc,
            "grace": int(self.grace.total_seconds()),
            "n_pings": self.n_pings,
            "status": self.get_status(with_started=with_started),
            "started": self.last_start is not None,
            "last_ping": isostring(self.last_ping),
            "next_ping": isostring(self.get_grace_start()),
            "manual_resume": self.manual_resume,
            "methods": self.methods,
            "subject": self.success_kw if self.filter_subject else "",
            "subject_fail": self.failure_kw if self.filter_subject else "",
            "start_kw": self.start_kw,
            "success_kw": self.success_kw,
            "failure_kw": self.failure_kw,
            "filter_subject": self.filter_subject,
            "filter_body": self.filter_body,
            # Optimization: construct badge URLs manually instead of using reverse().
            # This is significantly quicker when returning hundreds of checks.
            "badge_url": f"{settings.SITE_ROOT}/b/2/{self.badge_key}.svg",
        }

        if self.last_duration:
            result["last_duration"] = int(self.last_duration.total_seconds())

        if readonly:
            result["unique_key"] = self.unique_key
        else:
            result["uuid"] = str(self.code)
            result["ping_url"] = settings.PING_ENDPOINT + str(self.code)

            # Optimization: construct API URLs manually instead of using reverse().
            # This is significantly quicker when returning hundreds of checks.
            update_url = f"{settings.SITE_ROOT}/api/v{v}/checks/{self.code}"
            result["update_url"] = update_url
            result["pause_url"] = update_url + "/pause"
            result["resume_url"] = update_url + "/resume"
            result["channels"] = self.channels_str()

        if self.kind == "simple":
            result["timeout"] = int(self.timeout.total_seconds())
        elif self.kind in ("cron", "oncalendar"):
            result["schedule"] = self.schedule
            result["tz"] = self.tz

        return result

    def ping(
        self,
        remote_addr: str,
        scheme: str,
        method: str,
        ua: str,
        body: bytes,
        action: str,
        rid: uuid.UUID | None,
        exitstatus: int | None = None,
    ) -> None:
        # The following block updates a Check object, then creates a Ping object.
        # There's a possible race condition where the "sendalerts" command sees
        # the updated Check object before the Ping object is created.
        # To avoid this, put both operations inside a transaction:
        with transaction.atomic():
            # Acquire a lock. Without locking, on MariaDB, concurrent pings can
            # lead to a deadlock
            self = Check.objects.select_for_update().get(id=self.id)
            frozen_now = now()

            if self.status == "paused" and self.manual_resume:
                action = "ign"

            if action == "start":
                self.last_start = frozen_now
                self.last_start_rid = rid
                # Don't update "last_ping" field.
            elif action == "ign":
                pass
            elif action == "log":
                pass
            else:
                self.last_ping = frozen_now
                self.last_duration = None
                if self.last_start:
                    if self.last_start_rid == rid:
                        # rid matches: calculate last_duration, clear last_start
                        self.last_duration = self.last_ping - self.last_start
                        self.last_start = None
                    elif action == "fail" or rid is None:
                        # clear last_start (exit the "running" state) on:
                        # - "success" event with no rid
                        # - "fail" event, regardless of rid mismatch
                        self.last_start = None

                new_status = "down" if action == "fail" else "up"
                if self.status != new_status:
                    reason = "fail" if action == "fail" else ""
                    self.create_flip(new_status, reason=reason)
                    self.status = new_status

            self.alert_after = self.going_down_after()
            self.n_pings = models.F("n_pings") + 1
            body_lowercase = body.decode(errors="replace").lower()
            self.has_confirmation_link = "confirm" in body_lowercase
            self.save()
            self.refresh_from_db()

            ping = Ping(owner=self)
            ping.n = self.n_pings
            ping.created = frozen_now
            if action in ("start", "fail", "ign", "log"):
                ping.kind = action

            ping.remote_addr = remote_addr
            ping.scheme = scheme
            ping.method = method
            # If User-Agent is longer than 200 characters, truncate it:
            ping.ua = ua[:200]
            if len(body) > 100 and settings.S3_BUCKET:
                ping.object_size = len(body)
            else:
                ping.body_raw = body
            ping.rid = rid
            ping.exitstatus = exitstatus
            ping.save()

        # Upload ping body to S3 outside the DB transaction, because this operation
        # can potentially take a long time:
        if ping.object_size:
            put_object(self.code, ping.n, body)

        # Every 100 received pings, prune old pings and notifications:
        if self.n_pings % 100 == 0:
            self.prune()

    def prune(self, wait: bool = False) -> None:
        """Remove old pings and notifications."""

        threshold = self.n_pings - self.project.owner_profile.ping_log_limit

        # Remove ping bodies from object storage
        if settings.S3_BUCKET:
            remove_objects(str(self.code), threshold, wait=wait)

        # Remove ping objects from db
        self.ping_set.filter(n__lte=threshold).delete()

        try:
            # Important: sort by "created", not by "id". Sorting by id
            # may cause Postgres to use the "api_ping_pkey" index, and scan
            # a huge number of rows.
            ping = self.ping_set.earliest("created")

            # Delete notifications older than the oldest retained ping
            self.notification_set.filter(created__lt=ping.created).delete()

            # Delete flips older than the oldest retained ping *and*
            # older than 93 days. We need ~3 months of flips for calculating
            # downtime statistics. The precise requirement is
            # "we need the current month and full two previous months of data".
            # We could calculate this precisely, but 3*31 is close enough and
            # much simpler.
            flip_threshold = min(ping.created, now() - td(days=93))
            self.flip_set.filter(created__lt=flip_threshold).delete()
        except Ping.DoesNotExist:
            pass

    @property
    def visible_pings(self) -> QuerySet[Ping]:
        threshold = self.n_pings - self.project.owner_profile.ping_log_limit
        return self.ping_set.filter(n__gt=threshold)

    def downtimes_by_boundary(
        self, boundaries: list[datetime], tz: str
    ) -> list[DowntimeRecord]:
        """Calculate downtime counts and durations for the given time intervals.

        Returns a list of DowntimeRecord instances in descending datetime order.

        `boundaries` are timezone-aware datetimes of the first days of time intervals
        (months or weeks), and should be pre-sorted in descending order.

        """

        summary = DowntimeRecorder(boundaries, tz, self.created)

        # A list of flips and time interval boundaries
        events = [(b, "---") for b in boundaries]
        q = self.flip_set.filter(created__gt=min(boundaries))
        for pair in q.values_list("created", "old_status"):
            events.append(pair)

        # Iterate through flips and boundaries,
        # and for each "down" event increase the counters in `summary`.
        dt, status = now(), self.status
        for prev_dt, prev_status in sorted(events, reverse=True):
            if status == "down":
                # Before subtracting datetimes convert them to UTC.
                # Otherwise we will get incorrect results around DST transitions:
                delta = dt.astimezone(timezone.utc) - prev_dt.astimezone(timezone.utc)
                summary.add(prev_dt, delta)

            dt = prev_dt
            if prev_status != "---":
                status = prev_status

        return summary.records

    def downtimes(self, months: int, tz: str) -> list[DowntimeRecord]:
        boundaries = month_boundaries(months, tz)
        return self.downtimes_by_boundary(boundaries, tz)

    def create_flip(
        self, new_status: str, reason: str = "", mark_as_processed: bool = False
    ) -> None:
        """Create a Flip object for this check.

        Flip objects record check status changes, and have two uses:
        - for sending notifications asynchronously (create a flip object in the
          www process, a separate "sendalerts" process picks it up and processes it)
        - for downtime statistics calculation. The Check.downtimes() method
          analyzes the flips and calculates downtime counts and durations per
          month.
        """

        flip = Flip(owner=self)
        flip.created = now()
        if mark_as_processed:
            flip.processed = flip.created
        flip.old_status = self.status
        flip.new_status = new_status
        flip.reason = reason
        flip.save()


class PingDict(TypedDict, total=False):
    type: str
    date: str
    n: int | None
    scheme: str
    remote_addr: str | None
    method: str
    ua: str
    rid: uuid.UUID | None
    duration: float
    body_url: str | None


class Ping(models.Model):
    id = models.BigAutoField(primary_key=True)
    n = models.IntegerField(null=True)
    owner = models.ForeignKey(Check, models.CASCADE)
    created = models.DateTimeField(default=now)
    kind = models.CharField(max_length=6, blank=True, null=True)
    scheme = models.CharField(max_length=10, default="http")
    remote_addr = models.GenericIPAddressField(blank=True, null=True)
    method = models.CharField(max_length=10, blank=True)
    ua = models.CharField(max_length=200, blank=True)
    body_raw = models.BinaryField(null=True)
    object_size = models.IntegerField(null=True)
    exitstatus = models.SmallIntegerField(null=True)
    rid = models.UUIDField(null=True)

    class GetBodyError(Exception):
        pass

    def to_dict(self) -> PingDict:
        if self.has_body():
            args = [self.owner.code, self.n]
            body_url = absolute_reverse("hc-api-ping-body", args=args)
        else:
            body_url = None

        result: PingDict = {
            "type": self.kind or "success",
            "date": self.created.isoformat(),
            "n": self.n,
            "scheme": self.scheme,
            "remote_addr": self.remote_addr,
            "method": self.method,
            "ua": self.ua,
            "rid": self.rid,
            "body_url": body_url,
        }

        duration = self.duration
        if duration is not None:
            result["duration"] = duration.total_seconds()

        return result

    def has_body(self) -> bool:
        if self.body_raw or self.object_size:
            return True

        return False

    def get_body_bytes(self) -> bytes | None:
        if self.object_size and self.n:
            # Do not attempt to touch S3 if we have recorded more than 3
            # errors (503 responses, request timeouts) in the last minute
            # when accessing S3.
            # If we don't do this, an S3 outage can clog our request handlers and
            # cause a bigger issue.
            if not TokenBucket.s3_is_healthy():
                raise self.GetBodyError()

            try:
                return get_object(str(self.owner.code), self.n)
            except GetObjectError:
                # If S3 access resulted in an error, record this fact:
                TokenBucket.record_s3_get_object_error()
                raise self.GetBodyError()

        if self.body_raw:
            return self.body_raw

        return None

    def get_body(self) -> str | None:
        try:
            body_bytes = self.get_body_bytes()
        except self.GetBodyError:
            return None

        if body_bytes:
            return bytes(body_bytes).decode(errors="replace")

        return None

    def get_body_size(self) -> int:
        if self.body_raw:
            return len(self.body_raw)
        if self.object_size:
            return self.object_size
        return 0

    def get_kind_display(self) -> str:
        if self.kind == "ign":
            return "Ignored"
        if self.kind == "fail":
            if self.exitstatus:
                return f"Exit status {self.exitstatus}"
            return "Failure"
        if self.kind == "start":
            return "Start"
        if self.kind == "log":
            return "Log"

        return "Success"

    @cached_property
    def duration(self) -> td | None:
        # Return early if this is not a success or failure ping,
        # or if this is the very first ping:
        if self.kind not in (None, "", "fail") or self.n == 1:
            return None

        pings = Ping.objects.filter(owner=self.owner_id)
        # only look backwards but don't look further than MAX_DURATION in the past
        pings = pings.filter(id__lt=self.id, created__gte=self.created - MAX_DURATION)

        # Look for a "start" event, with no success/fail event in between:
        for ping in pings.order_by("-id").only("created", "kind", "rid"):
            if ping.kind == "start" and ping.rid == self.rid:
                return self.created - ping.created
            elif ping.kind in (None, "", "fail") and ping.rid == self.rid:
                return None

        return None

    def formatted_kind_created(self) -> str:
        """Return a string in "Success, 10 minutes" form."""
        # \xa0 is a non-breaking space, we want regular spaces
        created_str = naturaltime(self.created).replace("\xa0", " ")
        return f"{self.get_kind_display()}, {created_str}"


class WebhookSpec(BaseModel):
    method: str
    url: str
    body: str
    headers: dict[str, str]


class TelegramConf(BaseModel):
    id: int
    thread_id: int | None = None
    type: str | None = None
    name: str | None = None


class ShellConf(BaseModel):
    cmd_down: str
    cmd_up: str


class PdConf(BaseModel):
    service_key: str
    account: str | None = None

    @classmethod
    def load(cls, data: Any) -> PdConf:
        # Is it a plain service_key value?
        if not data.startswith("{"):
            return cls.model_validate({"service_key": data})

        return super().model_validate_json(data)


class PhoneConf(BaseModel):
    value: str
    notify_up: bool | None = Field(None, alias="up")
    notify_down: bool | None = Field(None, alias="down")


class EmailConf(BaseModel):
    value: str
    notify_up: bool = Field(alias="up")
    notify_down: bool = Field(alias="down")

    @classmethod
    def load(cls, data: Any) -> EmailConf:
        # Is it a plain email address?
        if not data.startswith("{"):
            return cls.model_validate({"value": data, "up": True, "down": True})

        return super().model_validate_json(data)


class OpsgenieConf(BaseModel):
    key: str
    region: str


class ZulipConf(BaseModel):
    bot_email: str
    api_key: str
    mtype: str
    to: str
    site: str = ""
    topic: str = ""

    def model_post_init(self, context: Any) -> None:
        if self.site == "":
            # Fallback if we don't have the site value:
            # derive it from bot's email
            _, domain = self.bot_email.split("@")
            self.site = f"https://{domain}"


class NtfyConf(BaseModel):
    topic: str
    url: str
    priority: int
    priority_up: int
    token: str = ""

    @property
    def priority_display(self) -> str:
        return NTFY_PRIORITIES[self.priority]


class TrelloConf(BaseModel):
    token: str
    list_id: str
    board_name: str
    list_name: str


class GotifyConf(BaseModel):
    url: str
    token: str


class Channel(models.Model):
    name = models.CharField(max_length=100, blank=True)
    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    project = models.ForeignKey(Project, models.CASCADE)
    created = models.DateTimeField(default=now)
    kind = models.CharField(max_length=20, choices=CHANNEL_KINDS)
    value = models.TextField(blank=True)
    email_verified = models.BooleanField(default=False)
    disabled = models.BooleanField(default=False)
    last_notify = models.DateTimeField(null=True, blank=True)
    last_notify_duration = models.DurationField(null=True, blank=True)
    last_error = models.CharField(max_length=200, blank=True)
    checks = models.ManyToManyField(Check)

    def __str__(self) -> str:
        if self.name:
            return self.name
        if self.kind == "email":
            return f"Email to {self.email.value}"
        elif self.kind == "sms":
            return f"SMS to {self.phone.value}"
        elif self.kind == "slack":
            return f"Slack {self.slack_channel}"
        elif self.kind == "telegram":
            return f"Telegram {self.telegram.name}"
        elif self.kind == "zulip":
            if self.zulip.mtype == "stream":
                return f"Zulip stream {self.zulip.to}"
            if self.zulip.mtype == "private":
                return f"Zulip user {self.zulip.to}"

        return self.get_kind_display()

    def to_dict(self) -> dict[str, str]:
        return {"id": str(self.code), "name": self.name, "kind": self.kind}

    def is_editable(self) -> bool:
        return self.kind in (
            "email",
            "webhook",
            "sms",
            "signal",
            "whatsapp",
            "ntfy",
            "group",
        )

    def assign_all_checks(self) -> None:
        checks = Check.objects.filter(project=self.project)
        self.checks.add(*checks)

    def make_token(self) -> str:
        seed = "%s%s" % (self.code, settings.SECRET_KEY)
        seed_bytes = seed.encode()
        return hashlib.sha1(seed_bytes).hexdigest()

    def send_verify_link(self) -> None:
        args = [self.code, self.make_token()]
        verify_link = absolute_reverse("hc-verify-email", args=args)
        emails.verify_email(self.email.value, {"verify_link": verify_link})

    def get_unsub_link(self) -> str:
        signer = TimestampSigner(salt="alerts")
        signed_token = signer.sign(self.make_token())
        args = [self.code, signed_token]
        return absolute_reverse("hc-unsubscribe-alerts", args=args)

    def send_signal_captcha_alert(self, challenge: str, raw: str) -> None:
        subject = "Signal CAPTCHA proof required"
        message = f"Challenge token: {challenge}"
        hostname = socket.gethostname()
        submit_url = absolute_reverse("hc-signal-captcha")
        submit_url += "?" + urlencode({"host": hostname, "challenge": challenge})
        html_message = f"""
            On host <b>{hostname}</b>, run:<br>
            <pre>manage.py submitchallenge {challenge} CAPTCHA-SOLUTION-HERE</pre><br>
            <br>
            Alternatively, <a href="{submit_url}">submit CAPTCHA solution here</a>.<br>
            <br>
            Message from Signal:<br>
            <pre>{raw}</pre>
        """
        mail_admins(subject, message, html_message=html_message)

    def send_signal_rate_limited_notice(self, message: str, plaintext: str) -> None:
        email = self.project.owner.email
        ctx = {
            "recipient": self.phone.value,
            "subject": plaintext.split("\n")[0],
            "message": message,
            "plaintext": plaintext,
        }
        emails.signal_rate_limited(email, ctx)

    @property
    def transport(self) -> transports.Transport:
        if self.kind not in TRANSPORTS:
            raise NotImplementedError(f"Unknown channel kind: {self.kind}")

        _, cls = TRANSPORTS[self.kind]
        return cls(self)

    def notify(self, flip: Flip, is_test: bool = False) -> str:
        if self.transport.is_noop(flip.new_status):
            return "no-op"

        n = Notification(channel=self)
        if is_test:
            # When sending a test notification we leave the owner field null.
            # (the passed check is a dummy, unsaved Check instance)
            pass
        else:
            n.owner = flip.owner

        n.check_status = flip.new_status
        n.error = "Sending"
        n.save()

        start, error, disabled = now(), "", self.disabled
        try:
            self.transport.notify(flip, notification=n)

        except transports.TransportError as e:
            disabled = True if e.permanent else disabled
            error = e.message

        Notification.objects.filter(id=n.id).update(error=error)
        Channel.objects.filter(id=self.id).update(
            last_notify=start,
            last_notify_duration=now() - start,
            last_error=error,
            disabled=disabled,
        )

        return error

    def icon_path(self) -> str:
        return f"img/integrations/{self.kind}.png"

    @property
    def json(self) -> Any:
        return json.loads(self.value)

    @property
    def po_priority(self) -> str:
        assert self.kind == "po"
        parts = self.value.split("|")
        prio = int(parts[1])
        return PO_PRIORITIES[prio]

    def webhook_spec(self, status: str) -> WebhookSpec:
        assert self.kind == "webhook"
        assert status in ("up", "down")

        doc = json.loads(self.value)
        return WebhookSpec(
            method=doc[f"method_{status}"],
            url=doc[f"url_{status}"],
            body=doc[f"body_{status}"],
            headers=doc[f"headers_{status}"],
        )

    @property
    def down_webhook_spec(self) -> WebhookSpec:
        return self.webhook_spec("down")

    @property
    def up_webhook_spec(self) -> WebhookSpec:
        return self.webhook_spec("up")

    @property
    def shell(self) -> ShellConf:
        assert self.kind == "shell"
        return ShellConf.model_validate_json(self.value)

    @property
    def slack_team(self) -> str | None:
        assert self.kind == "slack"
        if not self.value.startswith("{"):
            return None

        doc = json.loads(self.value)
        if "team_name" in doc:
            assert isinstance(doc["team_name"], str)
            return doc["team_name"]

        if "team" in doc:
            assert isinstance(doc["team"]["name"], str)
            return doc["team"]["name"]

        return None

    @property
    def slack_channel(self) -> str | None:
        assert self.kind == "slack"
        if not self.value.startswith("{"):
            return None

        doc = json.loads(self.value)
        v = doc["incoming_webhook"]["channel"]
        assert isinstance(v, str)
        return v

    @property
    def slack_webhook_url(self) -> str:
        assert self.kind in ("slack", "mattermost")
        if not self.value.startswith("{"):
            return self.value

        doc = json.loads(self.value)
        v = doc["incoming_webhook"]["url"]
        assert isinstance(v, str)
        return v

    @property
    def discord_webhook_url(self) -> str:
        assert self.kind == "discord"
        url = self.json["webhook"]["url"]
        assert isinstance(url, str)
        # Discord migrated to discord.com,
        # and is dropping support for discordapp.com on 7 November 2020
        if url.startswith("https://discordapp.com/"):
            url = "https://discord.com/" + url[23:]

        return url

    @property
    def telegram(self) -> TelegramConf:
        assert self.kind == "telegram"
        return TelegramConf.model_validate_json(self.value)

    def update_telegram_id(self, new_chat_id: int) -> None:
        doc = json.loads(self.value)
        doc["id"] = new_chat_id
        self.value = json.dumps(doc)
        self.save()

    @property
    def pd(self) -> PdConf:
        assert self.kind == "pd"
        return PdConf.load(self.value)

    @property
    def phone(self) -> PhoneConf:
        assert self.kind in ("call", "sms", "whatsapp", "signal")
        return PhoneConf.model_validate_json(self.value)

    @property
    def trello(self) -> TrelloConf:
        assert self.kind == "trello"
        return TrelloConf.model_validate_json(self.value, strict=True)

    @property
    def email(self) -> EmailConf:
        return EmailConf.load(self.value)

    @property
    def opsgenie(self) -> OpsgenieConf:
        return OpsgenieConf.model_validate_json(self.value)

    @property
    def zulip(self) -> ZulipConf:
        return ZulipConf.model_validate_json(self.value)

    @property
    def linenotify_token(self) -> str:
        assert self.kind == "linenotify"
        return self.value

    @property
    def gotify(self) -> GotifyConf:
        assert self.kind == "gotify"
        return GotifyConf.model_validate_json(self.value, strict=True)

    @property
    def group_channels(self) -> QuerySet[Channel]:
        assert self.kind == "group"
        return Channel.objects.filter(
            project=self.project, code__in=self.value.split(",")
        )

    @property
    def ntfy(self) -> NtfyConf:
        assert self.kind == "ntfy"
        return NtfyConf.model_validate_json(self.value, strict=True)


class Notification(models.Model):
    code = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    # owner is null for test notifications, produced by the "Test!" button
    # in the Integrations page
    owner = models.ForeignKey(Check, models.CASCADE, null=True)
    check_status = models.CharField(max_length=6)
    channel = models.ForeignKey(Channel, models.CASCADE)
    created = models.DateTimeField(default=now)
    error = models.CharField(max_length=200, blank=True)

    class Meta:
        get_latest_by = "created"

    def status_url(self) -> str:
        return absolute_reverse("hc-api-notification-status", args=[self.code])


class FlipDict(TypedDict):
    timestamp: str
    up: int


class Flip(models.Model):
    owner = models.ForeignKey(Check, models.CASCADE)
    created = models.DateTimeField()
    processed = models.DateTimeField(null=True, blank=True)
    old_status = models.CharField(max_length=8, choices=STATUSES)
    new_status = models.CharField(max_length=8, choices=STATUSES)
    reason = models.CharField(max_length=8, choices=REASONS, default="")

    class Meta:
        indexes = [
            # For quickly looking up unprocessed flips.
            # Used in the sendalerts management command.
            models.Index(
                fields=["processed"],
                name="api_flip_not_processed",
                condition=models.Q(processed=None),
            ),
            # For efficiently selecting flips in hc.front.views._get_events
            models.Index(
                fields=["owner", "created"],
                name="api_flip_owner_created",
            ),
        ]

    def to_dict(self) -> FlipDict:
        return {
            "timestamp": self.created.replace(microsecond=0).isoformat(),
            "up": 1 if self.new_status == "up" else 0,
        }

    def select_channels(self) -> list[Channel]:
        """Return a list of channels that need to be notified.

        * Exclude all channels for new->up and paused->up transitions.
        * Exclude disabled channels.
        * Exclude channels where transport.is_noop(status) returns True.
        * Sort channels by last_notify_duration (shorter durations first).
        """

        # Don't send alerts on new->up and paused->up transitions
        if self.new_status == "up" and self.old_status in ("new", "paused"):
            return []

        if self.new_status not in ("up", "down"):
            raise NotImplementedError(f"Unexpected status: {self.new_status}")

        q = self.owner.channel_set.exclude(disabled=True)
        q = q.order_by(F("last_notify_duration").asc(nulls_last=True))
        return [ch for ch in q if not ch.transport.is_noop(self.new_status)]

    def reason_long(self) -> str | None:
        if self.reason == "timeout":
            return "success signal did not arrive on time, grace time passed"
        if self.reason == "fail":
            return "received a failure signal"
        return None


class TokenBucket(models.Model):
    value = models.CharField(max_length=80, unique=True)
    tokens = models.FloatField(default=1.0)
    updated = models.DateTimeField(default=now)

    @staticmethod
    def authorize(
        value: str, capacity: int, refill_time_secs: int, force: bool = False
    ) -> bool:
        frozen_now = now()
        obj, created = TokenBucket.objects.get_or_create(value=value)

        if not created:
            # Top up the bucket:
            duration_secs = (frozen_now - obj.updated).total_seconds()
            obj.tokens = min(1.0, obj.tokens + duration_secs / refill_time_secs)

        obj.tokens -= 1.0 / capacity
        if obj.tokens < 0 and not force:
            # Not enough tokens
            return False

        # Race condition: two concurrent authorize calls can overwrite each
        # other's changes. It's OK to be a little inexact here for the sake
        # of simplicity.
        obj.updated = frozen_now
        obj.save()

        return True

    @staticmethod
    def authorize_auth_ip(request: HttpRequest) -> bool:
        headers = request.META
        remote_addr = headers.get("HTTP_X_FORWARDED_FOR", headers["REMOTE_ADDR"])
        remote_addr = remote_addr.split(",")[0]
        if "." in remote_addr and ":" in remote_addr:
            # If remote_addr is in an ipv4address:port format
            # (like in Azure App Service), remove the port:
            remote_addr = remote_addr.split(":")[0]

        value = f"auth-ip-{remote_addr}"
        # 20 signup/login attempts for a single IP per hour:
        return TokenBucket.authorize(value, 20, 3600)

    @staticmethod
    def authorize_login_email(email: str) -> bool:
        # remove dots and alias:
        mailbox, domain = email.split("@")
        mailbox = mailbox.replace(".", "")
        mailbox = mailbox.split("+")[0]
        email = mailbox + "@" + domain

        salted_encoded = (email + settings.SECRET_KEY).encode()
        hashed = hashlib.sha1(salted_encoded).hexdigest()

        # 20 login attempts for a single email per hour:
        return TokenBucket.authorize(f"em-{hashed}", 20, 3600)

    @staticmethod
    def authorize_invite(user: User) -> bool:
        value = "invite-%d" % user.id

        # 20 invites per day
        return TokenBucket.authorize(value, 20, 3600 * 24)

    @staticmethod
    def authorize_login_password(email: str) -> bool:
        salted_encoded = (email + settings.SECRET_KEY).encode()
        hashed = hashlib.sha1(salted_encoded).hexdigest()

        # 20 password attempts per day
        return TokenBucket.authorize(f"pw-{hashed}", 20, 3600 * 24)

    @staticmethod
    def authorize_telegram(telegram_id: int) -> bool:
        # 6 messages for a single chat per minute:
        return TokenBucket.authorize(f"tg-{telegram_id}", 6, 60)

    @staticmethod
    def authorize_signal(phone: str) -> bool:
        salted_encoded = (phone + settings.SECRET_KEY).encode()
        hashed = hashlib.sha1(salted_encoded).hexdigest()

        # 6 messages for a single recipient per minute:
        return TokenBucket.authorize(f"signal-{hashed}", 6, 60)

    @staticmethod
    def authorize_signal_verification(user: User) -> bool:
        value = "signal-verify-%d" % user.id

        # 50 signal recipient verifications per day
        return TokenBucket.authorize(value, 50, 3600 * 24)

    @staticmethod
    def authorize_pushover(user_key: str) -> bool:
        salted_encoded = (user_key + settings.SECRET_KEY).encode()
        hashed = hashlib.sha1(salted_encoded).hexdigest()

        # 6 messages for a single user key per minute:
        return TokenBucket.authorize(f"po-{hashed}", 6, 60)

    @staticmethod
    def authorize_sudo_code(user: User) -> bool:
        value = "sudo-%d" % user.id

        # 10 sudo attempts per day
        return TokenBucket.authorize(value, 10, 3600 * 24)

    @staticmethod
    def authorize_totp_attempt(user: User) -> bool:
        value = "totp-%d" % user.id

        # 96 attempts per user per 24 hours
        # (or, on average, one attempt per 15 minutes)
        return TokenBucket.authorize(value, 96, 3600 * 24)

    @staticmethod
    def authorize_totp_code(user: User, code: str) -> bool:
        value = "totpc-%d-%s" % (user.id, code)

        # A code has a validity period of 3 * 30 = 90 seconds.
        # During that period, allow the code to only be used once,
        # so an eavesdropping attacker cannot reuse a code.
        return TokenBucket.authorize(value, 1, 90)

    @staticmethod
    def s3_is_healthy() -> bool:
        """Return True if fewer than 3 GetObject errors in the last minute."""
        try:
            obj = TokenBucket.objects.get(value="s3_get_object_error")
        except TokenBucket.DoesNotExist:
            return True

        duration_secs = (now() - obj.updated).total_seconds()
        # How many tokens we would have after top-up:
        tokens = min(1.0, obj.tokens + duration_secs / 60)
        return tokens >= 1.0 / 3

    @staticmethod
    def record_s3_get_object_error() -> None:
        # Use force=True, we are recording the S3 error after the error already
        # happened, and want to record it even if the tokens field would go negative.
        TokenBucket.authorize("s3_get_object_error", 3, 60, force=True)
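
For reference, here is the arithmetic behind the two S3 helpers above, worked out in a standalone sketch (illustrative only, not part of the module; the constants 3 and 60 are the capacity and refill_time_secs that record_s3_get_object_error() passes to authorize()):

# Illustrative sketch: why three errors within a minute trip the breaker.
CAPACITY = 3      # record_s3_get_object_error() uses capacity=3...
REFILL_SECS = 60  # ...and refill_time_secs=60

tokens = 1.0  # a fresh bucket starts full
for n in range(1, 4):
    tokens -= 1.0 / CAPACITY  # each recorded GetObject error costs 1/3 token
    print(n, round(tokens, 2), tokens >= 1.0 / CAPACITY)
# Output: "1 0.67 True", "2 0.33 True", "3 0.0 False" -- after the third
# error within the minute, s3_is_healthy() returns False and GetObject
# calls are skipped. With no further errors the bucket refills at one full
# token per 60 seconds (tokens += elapsed / REFILL_SECS), so it climbs back
# above the 1/3 threshold after roughly 20 idle seconds.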