healthchecks_healthchecks/hc/api/views.py
Pēteris Caune 79da9e9f4f
Fix auto-fixable ruff warnings
(`ruff check --fix`)
2024-11-07 15:15:58 +02:00

922 lines
29 KiB
Python

from __future__ import annotations
import email.policy
import time
from collections.abc import Iterable
from datetime import datetime
from datetime import timedelta as td
from datetime import timezone
from email import message_from_bytes
from typing import Any, Literal
from uuid import UUID
from cronsim import CronSim, CronSimError
from django.conf import settings
from django.core.signing import BadSignature
from django.db import connection, transaction
from django.db.models import Prefetch
from django.http import (
Http404,
HttpRequest,
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponseNotFound,
JsonResponse,
)
from django.shortcuts import get_object_or_404
from django.utils.text import slugify
from django.utils.timezone import now
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from oncalendar import OnCalendar, OnCalendarError
from pydantic import BaseModel, Field, ValidationError, field_validator, model_validator
from pydantic_core import PydanticCustomError
from hc.accounts.models import Profile, Project
from hc.api.decorators import ApiRequest, authorize, authorize_read, cors
from hc.api.forms import FlipsFiltersForm
from hc.api.models import MAX_DURATION, Channel, Check, Flip, Notification, Ping
from hc.lib.badges import check_signature, get_badge_svg, get_badge_url
from hc.lib.signing import unsign_bounce_id
from hc.lib.string import is_valid_uuid_string
from hc.lib.tz import all_timezones, legacy_timezones
class BadChannelException(Exception):
def __init__(self, message: str):
self.message = message
def guess_kind(schedule: str) -> str:
# If it is a single line with 5 components, it is probably a cron expression:
if "\n" not in schedule.strip() and len(schedule.split()) == 5:
return "cron"
return "oncalendar"
class Spec(BaseModel):
channels: str | None = None
desc: str | None = None
failure_kw: str | None = Field(None, max_length=200)
filter_body: bool | None = None
filter_subject: bool | None = None
grace: td | None = Field(None, ge=60, le=31536000)
manual_resume: bool | None = None
methods: Literal["", "POST"] | None = None
name: str | None = Field(None, max_length=100)
schedule: str | None = Field(None, max_length=100)
slug: str | None = Field(None, max_length=100, pattern="^[a-z0-9-_]*$")
start_kw: str | None = Field(None, max_length=200)
subject: str | None = Field(None, max_length=200)
subject_fail: str | None = Field(None, max_length=200)
success_kw: str | None = Field(None, max_length=200)
tags: str | None = None
timeout: td | None = Field(None, ge=60, le=31536000)
tz: str | None = None
unique: list[Literal["name", "slug", "tags", "timeout", "grace"]] | None = None
@model_validator(mode="before")
@classmethod
def check_nulls(cls, data: dict[str, Any]) -> dict[str, Any]:
# Look for any null values in the incoming data. Replace them with a
# float. None of the fields have a float type, and we are using
# strict validation, so this will cause type validation to fail.
for k, v in data.items():
if v is None:
data[k] = 0.0
return data
@field_validator("timeout", "grace", mode="before")
@classmethod
def convert_to_timedelta(cls, v: Any) -> Any:
if isinstance(v, int):
return td(seconds=v)
return v
@field_validator("tz")
@classmethod
def check_tz(cls, v: str) -> str:
if v in legacy_timezones:
# Replace legacy timezone with the current canonical time zone
# (for example, Europe/Kiev -> Europe/Kyiv)
v = legacy_timezones[v]
if v not in all_timezones:
raise PydanticCustomError("tz_syntax", "not a valid timezone")
return v
@field_validator("schedule")
@classmethod
def check_schedule(cls, v: str) -> str:
if guess_kind(v) == "cron":
try:
# Test if cronsim accepts it and can calculate the next datetime
it = CronSim(v, datetime(2000, 1, 1))
next(it)
except (CronSimError, StopIteration):
raise PydanticCustomError("cron_syntax", "not a valid cron expression")
else:
try:
# Test if oncalendar accepts it, and can calculate the next datetime
oncalendar_it = OnCalendar(v, datetime(2000, 1, 1, tzinfo=timezone.utc))
next(oncalendar_it)
except (OnCalendarError, StopIteration):
raise PydanticCustomError("cron_syntax", "not a valid expression")
return v
def kind(self) -> str | None:
if self.schedule:
return guess_kind(self.schedule)
if self.timeout:
return "simple"
return None
CUSTOM_ERRORS = {
"too_long": "%s is too long",
"string_too_long": "%s is too long",
"string_type": "%s is not a string",
"string_pattern_mismatch": "%s does not match pattern",
"less_than_equal": "%s is too large",
"greater_than_equal": "%s is too small",
"int_type": "%s is not a number",
"bool_type": "%s is not a boolean",
"literal_error": "%s has unexpected value",
"list_type": "%s is not an array",
"cron_syntax": "%s is not a valid cron or OnCalendar expression",
"tz_syntax": "%s is not a valid timezone",
"time_delta_type": "%s is not a number",
}
def format_first_error(exc: ValidationError) -> str:
first_error = exc.errors()[0]
subject = first_error["loc"][0]
if len(first_error["loc"]) == 2:
subject = f"an item in '{subject}'"
tmpl = CUSTOM_ERRORS[first_error["type"]]
return "json validation error: " + tmpl % subject
@csrf_exempt
@never_cache
def ping(
request: HttpRequest,
code: UUID,
check: Check | None = None,
action: str = "success",
exitstatus: int | None = None,
) -> HttpResponse:
if check is None:
try:
check = Check.objects.get(code=code)
except Check.DoesNotExist:
return HttpResponseNotFound("not found")
if exitstatus is not None and exitstatus > 255:
return HttpResponseBadRequest("invalid url format")
headers = request.META
remote_addr = headers.get("HTTP_X_FORWARDED_FOR", headers["REMOTE_ADDR"])
remote_addr = remote_addr.split(",")[0]
if "." in remote_addr and ":" in remote_addr:
# If remote_addr is in a ipv4address:port format (like in Azure App Service),
# remove the port:
remote_addr = remote_addr.split(":")[0]
scheme = headers.get("HTTP_X_FORWARDED_PROTO", "http")
method = headers["REQUEST_METHOD"]
ua = headers.get("HTTP_USER_AGENT", "")
body = request.body[: settings.PING_BODY_LIMIT]
if exitstatus is not None and exitstatus > 0:
action = "fail"
if check.methods == "POST" and method != "POST":
action = "ign"
rid, rid_str = None, request.GET.get("rid")
if rid_str is not None:
if not is_valid_uuid_string(rid_str):
return HttpResponseBadRequest("invalid uuid format")
rid = UUID(rid_str)
check.ping(remote_addr, scheme, method, ua, body, action, rid, exitstatus)
response = HttpResponse("OK")
if settings.PING_BODY_LIMIT is not None:
response["Ping-Body-Limit"] = str(settings.PING_BODY_LIMIT)
response["Access-Control-Allow-Origin"] = "*"
return response
@csrf_exempt
def ping_by_slug(
request: HttpRequest,
ping_key: str,
slug: str,
action: str = "success",
exitstatus: int | None = None,
) -> HttpResponse:
created = False
try:
check = Check.objects.get(slug=slug, project__ping_key=ping_key)
except Check.DoesNotExist:
if request.GET.get("create") != "1":
return HttpResponseNotFound("not found")
try:
project = Project.objects.get(ping_key=ping_key)
except Project.DoesNotExist:
return HttpResponseNotFound("not found")
check = Check(project=project, name=slug, slug=slug)
check.save()
check.assign_all_channels()
created = True
except Check.MultipleObjectsReturned:
return HttpResponse("ambiguous slug", status=409)
response = ping(request, check.code, check, action, exitstatus)
if response.status_code == 200 and created:
response.content = b"Created"
response.status_code = 201
return response
def _lookup(project: Project, spec: Spec) -> Check | None:
if not spec.unique:
return None
for field_name in spec.unique:
# If any field referenced in 'unique' is absent then return None
# (meaning, did not find a matching Check)
if getattr(spec, field_name) is None:
return None
existing_checks = Check.objects.filter(project=project)
if "name" in spec.unique:
existing_checks = existing_checks.filter(name=spec.name)
if "slug" in spec.unique:
existing_checks = existing_checks.filter(slug=spec.slug)
if "tags" in spec.unique:
existing_checks = existing_checks.filter(tags=spec.tags)
if "timeout" in spec.unique:
existing_checks = existing_checks.filter(timeout=spec.timeout)
if "grace" in spec.unique:
existing_checks = existing_checks.filter(grace=spec.grace)
return existing_checks.first()
def _update(check: Check, spec: Spec, v: int) -> None:
new_channels: Iterable[Channel] | None
# First, validate the supplied channel codes/names
if spec.channels is None:
# If the channels key is not present, don't update check's channels
new_channels = None
elif spec.channels == "*":
# "*" means "all project's channels"
new_channels = Channel.objects.filter(project=check.project)
elif spec.channels == "":
# "" means "empty list"
new_channels = []
else:
# expect a comma-separated list of channel codes or names
new_channels = set()
available = list(Channel.objects.filter(project=check.project))
for s in spec.channels.split(","):
if s == "":
raise BadChannelException("empty channel identifier")
matches = [c for c in available if str(c.code) == s or c.name == s]
if len(matches) == 0:
raise BadChannelException("invalid channel identifier: %s" % s)
elif len(matches) > 1:
raise BadChannelException("non-unique channel identifier: %s" % s)
new_channels.add(matches[0])
need_save = False
if check.pk is None:
# Empty pk means we're inserting a new check,
# and so do need to save() it:
need_save = True
if spec.name is not None and check.name != spec.name:
check.name = spec.name
if v < 3:
# v1 and v2 generates slug automatically from name
check.slug = slugify(spec.name)
need_save = True
kind = spec.kind()
if kind == "simple":
if check.kind != "simple" or check.timeout != spec.timeout:
check.kind = "simple"
check.timeout = spec.timeout
need_save = True
if kind in ("cron", "oncalendar"):
if check.kind != kind or check.schedule != spec.schedule:
check.kind = kind
assert spec.schedule is not None
check.schedule = spec.schedule
need_save = True
if spec.subject is not None:
check.success_kw = spec.subject
check.filter_subject = bool(check.success_kw or check.failure_kw)
need_save = True
if spec.subject_fail is not None:
check.failure_kw = spec.subject_fail
check.filter_subject = bool(check.success_kw or check.failure_kw)
need_save = True
for key in (
"slug",
"tags",
"desc",
"manual_resume",
"methods",
"tz",
"start_kw",
"success_kw",
"failure_kw",
"filter_subject",
"filter_body",
"grace",
):
v = getattr(spec, key)
if v is not None and getattr(check, key) != v:
setattr(check, key, v)
need_save = True
if need_save:
check.alert_after = check.going_down_after()
check.save()
# This needs to be done after saving the check, because of
# the M2M relation between checks and channels:
if new_channels is not None:
check.channel_set.set(new_channels)
@authorize_read
def get_checks(request: ApiRequest) -> JsonResponse:
q = Check.objects.filter(project=request.project)
if not request.readonly:
# Use QuerySet.only() and Prefetch() to prefetch channel codes only:
channel_q = Channel.objects.only("code")
q = q.prefetch_related(Prefetch("channel_set", queryset=channel_q))
tags = set(request.GET.getlist("tag"))
for tag in tags:
# approximate filtering by tags
q = q.filter(tags__contains=tag)
if slug := request.GET.get("slug"):
q = q.filter(slug=slug)
checks = []
for check in q:
# precise, final filtering
if not tags or check.matches_tag_set(tags):
checks.append(check.to_dict(readonly=request.readonly, v=request.v))
return JsonResponse({"checks": checks})
@authorize
def create_check(request: ApiRequest) -> HttpResponse:
try:
spec = Spec.model_validate(request.json, strict=True)
except ValidationError as e:
return JsonResponse({"error": format_first_error(e)}, status=400)
created = False
check = _lookup(request.project, spec)
if check is None:
if request.project.num_checks_available() <= 0:
return HttpResponseForbidden()
check = Check(project=request.project)
created = True
try:
_update(check, spec, request.v)
except BadChannelException as e:
return JsonResponse({"error": e.message}, status=400)
return JsonResponse(check.to_dict(v=request.v), status=201 if created else 200)
@csrf_exempt
@cors("GET", "POST")
def checks(request: HttpRequest) -> HttpResponse:
if request.method == "POST":
return create_check(request)
return get_checks(request)
@cors("GET")
@csrf_exempt
@authorize
def channels(request: ApiRequest) -> JsonResponse:
q = Channel.objects.filter(project=request.project)
channels = [ch.to_dict() for ch in q]
return JsonResponse({"channels": channels})
@authorize_read
def get_check(request: ApiRequest, code: UUID) -> HttpResponse:
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
return JsonResponse(check.to_dict(readonly=request.readonly, v=request.v))
@cors("GET")
@csrf_exempt
@authorize_read
def get_check_by_unique_key(request: ApiRequest, unique_key: str) -> HttpResponse:
for check in request.project.check_set.all():
if check.unique_key == unique_key:
return JsonResponse(check.to_dict(readonly=request.readonly, v=request.v))
return HttpResponseNotFound()
@authorize
def update_check(request: ApiRequest, code: UUID) -> HttpResponse:
# Don't acquire lock right away, first see if the check exists
# and matches the API key
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
try:
spec = Spec.model_validate(request.json, strict=True)
except ValidationError as e:
return JsonResponse({"error": format_first_error(e)}, status=400)
# Start a transaction, select for update, update.
# Use get_object_or_404 here again, in case another concurrent request
# has *just* deleted this check.
with transaction.atomic():
check = get_object_or_404(Check.objects.select_for_update(), code=code)
try:
_update(check, spec, request.v)
except BadChannelException as e:
return JsonResponse({"error": e.message}, status=400)
return JsonResponse(check.to_dict(v=request.v))
@authorize
def delete_check(request: ApiRequest, code: UUID) -> HttpResponse:
# Don't acquire lock right away, first see if the check exists
# and matches the API key
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
# Start a transaction, select for update, delete.
# Use get_object_or_404 here again, in case another concurrent request
# has *just* deleted this check.
with transaction.atomic():
check = get_object_or_404(Check.objects.select_for_update(), code=code)
check.delete()
return JsonResponse(check.to_dict(v=request.v))
@csrf_exempt
@cors("POST", "DELETE", "GET")
def single(request: HttpRequest, code: UUID) -> HttpResponse:
if request.method == "POST":
return update_check(request, code)
if request.method == "DELETE":
return delete_check(request, code)
return get_check(request, code)
@cors("POST")
@csrf_exempt
@authorize
def pause(request: ApiRequest, code: UUID) -> HttpResponse:
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
# Track the status change for correct downtime calculation in Check.downtimes()
check.create_flip("paused", mark_as_processed=True)
check.status = "paused"
check.last_start = None
check.alert_after = None
check.save()
# After pausing a check we must check if all checks are up,
# and Profile.next_nag_date needs to be cleared out:
check.project.update_next_nag_dates()
return JsonResponse(check.to_dict(v=request.v))
@cors("POST")
@csrf_exempt
@authorize
def resume(request: ApiRequest, code: UUID) -> HttpResponse:
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
if check.status != "paused":
return HttpResponse("check is not paused", status=409)
check.create_flip("new", mark_as_processed=True)
check.status = "new"
check.last_start = None
check.last_ping = None
check.alert_after = None
check.save()
return JsonResponse(check.to_dict(v=request.v))
@cors("GET")
@csrf_exempt
@authorize
def pings(request: ApiRequest, code: UUID) -> HttpResponse:
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
# Look up ping log limit from account's profile.
# There might be more pings in the database (depends on how pruning is handled)
# but we will not return more than the limit allows.
profile = Profile.objects.get(user__project=request.project)
limit = profile.ping_log_limit
# Query in descending order so we're sure to get the most recent
# pings, regardless of the limit restriction
pings = list(Ping.objects.filter(owner=check).order_by("-id")[:limit])
starts: dict[UUID | None, datetime | None] = {}
num_misses = 0
for ping in reversed(pings):
if ping.kind == "start":
starts[ping.rid] = ping.created
elif ping.kind in (None, "", "fail"):
if ping.rid not in starts:
# We haven't seen a start, success or fail event for this rid.
# Will need to fall back to Ping.duration().
num_misses += 1
else:
ping.duration = None
start = starts[ping.rid]
if start and (ping.created - start) < MAX_DURATION:
ping.duration = ping.created - start
starts[ping.rid] = None
# If we will need to fall back to Ping.duration() more than 10 times
# then disable duration display altogether:
if num_misses > 10:
for ping in pings:
ping.duration = None
return JsonResponse({"pings": [p.to_dict() for p in pings]})
@cors("GET")
@csrf_exempt
@authorize
def ping_body(request: ApiRequest, code: UUID, n: int) -> HttpResponse:
check = get_object_or_404(Check, code=code)
if check.project_id != request.project.id:
return HttpResponseForbidden()
profile = Profile.objects.get(user__project=request.project)
threshold = check.n_pings - profile.ping_log_limit
if n <= threshold:
raise Http404()
ping = get_object_or_404(Ping, owner=check, n=n)
body = ping.get_body_bytes()
if not body:
raise Http404()
response = HttpResponse(body, content_type="text/plain")
return response
def flips(request: ApiRequest, check: Check) -> HttpResponse:
if check.project_id != request.project.id:
return HttpResponseForbidden()
form = FlipsFiltersForm(request.GET)
if not form.is_valid():
return HttpResponseBadRequest()
flips = Flip.objects.filter(owner=check).order_by("-id")
if form.cleaned_data["start"]:
flips = flips.filter(created__gte=form.cleaned_data["start"])
if form.cleaned_data["end"]:
flips = flips.filter(created__lt=form.cleaned_data["end"])
if form.cleaned_data["seconds"]:
threshold = now() - td(seconds=form.cleaned_data["seconds"])
flips = flips.filter(created__gte=threshold)
return JsonResponse({"flips": [flip.to_dict() for flip in flips]})
@cors("GET")
@csrf_exempt
@authorize_read
def flips_by_uuid(request: ApiRequest, code: UUID) -> HttpResponse:
check = get_object_or_404(Check, code=code)
return flips(request, check)
@cors("GET")
@csrf_exempt
@authorize_read
def flips_by_unique_key(request: ApiRequest, unique_key: str) -> HttpResponse:
for check in request.project.check_set.all():
if check.unique_key == unique_key:
return flips(request, check)
return HttpResponseNotFound()
@cors("GET")
@csrf_exempt
@authorize_read
def badges(request: ApiRequest) -> JsonResponse:
tags = set(["*"])
for check in request.project.check_set.all():
tags.update(check.tags_list())
key = request.project.badge_key
badges = {}
for tag in tags:
badges[tag] = {
"svg": get_badge_url(key, tag),
"svg3": get_badge_url(key, tag, with_late=True),
"json": get_badge_url(key, tag, fmt="json"),
"json3": get_badge_url(key, tag, fmt="json", with_late=True),
"shields": get_badge_url(key, tag, fmt="shields"),
"shields3": get_badge_url(key, tag, fmt="shields", with_late=True),
}
return JsonResponse({"badges": badges})
SHIELDS_COLORS = {"up": "success", "late": "important", "down": "critical"}
def _shields_response(label: str, status: str) -> JsonResponse:
return JsonResponse(
{
"schemaVersion": 1,
"label": label,
"message": status,
"color": SHIELDS_COLORS[status],
}
)
@never_cache
@cors("GET")
def badge(
request: HttpRequest, badge_key: str, signature: str, tag: str, fmt: str
) -> HttpResponse:
if fmt not in ("svg", "json", "shields"):
return HttpResponseNotFound()
with_late = True
if len(signature) == 10 and signature.endswith("-2"):
with_late = False
if not check_signature(badge_key, tag, signature):
return HttpResponseNotFound()
q = Check.objects.filter(project__badge_key=badge_key)
if tag == "*":
label = settings.MASTER_BADGE_LABEL
else:
q = q.filter(tags__contains=tag)
label = tag
status, total, grace, down = "up", 0, 0, 0
for check in q:
if tag != "*" and tag not in check.tags_list():
continue
total += 1
check_status = check.get_status()
if check_status == "down":
down += 1
status = "down"
if fmt == "svg":
# For SVG badges, we can leave the loop as soon as we
# find the first "down"
break
elif check_status == "grace":
grace += 1
if status == "up" and with_late:
status = "late"
if fmt == "shields":
return _shields_response(label, status)
if fmt == "json":
return JsonResponse(
{"status": status, "total": total, "grace": grace, "down": down}
)
svg = get_badge_svg(label, status)
return HttpResponse(svg, content_type="image/svg+xml")
@never_cache
@cors("GET")
def check_badge(
request: HttpRequest, states: int, badge_key: UUID, fmt: str
) -> HttpResponse:
if fmt not in ("svg", "json", "shields"):
return HttpResponseNotFound()
check = get_object_or_404(Check, badge_key=badge_key)
check_status = check.get_status()
status = "up"
if check_status == "down":
status = "down"
elif check_status == "grace" and states == 3:
status = "late"
if fmt == "shields":
return _shields_response(check.name_then_code(), status)
if fmt == "json":
return JsonResponse(
{
"status": status,
"total": 1,
"grace": 1 if check_status == "grace" else 0,
"down": 1 if check_status == "down" else 0,
}
)
svg = get_badge_svg(check.name_then_code(), status)
return HttpResponse(svg, content_type="image/svg+xml")
@csrf_exempt
@require_POST
def notification_status(request: HttpRequest, code: UUID) -> HttpResponse:
"""Handle notification delivery status callbacks."""
try:
cutoff = now() - td(hours=1)
notification = Notification.objects.get(code=code, created__gt=cutoff)
except Notification.DoesNotExist:
# If the notification does not exist, or is more than a hour old,
# return HTTP 200 so the other party doesn't retry over and over again:
return HttpResponse()
error, mark_disabled = None, False
# Look for "error" and "mark_disabled" keys:
if request.POST.get("error"):
error = request.POST["error"][:200]
mark_disabled = bool(request.POST.get("mark_disabled"))
# Handle "MessageStatus" key from Twilio
if request.POST.get("MessageStatus") in ("failed", "undelivered"):
status = request.POST["MessageStatus"]
error = f"Delivery failed (status={status})."
# Handle "CallStatus" key from Twilio
if request.POST.get("CallStatus") == "failed":
error = "Delivery failed (status=failed)."
if error:
notification.error = error
notification.save(update_fields=["error"])
channel_q = Channel.objects.filter(id=notification.channel_id)
channel_q.update(last_error=error)
if mark_disabled:
channel_q.update(disabled=True)
return HttpResponse()
def metrics(request: HttpRequest) -> HttpResponse:
if not settings.METRICS_KEY:
return HttpResponseForbidden()
key = request.META.get("HTTP_X_METRICS_KEY")
if key != settings.METRICS_KEY:
return HttpResponseForbidden()
doc = {
"ts": int(time.time()),
"max_ping_id": Ping.objects.values_list("id", flat=True).last(),
"max_notification_id": Notification.objects.values_list("id", flat=True).last(),
"num_unprocessed_flips": Flip.objects.filter(processed__isnull=True).count(),
}
return JsonResponse(doc)
def status(request: HttpRequest) -> HttpResponse:
with connection.cursor() as c:
c.execute("SELECT 1")
c.fetchone()
return HttpResponse("OK")
@csrf_exempt
def bounces(request: HttpRequest) -> HttpResponse:
msg = message_from_bytes(request.body, policy=email.policy.SMTP)
to_local = msg.get("To", "").split("@")[0]
try:
unsigned = unsign_bounce_id(to_local, max_age=3600 * 48)
except BadSignature:
# If the signature is invalid or expired return HTTP 200 so the other party
# doesn't retry over and over again-
return HttpResponse("OK (bad signature)")
status, diagnostic = "", ""
for part in msg.walk():
if "Status" in part and "Action" in part:
status = part["Status"]
diagnostic = part.get("Diagnostic-Code", "")
if diagnostic.lower().startswith("smtp; "):
diagnostic = diagnostic[6:]
break
permanent = status.startswith("5.")
transient = status.startswith("4.")
if not permanent and not transient:
return HttpResponse("OK (ignored)")
if unsigned.startswith("n."):
notification_code = unsigned[2:]
try:
cutoff = now() - td(hours=48)
n = Notification.objects.get(code=notification_code, created__gt=cutoff)
except Notification.DoesNotExist:
return HttpResponse("OK (notification not found)")
if diagnostic:
error = f"Delivery failed ({diagnostic})"[:200]
else:
error = f"Delivery failed (SMTP status code: {status})"[:200]
n.error = error
n.save(update_fields=["error"])
channel_q = Channel.objects.filter(id=n.channel_id)
channel_q.update(last_error=error)
if permanent:
channel_q.update(disabled=True)
if unsigned.startswith("r.") and permanent:
username = unsigned[2:]
try:
profile = Profile.objects.get(user__username=username)
except Profile.DoesNotExist:
return HttpResponse("OK (user not found)")
profile.reports = "off"
profile.next_report_date = None
profile.nag_period = td()
profile.next_nag_date = None
profile.save()
return HttpResponse("OK")