mirror of
https://github.com/healthchecks/healthchecks.git
synced 2024-11-21 15:17:57 +00:00
62b10be5fe
(instead of showing a table with a header row and no data rows)
2752 lines
91 KiB
Python
2752 lines
91 KiB
Python
from __future__ import annotations
|
|
|
|
import email
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import uuid
|
|
from collections import Counter, defaultdict
|
|
from collections.abc import Iterable
|
|
from datetime import datetime
|
|
from datetime import timedelta as td
|
|
from email.message import EmailMessage
|
|
from itertools import islice
|
|
from secrets import token_urlsafe
|
|
from typing import Literal, TypedDict, cast
|
|
from urllib.parse import urlencode, urlparse
|
|
from uuid import UUID
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from cronsim import CronSim, CronSimError
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core import signing
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.db.models import Case, Count, F, Q, QuerySet, When
|
|
from django.db.models.functions import Substr
|
|
from django.http import (
|
|
Http404,
|
|
HttpRequest,
|
|
HttpResponse,
|
|
HttpResponseBadRequest,
|
|
HttpResponseForbidden,
|
|
JsonResponse,
|
|
)
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.template.loader import get_template, render_to_string
|
|
from django.urls import reverse
|
|
from django.utils.timezone import now
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_POST
|
|
from django_stubs_ext import WithAnnotations
|
|
from oncalendar import OnCalendar, OnCalendarError
|
|
from pydantic import BaseModel, TypeAdapter, ValidationError
|
|
|
|
from hc.accounts.http import AuthenticatedHttpRequest
|
|
from hc.accounts.models import Member, Profile, Project
|
|
from hc.api.models import (
|
|
DEFAULT_GRACE,
|
|
DEFAULT_TIMEOUT,
|
|
MAX_DURATION,
|
|
Channel,
|
|
Check,
|
|
Flip,
|
|
Notification,
|
|
Ping,
|
|
TokenBucket,
|
|
)
|
|
from hc.api.transports import Signal, Telegram, TransportError
|
|
from hc.front import forms
|
|
from hc.front.decorators import require_setting
|
|
from hc.front.templatetags.hc_extras import (
|
|
down_title,
|
|
num_down_title,
|
|
site_hostname,
|
|
sortchecks,
|
|
)
|
|
from hc.lib import curl
|
|
from hc.lib.badges import get_badge_url
|
|
from hc.lib.tz import all_timezones
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
VALID_SORT_VALUES = ("name", "-name", "last_ping", "-last_ping", "created")
|
|
STATUS_TEXT_TMPL = get_template("front/log_status_text.html")
|
|
LAST_PING_TMPL = get_template("front/last_ping_cell.html")
|
|
EVENTS_TMPL = get_template("front/details_events.html")
|
|
DOWNTIMES_TMPL = get_template("front/details_downtimes.html")
|
|
|
|
|
|
def _tags_counts(checks: Iterable[Check]) -> tuple[list[tuple[str, str, str]], int]:
|
|
num_down = 0
|
|
grace = set()
|
|
counts: Counter[str] = Counter()
|
|
down_counts: Counter[str] = Counter()
|
|
for check in checks:
|
|
counts.update(check.tags_list())
|
|
if check.cached_status == "down":
|
|
num_down += 1
|
|
down_counts.update(check.tags_list())
|
|
elif check.cached_status == "grace":
|
|
grace.update(check.tags_list())
|
|
|
|
result = []
|
|
for tag in counts:
|
|
if tag in down_counts:
|
|
status = "down"
|
|
text = f"{down_counts[tag]} of {counts[tag]} down"
|
|
else:
|
|
status = "grace" if tag in grace else "up"
|
|
text = f"{counts[tag]} up"
|
|
|
|
result.append((tag, status, text))
|
|
|
|
return result, num_down
|
|
|
|
|
|
def _common_timezones(checks: Iterable[Check]) -> list[str]:
|
|
counter: Counter[str] = Counter()
|
|
for check in checks:
|
|
counter[check.tz] += 1
|
|
|
|
return [tz for tz, _ in counter.most_common(3)]
|
|
|
|
|
|
def _get_check_for_user(
|
|
request: HttpRequest, code: UUID, preload_owner_profile: bool = False
|
|
) -> tuple[Check, bool]:
|
|
"""Return specified check if current user has access to it.
|
|
|
|
If `preload_owner_profile` is `True`, the returned check's
|
|
project.owner.profile will be already loaded. This helps avoid extra SQL queries
|
|
if the caller later looks up the project owner's check_limit or ping_log_limit.
|
|
|
|
"""
|
|
|
|
assert request.user.is_authenticated
|
|
|
|
q = Check.objects.select_related("project")
|
|
if preload_owner_profile:
|
|
q = q.select_related("project__owner__profile")
|
|
|
|
check = get_object_or_404(q, code=code)
|
|
if request.user.is_superuser:
|
|
return check, True
|
|
|
|
if request.user.id == check.project.owner_id:
|
|
return check, True
|
|
|
|
membership = get_object_or_404(Member, project=check.project, user=request.user)
|
|
return check, membership.is_rw
|
|
|
|
|
|
def _get_rw_check_for_user(request: HttpRequest, code: UUID) -> Check:
|
|
check, rw = _get_check_for_user(request, code)
|
|
if not rw:
|
|
raise PermissionDenied
|
|
|
|
return check
|
|
|
|
|
|
def _get_channel_for_user(request: HttpRequest, code: UUID) -> tuple[Channel, bool]:
|
|
"""Return specified channel if current user has access to it."""
|
|
|
|
assert request.user.is_authenticated
|
|
|
|
channel = get_object_or_404(Channel.objects.select_related("project"), code=code)
|
|
if request.user.is_superuser:
|
|
return channel, True
|
|
|
|
if request.user.id == channel.project.owner_id:
|
|
return channel, True
|
|
|
|
membership = get_object_or_404(Member, project=channel.project, user=request.user)
|
|
return channel, membership.is_rw
|
|
|
|
|
|
def _get_rw_channel_for_user(request: HttpRequest, code: UUID) -> Channel:
|
|
channel, rw = _get_channel_for_user(request, code)
|
|
if not rw:
|
|
raise PermissionDenied
|
|
|
|
return channel
|
|
|
|
|
|
def _get_project_for_user(request: HttpRequest, code: UUID) -> tuple[Project, bool]:
|
|
"""Check access, return (project, rw) tuple."""
|
|
|
|
project = get_object_or_404(Project, code=code)
|
|
if request.user.is_superuser:
|
|
return project, True
|
|
|
|
if request.user.id == project.owner_id:
|
|
return project, True
|
|
|
|
membership = get_object_or_404(Member, project=project, user=request.user)
|
|
|
|
return project, membership.is_rw
|
|
|
|
|
|
def _get_rw_project_for_user(request: HttpRequest, code: UUID) -> Project:
|
|
"""Check access, return (project, rw) tuple."""
|
|
|
|
project, rw = _get_project_for_user(request, code)
|
|
if not rw:
|
|
raise PermissionDenied
|
|
|
|
return project
|
|
|
|
|
|
def _refresh_last_active_date(request: AuthenticatedHttpRequest) -> None:
|
|
"""Update last_active_date if it is more than a day old."""
|
|
|
|
profile = request.profile
|
|
if profile.last_active_date is None or (now() - profile.last_active_date).days > 0:
|
|
profile.last_active_date = now()
|
|
profile.save()
|
|
|
|
# Also modify session to trigger session cookie refresh
|
|
# and push forward its expiry date:
|
|
request.session["last_active"] = profile.last_active_date.timestamp()
|
|
|
|
return None
|
|
|
|
|
|
def _get_referer_qs(request: HttpRequest) -> str:
|
|
parsed = urlparse(request.META.get("HTTP_REFERER", ""))
|
|
if parsed.query:
|
|
assert isinstance(parsed.query, str)
|
|
return "?" + parsed.query
|
|
return ""
|
|
|
|
|
|
def _status_match(check: Check, statuses: set[str]) -> bool:
|
|
if "started" in statuses and check.last_start:
|
|
return True
|
|
return check.cached_status in statuses
|
|
|
|
|
|
@login_required
|
|
def checks(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
_refresh_last_active_date(request)
|
|
project, rw = _get_project_for_user(request, code)
|
|
|
|
if request.GET.get("sort") in VALID_SORT_VALUES:
|
|
request.profile.sort = request.GET["sort"]
|
|
request.profile.save()
|
|
|
|
if request.GET.get("urls") in ("uuid", "slug") and rw:
|
|
project.show_slugs = request.GET["urls"] == "slug"
|
|
project.save()
|
|
|
|
if request.session.get("last_project_id") != project.id:
|
|
request.session["last_project_id"] = project.id
|
|
|
|
q = Check.objects.filter(project=project)
|
|
q = q.select_related("project")
|
|
checks = list(q.prefetch_related("channel_set"))
|
|
sortchecks(checks, request.profile.sort)
|
|
|
|
tags_counts, num_down = _tags_counts(checks)
|
|
tags_counts.sort(key=lambda item: item[0].lower())
|
|
|
|
is_group = Case(When(kind="group", then=0), default=1)
|
|
channels = project.channel_set.annotate(is_group=is_group)
|
|
# Sort groups first, then in the creation order
|
|
channels = channels.order_by("is_group", "created")
|
|
|
|
hidden_checks = set()
|
|
# Hide checks that don't match selected tags:
|
|
selected_tags = set(request.GET.getlist("tag", []))
|
|
if selected_tags:
|
|
for check in checks:
|
|
if not selected_tags.issubset(check.tags_list()):
|
|
hidden_checks.add(check)
|
|
|
|
# Hide checks that don't match the search string:
|
|
search = request.GET.get("search", "")
|
|
if search:
|
|
for check in checks:
|
|
haystack = f"{check.name}\n{check.slug}\n{check.code}"
|
|
if search not in haystack.lower():
|
|
hidden_checks.add(check)
|
|
|
|
# Hide checks that don't match status filters
|
|
selected_statuses = set(request.GET.getlist("status", []))
|
|
if selected_statuses:
|
|
for check in checks:
|
|
if not _status_match(check, selected_statuses):
|
|
hidden_checks.add(check)
|
|
|
|
# Figure out which checks have ambiguous ping URLs
|
|
seen, ambiguous = set(), set()
|
|
if project.show_slugs:
|
|
for check in checks:
|
|
if check.slug and check.slug in seen:
|
|
ambiguous.add(check.slug)
|
|
else:
|
|
seen.add(check.slug)
|
|
|
|
# Do we need to show the "Last Duration" header?
|
|
show_last_duration = False
|
|
for check in checks:
|
|
if check.clamped_last_duration():
|
|
show_last_duration = True
|
|
break
|
|
|
|
ctx = {
|
|
"page": "checks",
|
|
"rw": rw,
|
|
"checks": checks,
|
|
"channels": channels,
|
|
"num_down": num_down,
|
|
"tags": tags_counts,
|
|
"ping_endpoint": settings.PING_ENDPOINT,
|
|
"common_timezones": _common_timezones(checks),
|
|
"timezones": all_timezones,
|
|
"project": project,
|
|
"num_available": project.num_checks_available(),
|
|
"sort": request.profile.sort,
|
|
"selected_tags": selected_tags,
|
|
"selected_statuses": selected_statuses,
|
|
"search": search,
|
|
"hidden_checks": hidden_checks,
|
|
"num_visible": len(checks) - len(hidden_checks),
|
|
"ambiguous": ambiguous,
|
|
"show_last_duration": show_last_duration,
|
|
}
|
|
|
|
return render(request, "front/checks.html", ctx)
|
|
|
|
|
|
def status(request: HttpRequest, code: UUID) -> HttpResponse:
|
|
if not request.user.is_authenticated:
|
|
return HttpResponseForbidden()
|
|
|
|
project, rw = _get_project_for_user(request, code)
|
|
checks = list(Check.objects.filter(project=project))
|
|
|
|
details = []
|
|
for check in checks:
|
|
ctx = {"check": check}
|
|
details.append(
|
|
{
|
|
"code": str(check.code),
|
|
"status": check.get_status(),
|
|
"last_ping": LAST_PING_TMPL.render(ctx).strip(),
|
|
"started": check.last_start is not None,
|
|
}
|
|
)
|
|
|
|
tags_counts, num_down = _tags_counts(checks)
|
|
tags = {tag: (status, tooltip) for tag, status, tooltip in tags_counts}
|
|
return JsonResponse(
|
|
{"details": details, "tags": tags, "title": num_down_title(num_down)}
|
|
)
|
|
|
|
|
|
@login_required
|
|
@require_POST
|
|
def switch_channel(
|
|
request: AuthenticatedHttpRequest, code: UUID, channel_code: UUID
|
|
) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
channel = get_object_or_404(Channel, code=channel_code)
|
|
if channel.project_id != check.project_id:
|
|
return HttpResponseBadRequest()
|
|
|
|
if request.POST.get("state") == "on":
|
|
channel.checks.add(check)
|
|
else:
|
|
channel.checks.remove(check)
|
|
|
|
return HttpResponse()
|
|
|
|
|
|
class ProjectStatus(TypedDict):
|
|
status: str
|
|
started: bool
|
|
|
|
|
|
def _get_project_summary(profile: Profile) -> dict[UUID, ProjectStatus]:
|
|
statuses: dict[UUID, ProjectStatus] = defaultdict(
|
|
lambda: {"status": "up", "started": False}
|
|
)
|
|
q = profile.checks_from_all_projects()
|
|
q = q.annotate(project_code=F("project__code"))
|
|
for check in q:
|
|
summary = statuses[check.project_code]
|
|
if check.last_start:
|
|
summary["started"] = True
|
|
|
|
if summary["status"] != "down":
|
|
status = check.get_status()
|
|
if status == "down" or (status == "grace" and summary["status"] == "up"):
|
|
summary["status"] = status
|
|
|
|
return statuses
|
|
|
|
|
|
def index(request: HttpRequest) -> HttpResponse:
|
|
if not request.user.is_authenticated:
|
|
return redirect("hc-login")
|
|
|
|
# We now know user is logged, tell the type checker request.profile exists-
|
|
request = cast(AuthenticatedHttpRequest, request)
|
|
_refresh_last_active_date(request)
|
|
summary = _get_project_summary(request.profile)
|
|
if "refresh" in request.GET:
|
|
return JsonResponse({str(k): v for k, v in summary.items()})
|
|
|
|
q = request.profile.projects()
|
|
q = q.annotate(n_checks=Count("check", distinct=True))
|
|
q = q.annotate(n_channels=Count("channel", distinct=True))
|
|
q = q.annotate(owner_email=F("owner__email"))
|
|
projects = list(q)
|
|
any_down = False
|
|
for project in projects:
|
|
setattr(project, "overall_status", summary[project.code]["status"])
|
|
setattr(project, "any_started", summary[project.code]["started"])
|
|
if summary[project.code]["status"] == "down":
|
|
any_down = True
|
|
|
|
# The list returned by projects() is already sorted . Do an additional sorting pass
|
|
# to move projects with overall_status=down to the front (without changing their
|
|
# relative order)
|
|
projects.sort(key=lambda p: getattr(p, "overall_status") != "down")
|
|
|
|
ctx = {
|
|
"page": "projects",
|
|
"projects": projects,
|
|
"last_project_id": request.session.get("last_project_id"),
|
|
"any_down": any_down,
|
|
}
|
|
|
|
return render(request, "front/projects.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def projects_menu(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
projects = list(request.profile.projects())
|
|
|
|
statuses: dict[int, str] = defaultdict(lambda: "up")
|
|
for check in Check.objects.filter(project__in=projects):
|
|
old_status = statuses[check.project_id]
|
|
if old_status != "down":
|
|
status = check.get_status()
|
|
if status == "down" or (status == "grace" and old_status == "up"):
|
|
statuses[check.project_id] = status
|
|
|
|
for p in projects:
|
|
setattr(p, "overall_status", statuses[p.id])
|
|
|
|
return render(request, "front/projects_menu.html", {"projects": projects})
|
|
|
|
|
|
def dashboard(request: HttpRequest) -> HttpResponse:
|
|
return render(request, "front/dashboard.html", {})
|
|
|
|
|
|
def _replace_placeholders(doc: str, html: str) -> str:
|
|
if doc.startswith("self_hosted"):
|
|
return html
|
|
|
|
limit = settings.PING_BODY_LIMIT or 100
|
|
if limit % 1000 == 0:
|
|
limit_fmt = f"{limit // 1000} kB"
|
|
else:
|
|
limit_fmt = f"{limit} bytes"
|
|
|
|
replaces = {
|
|
"{{ default_timeout }}": str(int(DEFAULT_TIMEOUT.total_seconds())),
|
|
"{{ default_grace }}": str(int(DEFAULT_GRACE.total_seconds())),
|
|
"SITE_NAME": settings.SITE_NAME,
|
|
"SITE_ROOT": settings.SITE_ROOT,
|
|
"SITE_HOSTNAME": site_hostname(),
|
|
"SITE_SCHEME": urlparse(settings.SITE_ROOT).scheme,
|
|
"PING_ENDPOINT": settings.PING_ENDPOINT,
|
|
"PING_URL": settings.PING_ENDPOINT + "your-uuid-here",
|
|
"PING_BODY_LIMIT_FORMATTED": limit_fmt,
|
|
"PING_BODY_LIMIT": str(limit),
|
|
"IMG_URL": os.path.join(settings.STATIC_URL, "img/docs"),
|
|
}
|
|
|
|
for placeholder, value in replaces.items():
|
|
html = html.replace(placeholder, value)
|
|
|
|
return html
|
|
|
|
|
|
def serve_doc(request: HttpRequest, doc: str = "introduction") -> HttpResponse:
|
|
# Filenames in /templates/docs/ consist of lowercase letters and underscores,
|
|
# -- make sure we don't accept anything else
|
|
if not re.match(r"^[0-9a-z_]+$", doc):
|
|
raise Http404("not found")
|
|
|
|
path = settings.BASE_DIR / f"templates/docs/{doc}.html-fragment"
|
|
if not path.exists():
|
|
raise Http404("not found")
|
|
|
|
with path.open("r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
|
|
content = _replace_placeholders(doc, content)
|
|
ctx = {
|
|
"page": "docs",
|
|
"section": doc,
|
|
"content": content,
|
|
"first_line": content.split("\n")[0],
|
|
}
|
|
|
|
return render(request, "front/docs_single.html", ctx)
|
|
|
|
|
|
@csrf_exempt
|
|
def docs_search(request: HttpRequest) -> HttpResponse:
|
|
form = forms.SearchForm(request.GET)
|
|
if not form.is_valid():
|
|
return render(request, "front/docs_search.html", {"results": []})
|
|
|
|
query = """
|
|
SELECT slug, title, snippet(docs, 2, '<span>', '</span>', '…', 10)
|
|
FROM docs
|
|
WHERE docs MATCH ?
|
|
ORDER BY bm25(docs, 2.0, 10.0, 1.0)
|
|
LIMIT 8
|
|
"""
|
|
|
|
# Wrap the query in double quotes to get a valid FTS string
|
|
# https://www.sqlite.org/fts5.html#full_text_query_syntax
|
|
q = '"%s"' % form.cleaned_data["q"]
|
|
con = sqlite3.connect(settings.BASE_DIR / "search.db")
|
|
cur = con.cursor()
|
|
res = cur.execute(query, (q,))
|
|
|
|
ctx = {"results": res.fetchall()}
|
|
return render(request, "front/docs_search.html", ctx)
|
|
|
|
|
|
def docs_cron(request: HttpRequest) -> HttpResponse:
|
|
return render(request, "front/docs_cron.html", {"page": "docs-cron"})
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def add_check(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
if project.num_checks_available() <= 0:
|
|
return HttpResponseBadRequest()
|
|
|
|
form = forms.AddCheckForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check = Check(project=project)
|
|
check.name = form.cleaned_data["name"]
|
|
check.slug = form.cleaned_data["slug"]
|
|
check.tags = form.cleaned_data["tags"]
|
|
check.kind = form.cleaned_data["kind"]
|
|
check.timeout = form.cleaned_data["timeout"]
|
|
check.schedule = form.cleaned_data["schedule"]
|
|
check.tz = form.cleaned_data["tz"]
|
|
check.grace = form.cleaned_data["grace"]
|
|
check.save()
|
|
|
|
check.assign_all_channels()
|
|
|
|
url = reverse("hc-checks", args=[project.code])
|
|
url += _get_referer_qs(request) # Preserve selected tags and search
|
|
return redirect(url)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_name(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
form = forms.NameTagsForm(request.POST)
|
|
if form.is_valid():
|
|
check.name = form.cleaned_data["name"]
|
|
check.slug = form.cleaned_data["slug"]
|
|
check.tags = form.cleaned_data["tags"]
|
|
check.desc = form.cleaned_data["desc"]
|
|
check.save()
|
|
|
|
if "/details/" in request.META.get("HTTP_REFERER", ""):
|
|
return redirect("hc-details", code)
|
|
|
|
url = reverse("hc-checks", args=[check.project.code])
|
|
url += _get_referer_qs(request) # Preserve selected tags and search
|
|
return redirect(url)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def filtering_rules(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
form = forms.FilteringRulesForm(request.POST)
|
|
if form.is_valid():
|
|
check.filter_subject = form.cleaned_data["filter_subject"]
|
|
check.filter_body = form.cleaned_data["filter_body"]
|
|
check.start_kw = form.cleaned_data["start_kw"]
|
|
check.success_kw = form.cleaned_data["success_kw"]
|
|
check.failure_kw = form.cleaned_data["failure_kw"]
|
|
|
|
check.methods = form.cleaned_data["methods"]
|
|
check.manual_resume = form.cleaned_data["manual_resume"]
|
|
check.save()
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_timeout(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
kind = request.POST.get("kind")
|
|
if kind == "simple":
|
|
simple_form = forms.TimeoutForm(request.POST)
|
|
if not simple_form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check.kind = "simple"
|
|
check.timeout = simple_form.cleaned_data["timeout"]
|
|
check.grace = simple_form.cleaned_data["grace"]
|
|
elif kind == "cron":
|
|
cron_form = forms.CronForm(request.POST)
|
|
if not cron_form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check.kind = "cron"
|
|
check.schedule = cron_form.cleaned_data["schedule"]
|
|
check.tz = cron_form.cleaned_data["tz"]
|
|
check.grace = cron_form.cleaned_data["grace"]
|
|
elif kind == "oncalendar":
|
|
oncalendar_form = forms.OnCalendarForm(request.POST)
|
|
if not oncalendar_form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check.kind = "oncalendar"
|
|
check.schedule = oncalendar_form.cleaned_data["schedule"]
|
|
check.tz = oncalendar_form.cleaned_data["tz"]
|
|
check.grace = oncalendar_form.cleaned_data["grace"]
|
|
|
|
check.alert_after = check.going_down_after()
|
|
if check.status == "up":
|
|
assert check.alert_after
|
|
if check.alert_after < now():
|
|
# Checks can flip from "up" to "down" state as a result of changing check's
|
|
# schedule. We don't want to send notifications when changing schedule
|
|
# interactively in the web UI. So we update the `alert_after` and `status`
|
|
# fields, and create a Flip object here the same way as `sendalerts` would
|
|
# do, but without sending an actual alert.
|
|
#
|
|
# We need to create the Flip object because otherwise the calculation
|
|
# in Check.downtimes() will come out wrong (when this check later comes up,
|
|
# we will have no record of when it went down).
|
|
check.create_flip("down", mark_as_processed=True)
|
|
|
|
check.alert_after = None
|
|
check.status = "down"
|
|
|
|
check.save()
|
|
|
|
if "/details/" in request.META.get("HTTP_REFERER", ""):
|
|
return redirect("hc-details", code)
|
|
|
|
url = reverse("hc-checks", args=[check.project.code])
|
|
url += _get_referer_qs(request) # Preserve selected tags and search
|
|
return redirect(url)
|
|
|
|
|
|
@require_POST
|
|
def cron_preview(request: HttpRequest) -> HttpResponse:
|
|
schedule = request.POST.get("schedule", "")
|
|
tz = request.POST.get("tz")
|
|
ctx: dict[str, object] = {"tz": tz}
|
|
|
|
if tz not in all_timezones:
|
|
ctx["bad_tz"] = True
|
|
return render(request, "front/cron_preview.html", ctx)
|
|
|
|
now_local = now().astimezone(ZoneInfo(tz))
|
|
try:
|
|
it = CronSim(schedule, now_local)
|
|
ctx["dates"] = list(islice(it, 0, 6))
|
|
ctx["desc"] = it.explain()
|
|
except CronSimError:
|
|
ctx["bad_schedule"] = True
|
|
|
|
if not ctx.get("dates"):
|
|
ctx["bad_schedule"] = True
|
|
|
|
return render(request, "front/cron_preview.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def oncalendar_preview(request: HttpRequest) -> HttpResponse:
|
|
schedule = request.POST.get("schedule", "")
|
|
tz = request.POST.get("tz")
|
|
ctx: dict[str, object] = {"tz": tz, "dates": []}
|
|
|
|
if tz not in all_timezones:
|
|
ctx["bad_tz"] = True
|
|
return render(request, "front/oncalendar_preview.html", ctx)
|
|
|
|
now_local = now().astimezone(ZoneInfo(tz))
|
|
try:
|
|
it = OnCalendar(schedule, now_local)
|
|
iterations = 6 if tz == "UTC" else 4
|
|
ctx["dates"] = list(islice(it, 0, iterations))
|
|
except OnCalendarError:
|
|
ctx["bad_schedule"] = True
|
|
|
|
if not ctx["dates"]:
|
|
ctx["bad_schedule"] = True
|
|
|
|
return render(request, "front/oncalendar_preview.html", ctx)
|
|
|
|
|
|
def validate_schedule(request: HttpRequest) -> HttpResponse:
|
|
kind = request.GET.get("kind", "")
|
|
iterator: type[CronSim] | type[OnCalendar]
|
|
if kind == "cron":
|
|
iterator = CronSim
|
|
elif kind == "oncalendar":
|
|
iterator = OnCalendar
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
schedule = request.GET.get("schedule", "")
|
|
result = True
|
|
try:
|
|
# Does cronsim/oncalendar accept the schedule?
|
|
it = iterator(schedule, now())
|
|
# Can it calculate the next datetime?
|
|
next(it)
|
|
except (CronSimError, OnCalendarError, StopIteration):
|
|
result = False
|
|
|
|
return JsonResponse({"result": result})
|
|
|
|
|
|
@login_required
|
|
def ping_details(
|
|
request: AuthenticatedHttpRequest, code: UUID, n: int | None = None
|
|
) -> HttpResponse:
|
|
check, rw = _get_check_for_user(request, code)
|
|
q = Ping.objects.filter(owner=check)
|
|
if n:
|
|
q = q.filter(n=n)
|
|
else:
|
|
# When n is not specified, look up the most recent success or failure,
|
|
# ignoring "start", "log", "ign" events
|
|
q = q.exclude(kind__in=("start", "log", "ign"))
|
|
|
|
try:
|
|
ping = q.latest("created")
|
|
except Ping.DoesNotExist:
|
|
return render(request, "front/ping_details_not_found.html")
|
|
|
|
body = ping.get_body()
|
|
ctx = {
|
|
"check": check,
|
|
"ping": ping,
|
|
"body": body,
|
|
"plain": None,
|
|
"html": None,
|
|
"active": None,
|
|
}
|
|
|
|
if ping.scheme == "email" and body:
|
|
parsed = email.message_from_string(body, policy=email.policy.SMTP)
|
|
assert isinstance(parsed, EmailMessage)
|
|
ctx["subject"] = parsed.get("subject", "")
|
|
|
|
# The "active" tab is set to show the value that's successfully parsed last.
|
|
# Per the current implementation, this means that if both plain text and HTML
|
|
# content are present, the ping details dialog will initially display the HTML
|
|
# content, otherwise - only one content type exists, and we default to that
|
|
# (either plain text or HTML, at least one of them should exist in a
|
|
# valid email).
|
|
#
|
|
# NOTE: If both plain text and html have not been parsed successfully the
|
|
# "active" tab is not set at all, but currently this is not an issue since in
|
|
# this case the "ping details" template does not render any tabs.
|
|
|
|
plain_mime_part = parsed.get_body(("plain",))
|
|
if plain_mime_part:
|
|
assert isinstance(plain_mime_part, EmailMessage)
|
|
ctx["plain"] = plain_mime_part.get_content()
|
|
ctx["active"] = "plain"
|
|
|
|
html_mime_part = parsed.get_body(("html",))
|
|
if html_mime_part:
|
|
assert isinstance(html_mime_part, EmailMessage)
|
|
ctx["html"] = html_mime_part.get_content()
|
|
ctx["active"] = "html"
|
|
|
|
return render(request, "front/ping_details.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def ping_body(request: AuthenticatedHttpRequest, code: UUID, n: int) -> HttpResponse:
|
|
check, rw = _get_check_for_user(request, code)
|
|
ping = get_object_or_404(Ping, owner=check, n=n)
|
|
|
|
body = ping.get_body_bytes()
|
|
if not body:
|
|
raise Http404("not found")
|
|
|
|
response = HttpResponse(body, content_type="application/octet-stream")
|
|
filename = "%s-%s" % (check.code, ping.n)
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}.txt"'
|
|
return response
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def pause(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
# Track the status change for correct downtime calculation in Check.downtimes()
|
|
check.create_flip("paused", mark_as_processed=True)
|
|
|
|
check.status = "paused"
|
|
check.last_start = None
|
|
check.alert_after = None
|
|
check.save()
|
|
|
|
# After pausing a check we must check if all checks are up,
|
|
# and Profile.next_nag_date needs to be cleared out:
|
|
check.project.update_next_nag_dates()
|
|
|
|
# Don't redirect after an AJAX request:
|
|
if request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest":
|
|
return HttpResponse()
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def resume(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
if check.status != "paused":
|
|
return HttpResponseBadRequest()
|
|
|
|
check.create_flip("new", mark_as_processed=True)
|
|
|
|
check.status = "new"
|
|
check.last_start = None
|
|
check.last_ping = None
|
|
check.alert_after = None
|
|
check.save()
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def remove_check(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
project = check.project
|
|
check.lock_and_delete()
|
|
return redirect("hc-checks", project.code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def clear_events(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
check.status = "new"
|
|
check.last_ping = None
|
|
check.last_start = None
|
|
check.last_duration = None
|
|
check.has_confirmation_link = False
|
|
check.alert_after = None
|
|
check.save()
|
|
|
|
check.ping_set.all().delete()
|
|
check.notification_set.all().delete()
|
|
check.flip_set.all().delete()
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
|
|
class PingAnnotations(TypedDict):
|
|
body_raw_preview: bytes
|
|
|
|
|
|
def _get_events(
|
|
check: Check,
|
|
page_limit: int,
|
|
start: datetime,
|
|
end: datetime,
|
|
kinds: tuple[str, ...] | None = None,
|
|
) -> list[Notification | WithAnnotations[Ping, PingAnnotations] | Flip]:
|
|
# Sorting by "n" instead of "id" is important here. Both give the same
|
|
# query results, but sorting by "id" can cause postgres to pick
|
|
# api_ping.id index (slow if the api_ping table is big). Sorting by
|
|
# "n" works around the problem--postgres picks the api_ping.owner_id index.
|
|
pq = check.visible_pings.order_by("-n")
|
|
pq = pq.filter(created__gte=start, created__lte=end)
|
|
if kinds is not None:
|
|
kinds_filter = Q(kind__in=kinds)
|
|
if "success" in kinds:
|
|
kinds_filter = kinds_filter | Q(kind__isnull=True) | Q(kind="")
|
|
pq = pq.filter(kinds_filter)
|
|
|
|
# Optimization: defer loading body_raw, instead load its first 150 bytes
|
|
# as "body_raw_preview". This reduces both network I/O to database, and disk I/O
|
|
# on the database host if the database contains large request bodies.
|
|
pq = pq.defer("body_raw")
|
|
pq = pq.annotate(body_raw_preview=Substr("body_raw", 1, 151))
|
|
pings = list(pq[:page_limit])
|
|
|
|
# Optimization: the template will access Ping.duration, which would generate a
|
|
# SQL query per displayed ping. Since we've already fetched a list of pings,
|
|
# for some of them we can calculate durations more efficiently, without causing
|
|
# additional SQL queries:
|
|
starts: dict[UUID | None, datetime | None] = {}
|
|
num_misses = 0
|
|
for ping in reversed(pings):
|
|
if ping.kind == "start":
|
|
starts[ping.rid] = ping.created
|
|
elif ping.kind in (None, "", "fail"):
|
|
if ping.rid not in starts:
|
|
# We haven't seen a start, success or fail event for this rid.
|
|
# Will need to fall back to Ping.duration().
|
|
num_misses += 1
|
|
else:
|
|
ping.duration = None
|
|
matching_start = starts[ping.rid]
|
|
if matching_start is not None:
|
|
if ping.created - matching_start < MAX_DURATION:
|
|
ping.duration = ping.created - matching_start
|
|
|
|
starts[ping.rid] = None
|
|
|
|
# If we will need to fall back to Ping.duration() more than 10 times
|
|
# then disable duration display altogether:
|
|
if num_misses > 10:
|
|
for ping in pings:
|
|
ping.duration = None
|
|
|
|
alerts: list[Notification] = []
|
|
if kinds and "notification" in kinds:
|
|
aq = check.notification_set.order_by("-created")
|
|
aq = aq.filter(created__gte=start, created__lte=end, check_status="down")
|
|
aq = aq.select_related("channel")
|
|
alerts = list(aq[:page_limit])
|
|
|
|
flips: list[Flip] = []
|
|
if kinds is None or "flip" in kinds:
|
|
fq = check.flip_set.order_by("-created")
|
|
fq = fq.filter(created__gte=start, created__lte=end)
|
|
flips = list(fq[:page_limit])
|
|
|
|
events = pings + alerts + flips
|
|
# Sort events by the timestamp.
|
|
# If timestamps are equal, put flips chronologically after pings
|
|
events.sort(key=lambda el: (el.created, isinstance(el, Flip)), reverse=True)
|
|
return events[:page_limit]
|
|
|
|
|
|
@login_required
|
|
def log(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check, rw = _get_check_for_user(request, code, preload_owner_profile=True)
|
|
|
|
smin = check.created
|
|
smax = now()
|
|
oldest_ping = check.visible_pings.order_by("n").first()
|
|
if oldest_ping:
|
|
smin = max(smin, oldest_ping.created)
|
|
|
|
events = _get_events(check, 1000, start=smin, end=smax)
|
|
ctx = {
|
|
"page": "log",
|
|
"project": check.project,
|
|
"check": check,
|
|
"min": smin,
|
|
"max": smax,
|
|
"events": events,
|
|
"oldest_ping": oldest_ping,
|
|
}
|
|
|
|
if events:
|
|
# A full precision timestamp of the most recent event.
|
|
# This will be used client-side for fetching live updates to specify
|
|
# "return any events after *this* point".
|
|
ctx["last_event_timestamp"] = events[0].created.timestamp()
|
|
|
|
return render(request, "front/log.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def details(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
_refresh_last_active_date(request)
|
|
check, rw = _get_check_for_user(request, code, preload_owner_profile=True)
|
|
|
|
if request.GET.get("urls") in ("uuid", "slug") and rw:
|
|
check.project.show_slugs = request.GET["urls"] == "slug"
|
|
check.project.save()
|
|
|
|
all_channels = check.project.channel_set.order_by("created")
|
|
regular_channels: list[Channel] = []
|
|
group_channels: list[Channel] = []
|
|
for channel in all_channels:
|
|
channels = group_channels if channel.kind == "group" else regular_channels
|
|
channels.append(channel)
|
|
|
|
all_tags = set()
|
|
sibling_checks = Check.objects.filter(project=check.project).only("tags", "tz")
|
|
for sibling in sibling_checks:
|
|
if sibling.tags:
|
|
all_tags.update(sibling.tags.split(" "))
|
|
|
|
ctx = {
|
|
"page": "details",
|
|
"project": check.project,
|
|
"check": check,
|
|
"rw": rw,
|
|
"channels": regular_channels,
|
|
"group_channels": group_channels,
|
|
"enabled_channels": list(check.channel_set.all()),
|
|
"common_timezones": _common_timezones(sibling_checks),
|
|
"timezones": all_timezones,
|
|
"downtimes": check.downtimes(3, request.profile.tz),
|
|
"tz": request.profile.tz,
|
|
"is_copied": "copied" in request.GET,
|
|
"all_tags": " ".join(sorted(all_tags)),
|
|
}
|
|
|
|
return render(request, "front/details.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def uncloak(request: AuthenticatedHttpRequest, unique_key: str) -> HttpResponse:
|
|
for check in request.profile.checks_from_all_projects().only("code"):
|
|
if check.unique_key == unique_key:
|
|
return redirect("hc-details", check.code)
|
|
|
|
raise Http404("not found")
|
|
|
|
|
|
@login_required
|
|
def transfer(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.TransferForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
target_project = _get_rw_project_for_user(request, form.cleaned_data["project"])
|
|
if target_project.owner_id != check.project.owner_id:
|
|
if target_project.num_checks_available() <= 0:
|
|
return HttpResponseBadRequest()
|
|
|
|
check.project = target_project
|
|
check.save()
|
|
check.assign_all_channels()
|
|
|
|
messages.success(request, "Check transferred successfully!")
|
|
return redirect("hc-details", code)
|
|
|
|
ctx = {"check": check}
|
|
return render(request, "front/transfer_modal.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def copy(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
check = _get_rw_check_for_user(request, code)
|
|
|
|
if check.project.num_checks_available() <= 0:
|
|
return HttpResponseBadRequest()
|
|
|
|
new_name = check.name + " (copy)"
|
|
# Make sure we don't exceed the 100 character db field limit:
|
|
if len(new_name) > 100:
|
|
new_name = check.name[:90] + "... (copy)"
|
|
|
|
new_slug = check.slug + "-copy"
|
|
if len(new_slug) > 100:
|
|
new_slug = ""
|
|
|
|
copied = Check(project=check.project)
|
|
copied.name = new_name
|
|
copied.slug = new_slug
|
|
copied.desc, copied.tags = check.desc, check.tags
|
|
|
|
copied.filter_subject = check.filter_subject
|
|
copied.filter_body = check.filter_body
|
|
copied.start_kw = check.start_kw
|
|
copied.success_kw = check.success_kw
|
|
copied.failure_kw = check.failure_kw
|
|
|
|
copied.methods = check.methods
|
|
copied.manual_resume = check.manual_resume
|
|
|
|
copied.kind = check.kind
|
|
copied.timeout, copied.grace = check.timeout, check.grace
|
|
copied.schedule, copied.tz = check.schedule, check.tz
|
|
copied.save()
|
|
|
|
copied.channel_set.add(*check.channel_set.all())
|
|
|
|
url = reverse("hc-details", args=[copied.code])
|
|
return redirect(url + "?copied")
|
|
|
|
|
|
def status_single(request: HttpRequest, code: UUID) -> HttpResponse:
|
|
if not request.user.is_authenticated:
|
|
return HttpResponseForbidden()
|
|
|
|
# We now know user is logged, tell the type checker request.profile exists-
|
|
request = cast(AuthenticatedHttpRequest, request)
|
|
check, rw = _get_check_for_user(request, code, preload_owner_profile=True)
|
|
|
|
status = check.get_status()
|
|
events = _get_events(check, 30, start=check.created, end=now())
|
|
updated = "1"
|
|
if len(events):
|
|
updated = str(events[0].created.timestamp())
|
|
|
|
doc = {
|
|
"status": status,
|
|
"status_text": STATUS_TEXT_TMPL.render({"check": check, "rw": rw}),
|
|
"title": down_title(check),
|
|
"updated": updated,
|
|
"started": check.last_start is not None,
|
|
}
|
|
|
|
if updated != request.GET.get("u"):
|
|
doc["events"] = EVENTS_TMPL.render({"check": check, "events": events})
|
|
downtimes = check.downtimes(3, request.profile.tz)
|
|
doc["downtimes"] = DOWNTIMES_TMPL.render(
|
|
{"downtimes": downtimes, "tz": request.profile.tz}
|
|
)
|
|
|
|
return JsonResponse(doc)
|
|
|
|
|
|
@login_required
|
|
def badges(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project, rw = _get_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.BadgeSettingsForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
fmt = form.cleaned_data["fmt"]
|
|
states = form.cleaned_data["states"]
|
|
with_late = True if states == "3" else False
|
|
if form.cleaned_data["target"] == "all":
|
|
label = settings.MASTER_BADGE_LABEL
|
|
url = get_badge_url(project.badge_key, "*", fmt, with_late)
|
|
elif form.cleaned_data["target"] == "tag":
|
|
label = form.cleaned_data["tag"]
|
|
url = get_badge_url(project.badge_key, label, fmt, with_late)
|
|
elif form.cleaned_data["target"] == "check":
|
|
check = project.check_set.get(code=form.cleaned_data["check"])
|
|
url = settings.SITE_ROOT + reverse(
|
|
"hc-badge-check", args=[states, check.prepare_badge_key(), fmt]
|
|
)
|
|
label = check.name_then_code()
|
|
|
|
if fmt == "shields":
|
|
url = "https://img.shields.io/endpoint?" + urlencode({"url": url})
|
|
|
|
ctx = {"fmt": fmt, "label": label, "url": url}
|
|
return render(request, "front/badges_preview.html", ctx)
|
|
|
|
checks = list(project.check_set.order_by("name"))
|
|
tags = set()
|
|
for check in checks:
|
|
tags.update(check.tags_list())
|
|
|
|
sorted_tags = sorted(tags, key=lambda s: s.lower())
|
|
|
|
ctx = {
|
|
"project": project,
|
|
"page": "badges",
|
|
"checks": checks,
|
|
"tags": sorted_tags,
|
|
"fmt": "svg",
|
|
"label": settings.MASTER_BADGE_LABEL,
|
|
"url": get_badge_url(project.badge_key, "*"),
|
|
}
|
|
return render(request, "front/badges.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def channels(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project, rw = _get_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
if not rw:
|
|
return HttpResponseForbidden()
|
|
|
|
channel_code = request.POST["channel"]
|
|
try:
|
|
channel = Channel.objects.get(code=channel_code)
|
|
except Channel.DoesNotExist:
|
|
return HttpResponseBadRequest()
|
|
if channel.project_id != project.id:
|
|
return HttpResponseForbidden()
|
|
|
|
new_checks = []
|
|
for key in request.POST:
|
|
if key.startswith("check-"):
|
|
check_code = key[6:]
|
|
try:
|
|
check = Check.objects.get(code=check_code)
|
|
except Check.DoesNotExist:
|
|
return HttpResponseBadRequest()
|
|
if check.project_id != project.id:
|
|
return HttpResponseForbidden()
|
|
new_checks.append(check)
|
|
|
|
channel.checks.set(new_checks)
|
|
return redirect("hc-channels", project.code)
|
|
|
|
channels = project.channel_set.annotate(n_checks=Count("checks"))
|
|
# Sort groups first, then in the creation order
|
|
channels = channels.annotate(is_group=Case(When(kind="group", then=0), default=1))
|
|
channels = channels.order_by("is_group", "created")
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"rw": rw,
|
|
"project": project,
|
|
"profile": project.owner_profile,
|
|
"channels": channels,
|
|
"enable_apprise": settings.APPRISE_ENABLED is True,
|
|
"enable_call": bool(settings.TWILIO_AUTH),
|
|
"enable_discord": bool(settings.DISCORD_CLIENT_ID),
|
|
"enable_matrix": bool(settings.MATRIX_ACCESS_TOKEN),
|
|
"enable_mattermost": settings.MATTERMOST_ENABLED is True,
|
|
"enable_msteams": settings.MSTEAMS_ENABLED is True,
|
|
"enable_opsgenie": settings.OPSGENIE_ENABLED is True,
|
|
"enable_pagertree": settings.PAGERTREE_ENABLED is True,
|
|
"enable_pd": settings.PD_ENABLED is True,
|
|
"enable_prometheus": settings.PROMETHEUS_ENABLED is True,
|
|
"enable_pushbullet": bool(settings.PUSHBULLET_CLIENT_ID),
|
|
"enable_pushover": bool(settings.PUSHOVER_API_TOKEN),
|
|
"enable_rocketchat": settings.ROCKETCHAT_ENABLED is True,
|
|
"enable_shell": settings.SHELL_ENABLED is True,
|
|
"enable_signal": bool(settings.SIGNAL_CLI_SOCKET),
|
|
"enable_slack": settings.SLACK_ENABLED is True,
|
|
"enable_slack_btn": bool(settings.SLACK_CLIENT_ID),
|
|
"enable_sms": bool(settings.TWILIO_AUTH),
|
|
"enable_spike": settings.SPIKE_ENABLED is True,
|
|
"enable_telegram": bool(settings.TELEGRAM_TOKEN),
|
|
"enable_trello": bool(settings.TRELLO_APP_KEY),
|
|
"enable_victorops": settings.VICTOROPS_ENABLED is True,
|
|
"enable_webhooks": settings.WEBHOOKS_ENABLED is True,
|
|
"enable_whatsapp": settings.TWILIO_USE_WHATSAPP,
|
|
"enable_zulip": settings.ZULIP_ENABLED is True,
|
|
"use_payments": settings.USE_PAYMENTS,
|
|
}
|
|
|
|
return render(request, "front/channels.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def channel_checks(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
channel = _get_rw_channel_for_user(request, code)
|
|
|
|
assigned = set(channel.checks.values_list("code", flat=True).distinct())
|
|
checks = channel.project.check_set.order_by("created")
|
|
ctx = {"checks": checks, "assigned": assigned, "channel": channel}
|
|
|
|
return render(request, "front/channel_checks.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_channel_name(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
channel = _get_rw_channel_for_user(request, code)
|
|
|
|
form = forms.ChannelNameForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["name"]
|
|
channel.save()
|
|
|
|
return redirect("hc-channels", channel.project.code)
|
|
|
|
|
|
def verify_email(request: HttpRequest, code: UUID, token: str) -> HttpResponse:
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.make_token() == token:
|
|
channel.email_verified = True
|
|
channel.save()
|
|
return render(request, "front/verify_email_success.html")
|
|
|
|
return render(request, "bad_link.html")
|
|
|
|
|
|
@csrf_exempt
|
|
def unsubscribe_email(
|
|
request: HttpRequest, code: UUID, signed_token: str
|
|
) -> HttpResponse:
|
|
ctx = {}
|
|
|
|
# Some email servers open links in emails to check for malicious content.
|
|
# To work around this, on GET requests we serve a confirmation form.
|
|
# If the signature is at least 5 minutes old, we also include JS code to
|
|
# auto-submit the form.
|
|
signer = signing.TimestampSigner(salt="alerts")
|
|
|
|
# First, check the signature without looking at the timestamp:
|
|
try:
|
|
token = signer.unsign(signed_token)
|
|
except signing.BadSignature:
|
|
return render(request, "bad_link.html")
|
|
|
|
# Then, check if timestamp is older than 5 minutes:
|
|
try:
|
|
signer.unsign(signed_token, max_age=300)
|
|
except signing.SignatureExpired:
|
|
ctx["autosubmit"] = True
|
|
|
|
channel = get_object_or_404(Channel, code=code, kind="email")
|
|
if channel.make_token() != token:
|
|
return render(request, "bad_link.html")
|
|
|
|
if request.method != "POST":
|
|
return render(request, "accounts/unsubscribe_submit.html", ctx)
|
|
|
|
channel.delete()
|
|
return render(request, "front/unsubscribe_success.html")
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def send_test_notification(
|
|
request: AuthenticatedHttpRequest, code: UUID
|
|
) -> HttpResponse:
|
|
channel, rw = _get_channel_for_user(request, code)
|
|
|
|
dummy = Check(name="TEST", status="down", project=channel.project)
|
|
dummy.last_ping = now() - td(days=1)
|
|
dummy.n_pings = 42
|
|
|
|
dummy_flip = Flip(owner=dummy)
|
|
dummy_flip.created = now()
|
|
dummy_flip.old_status = "up"
|
|
dummy_flip.new_status = "down"
|
|
|
|
# Delete all older test notifications for this channel
|
|
Notification.objects.filter(channel=channel, owner=None).delete()
|
|
|
|
# Send the test notification
|
|
error = channel.notify(dummy_flip, is_test=True)
|
|
|
|
if error == "no-op":
|
|
# This channel may be configured to send "up" notifications only.
|
|
dummy_flip.old_status = "down"
|
|
dummy_flip.new_status = "up"
|
|
error = channel.notify(dummy_flip, is_test=True)
|
|
|
|
if error:
|
|
messages.warning(request, "Could not send a test notification. %s." % error)
|
|
else:
|
|
messages.success(request, "Test notification sent!")
|
|
|
|
return redirect("hc-channels", channel.project.code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def remove_channel(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
channel = _get_rw_channel_for_user(request, code)
|
|
project = channel.project
|
|
channel.delete()
|
|
|
|
return redirect("hc-channels", project.code)
|
|
|
|
|
|
def email_form(request: AuthenticatedHttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.EmailForm(request.POST)
|
|
if form.is_valid():
|
|
if channel.disabled or form.cleaned_data["value"] != channel.email.value:
|
|
channel.disabled = False
|
|
|
|
if not settings.EMAIL_USE_VERIFICATION:
|
|
# In self-hosted setting, administrator can set
|
|
# EMAIL_USE_VERIFICATION=False to disable email verification
|
|
channel.email_verified = True
|
|
elif form.cleaned_data["value"] == request.user.email:
|
|
# If the user is adding *their own* address
|
|
# we skip the verification step
|
|
channel.email_verified = True
|
|
else:
|
|
channel.email_verified = False
|
|
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
|
|
if not channel.email_verified:
|
|
channel.send_verify_link()
|
|
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.EmailForm()
|
|
else:
|
|
form = forms.EmailForm(
|
|
{
|
|
"value": channel.email.value,
|
|
"up": channel.email.notify_up,
|
|
"down": channel.email.notify_down,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": channel.project,
|
|
"use_verification": settings.EMAIL_USE_VERIFICATION,
|
|
"form": form,
|
|
"is_new": adding,
|
|
}
|
|
return render(request, "integrations/email_form.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_email(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="email")
|
|
return email_form(request, channel)
|
|
|
|
|
|
@login_required
|
|
def edit_channel(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
channel = _get_rw_channel_for_user(request, code)
|
|
if channel.kind == "email":
|
|
return email_form(request, channel)
|
|
elif channel.kind == "webhook":
|
|
return webhook_form(request, channel)
|
|
elif channel.kind == "sms":
|
|
return sms_form(request, channel)
|
|
elif channel.kind == "signal":
|
|
return signal_form(request, channel)
|
|
elif channel.kind == "whatsapp":
|
|
return whatsapp_form(request, channel)
|
|
elif channel.kind == "ntfy":
|
|
return ntfy_form(request, channel)
|
|
elif channel.kind == "group":
|
|
return group_form(request, channel)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@require_setting("WEBHOOKS_ENABLED")
|
|
def webhook_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.WebhookForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["name"]
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.WebhookForm()
|
|
else:
|
|
|
|
def flatten(d: dict[str, str]) -> str:
|
|
return "\n".join("%s: %s" % pair for pair in d.items())
|
|
|
|
doc = json.loads(channel.value)
|
|
doc["headers_down"] = flatten(doc["headers_down"])
|
|
doc["headers_up"] = flatten(doc["headers_up"])
|
|
doc["name"] = channel.name
|
|
form = forms.WebhookForm(doc)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": channel.project,
|
|
"form": form,
|
|
"is_new": adding,
|
|
}
|
|
return render(request, "integrations/webhook_form.html", ctx)
|
|
|
|
|
|
@require_setting("WEBHOOKS_ENABLED")
|
|
@login_required
|
|
def add_webhook(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="webhook")
|
|
return webhook_form(request, channel)
|
|
|
|
|
|
@require_setting("SHELL_ENABLED")
|
|
@login_required
|
|
def add_shell(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
if request.method == "POST":
|
|
form = forms.AddShellForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="shell")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddShellForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"form": form,
|
|
}
|
|
return render(request, "integrations/add_shell.html", ctx)
|
|
|
|
|
|
@require_setting("PD_ENABLED")
|
|
@login_required
|
|
def add_pd(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
# Simple Install Flow
|
|
if settings.PD_APP_ID:
|
|
state = token_urlsafe()
|
|
|
|
redirect_url = settings.SITE_ROOT + reverse("hc-add-pd-complete")
|
|
redirect_url += "?" + urlencode({"state": state})
|
|
|
|
install_url = "https://app.pagerduty.com/install/integration?" + urlencode(
|
|
{"app_id": settings.PD_APP_ID, "redirect_url": redirect_url, "version": "2"}
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": project, "install_url": install_url}
|
|
request.session["pagerduty"] = (state, str(project.code))
|
|
return render(request, "integrations/add_pd_simple.html", ctx)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddPdForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="pd")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddPdForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_pd.html", ctx)
|
|
|
|
|
|
@require_setting("PD_ENABLED")
|
|
@require_setting("PD_APP_ID")
|
|
@login_required
|
|
def add_pd_complete(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
if "pagerduty" not in request.session:
|
|
return HttpResponseBadRequest()
|
|
|
|
state, code_str = request.session.pop("pagerduty")
|
|
code = UUID(code_str)
|
|
if request.GET.get("state") != state:
|
|
return HttpResponseForbidden()
|
|
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
doc = json.loads(request.GET["config"])
|
|
for item in doc["integration_keys"]:
|
|
channel = Channel(kind="pd", project=project)
|
|
channel.name = item["name"]
|
|
channel.value = json.dumps(
|
|
{"service_key": item["integration_key"], "account": doc["account"]["name"]}
|
|
)
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
|
|
messages.success(request, "The PagerDuty integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
|
|
@require_setting("PD_ENABLED")
|
|
@require_setting("PD_APP_ID")
|
|
def pd_help(request: HttpRequest) -> HttpResponse:
|
|
ctx = {"page": "channels"}
|
|
return render(request, "integrations/add_pd_simple.html", ctx)
|
|
|
|
|
|
@require_setting("PAGERTREE_ENABLED")
|
|
@login_required
|
|
def add_pagertree(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="pagertree")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_pagertree.html", ctx)
|
|
|
|
|
|
@require_setting("SLACK_ENABLED")
|
|
@login_required
|
|
def add_slack(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="slack")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"form": form,
|
|
}
|
|
|
|
return render(request, "integrations/add_slack.html", ctx)
|
|
|
|
|
|
@require_setting("SLACK_ENABLED")
|
|
@require_setting("SLACK_CLIENT_ID")
|
|
def slack_help(request: HttpRequest) -> HttpResponse:
|
|
ctx = {"page": "channels"}
|
|
return render(request, "integrations/add_slack_btn.html", ctx)
|
|
|
|
|
|
@require_setting("SLACK_ENABLED")
|
|
@require_setting("SLACK_CLIENT_ID")
|
|
@login_required
|
|
def add_slack_btn(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
state = token_urlsafe()
|
|
authorize_url = "https://slack.com/oauth/v2/authorize?" + urlencode(
|
|
{
|
|
"scope": "incoming-webhook",
|
|
"client_id": settings.SLACK_CLIENT_ID,
|
|
"state": state,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"project": project,
|
|
"page": "channels",
|
|
"authorize_url": authorize_url,
|
|
}
|
|
|
|
request.session["add_slack"] = (state, str(project.code))
|
|
return render(request, "integrations/add_slack_btn.html", ctx)
|
|
|
|
|
|
@require_setting("SLACK_ENABLED")
|
|
@require_setting("SLACK_CLIENT_ID")
|
|
@login_required
|
|
def add_slack_complete(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
if "add_slack" not in request.session:
|
|
return HttpResponseForbidden()
|
|
|
|
state, code_str = request.session.pop("add_slack")
|
|
code = UUID(code_str)
|
|
project = _get_rw_project_for_user(request, code)
|
|
if request.GET.get("error") == "access_denied":
|
|
messages.warning(request, "Slack setup was cancelled.")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
if request.GET.get("state") != state:
|
|
return HttpResponseForbidden()
|
|
|
|
data = {
|
|
"client_id": settings.SLACK_CLIENT_ID,
|
|
"client_secret": settings.SLACK_CLIENT_SECRET,
|
|
"code": request.GET.get("code"),
|
|
}
|
|
result = curl.post("https://slack.com/api/oauth.v2.access", data)
|
|
|
|
doc = result.json()
|
|
if not isinstance(doc, dict) or not doc.get("ok"):
|
|
messages.warning(
|
|
request,
|
|
"Received an unexpected response from Slack. Integration not added.",
|
|
)
|
|
logger.warning("Unexpected Slack OAuth response: %s", result.content)
|
|
return redirect("hc-channels", project.code)
|
|
|
|
channel = Channel(kind="slack", project=project)
|
|
channel.value = result.text
|
|
if channel.slack_channel:
|
|
channel.name = channel.slack_channel
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
|
|
messages.success(request, "Success, integration added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
|
|
@require_setting("MATTERMOST_ENABLED")
|
|
def mattermost_help(request: HttpRequest) -> HttpResponse:
|
|
return render(request, "integrations/add_mattermost.html")
|
|
|
|
|
|
@require_setting("MATTERMOST_ENABLED")
|
|
@login_required
|
|
def add_mattermost(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="mattermost")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "form": form, "project": project}
|
|
return render(request, "integrations/add_mattermost.html", ctx)
|
|
|
|
|
|
@require_setting("ROCKETCHAT_ENABLED")
|
|
def rocketchat_help(request: HttpRequest) -> HttpResponse:
|
|
return render(request, "integrations/add_rocketchat.html")
|
|
|
|
|
|
@require_setting("ROCKETCHAT_ENABLED")
|
|
@login_required
|
|
def add_rocketchat(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="rocketchat")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "form": form, "project": project}
|
|
return render(request, "integrations/add_rocketchat.html", ctx)
|
|
|
|
|
|
@require_setting("PUSHBULLET_CLIENT_ID")
|
|
@login_required
|
|
def add_pushbullet(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
state = token_urlsafe()
|
|
authorize_url = "https://www.pushbullet.com/authorize?" + urlencode(
|
|
{
|
|
"client_id": settings.PUSHBULLET_CLIENT_ID,
|
|
"redirect_uri": settings.SITE_ROOT + reverse(add_pushbullet_complete),
|
|
"response_type": "code",
|
|
"state": state,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"authorize_url": authorize_url,
|
|
}
|
|
|
|
request.session["add_pushbullet"] = (state, str(project.code))
|
|
return render(request, "integrations/add_pushbullet.html", ctx)
|
|
|
|
|
|
class PushbulletOAuthResponse(BaseModel):
|
|
access_token: str
|
|
|
|
|
|
@require_setting("PUSHBULLET_CLIENT_ID")
|
|
@login_required
|
|
def add_pushbullet_complete(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
if "add_pushbullet" not in request.session:
|
|
return HttpResponseForbidden()
|
|
|
|
state, code_str = request.session.pop("add_pushbullet")
|
|
code = UUID(code_str)
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.GET.get("error") == "access_denied":
|
|
messages.warning(request, "Pushbullet setup was cancelled.")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
if request.GET.get("state") != state:
|
|
return HttpResponseForbidden()
|
|
|
|
data = {
|
|
"client_id": settings.PUSHBULLET_CLIENT_ID,
|
|
"client_secret": settings.PUSHBULLET_CLIENT_SECRET,
|
|
"code": request.GET.get("code"),
|
|
"grant_type": "authorization_code",
|
|
}
|
|
result = curl.post("https://api.pushbullet.com/oauth2/token", data)
|
|
try:
|
|
doc = PushbulletOAuthResponse.model_validate_json(result.content, strict=True)
|
|
except ValidationError:
|
|
logger.warning("Unexpected Pushbullet OAuth response: %s", result.content)
|
|
messages.warning(
|
|
request,
|
|
"Received an unexpected response from Pushbullet. Integration not added.",
|
|
)
|
|
return redirect("hc-channels", project.code)
|
|
|
|
channel = Channel(kind="pushbullet", project=project)
|
|
channel.value = doc.access_token
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Pushbullet integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
|
|
@require_setting("DISCORD_CLIENT_ID")
|
|
@login_required
|
|
def add_discord(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
state = token_urlsafe()
|
|
auth_url = "https://discordapp.com/api/oauth2/authorize?" + urlencode(
|
|
{
|
|
"client_id": settings.DISCORD_CLIENT_ID,
|
|
"scope": "webhook.incoming",
|
|
"redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
|
|
"response_type": "code",
|
|
"state": state,
|
|
}
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": project, "authorize_url": auth_url}
|
|
|
|
request.session["add_discord"] = (state, str(project.code))
|
|
return render(request, "integrations/add_discord.html", ctx)
|
|
|
|
|
|
@require_setting("DISCORD_CLIENT_ID")
|
|
@login_required
|
|
def add_discord_complete(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
if "add_discord" not in request.session:
|
|
return HttpResponseForbidden()
|
|
|
|
state, code_str = request.session.pop("add_discord")
|
|
code = UUID(code_str)
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.GET.get("error") == "access_denied":
|
|
messages.warning(request, "Discord setup was cancelled.")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
if request.GET.get("state") != state:
|
|
return HttpResponseForbidden()
|
|
|
|
data = {
|
|
"client_id": settings.DISCORD_CLIENT_ID,
|
|
"client_secret": settings.DISCORD_CLIENT_SECRET,
|
|
"code": request.GET.get("code"),
|
|
"grant_type": "authorization_code",
|
|
"redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
|
|
}
|
|
result = curl.post("https://discordapp.com/api/oauth2/token", data)
|
|
|
|
doc = result.json()
|
|
if isinstance(doc, dict) and doc.get("code") == 30007:
|
|
e = "maximum number of webhooks reached"
|
|
messages.warning(request, f"Response from Discord: {e}. Integration not added.")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
if not isinstance(doc, dict) or "access_token" not in doc:
|
|
messages.warning(
|
|
request,
|
|
"Received an unexpected response from Discord. Integration not added.",
|
|
)
|
|
logger.warning("Unexpected Discord OAuth response: %s", result.content)
|
|
return redirect("hc-channels", project.code)
|
|
|
|
channel = Channel(kind="discord", project=project)
|
|
channel.value = result.text
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Discord integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
|
|
@require_setting("PUSHOVER_API_TOKEN")
|
|
def pushover_help(request: HttpRequest) -> HttpResponse:
|
|
ctx = {"page": "channels"}
|
|
return render(request, "integrations/add_pushover_help.html", ctx)
|
|
|
|
|
|
@require_setting("PUSHOVER_API_TOKEN")
|
|
@login_required
|
|
def add_pushover(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
state = token_urlsafe().lower()
|
|
|
|
failure_url = settings.SITE_ROOT + reverse("hc-channels", args=[project.code])
|
|
success_url = (
|
|
settings.SITE_ROOT
|
|
+ reverse("hc-add-pushover", args=[project.code])
|
|
+ "?"
|
|
+ urlencode(
|
|
{
|
|
"state": state,
|
|
"prio": request.POST.get("po_priority", "0"),
|
|
"prio_up": request.POST.get("po_priority_up", "0"),
|
|
}
|
|
)
|
|
)
|
|
assert settings.PUSHOVER_SUBSCRIPTION_URL
|
|
subscription_url = (
|
|
settings.PUSHOVER_SUBSCRIPTION_URL
|
|
+ "?"
|
|
+ urlencode({"success": success_url, "failure": failure_url})
|
|
)
|
|
|
|
request.session["pushover"] = state
|
|
return redirect(subscription_url)
|
|
|
|
# Handle successful subscriptions
|
|
if "pushover_user_key" in request.GET:
|
|
if "pushover" not in request.session:
|
|
return HttpResponseForbidden()
|
|
|
|
state = request.session.pop("pushover")
|
|
if request.GET.get("state") != state:
|
|
return HttpResponseForbidden()
|
|
|
|
if request.GET.get("pushover_unsubscribed") == "1":
|
|
# Unsubscription: delete all Pushover channels for this project
|
|
Channel.objects.filter(project=project, kind="po").delete()
|
|
return redirect("hc-channels", project.code)
|
|
|
|
form = forms.AddPushoverForm(request.GET)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
channel = Channel(project=project, kind="po")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
|
|
messages.success(request, "The Pushover integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
# Show Integration Settings form
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY),
|
|
"po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION),
|
|
}
|
|
return render(request, "integrations/add_pushover.html", ctx)
|
|
|
|
|
|
@require_setting("OPSGENIE_ENABLED")
|
|
@login_required
|
|
def add_opsgenie(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddOpsgenieForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="opsgenie")
|
|
v = {"region": form.cleaned_data["region"], "key": form.cleaned_data["key"]}
|
|
channel.value = json.dumps(v)
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddOpsgenieForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_opsgenie.html", ctx)
|
|
|
|
|
|
@require_setting("VICTOROPS_ENABLED")
|
|
@login_required
|
|
def add_victorops(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="victorops")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_victorops.html", ctx)
|
|
|
|
|
|
@require_setting("ZULIP_ENABLED")
|
|
@login_required
|
|
def add_zulip(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddZulipForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="zulip")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddZulipForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_zulip.html", ctx)
|
|
|
|
|
|
class TelegramChat(BaseModel):
|
|
id: int
|
|
type: Literal["group", "private", "supergroup", "channel"]
|
|
title: str | None = None
|
|
username: str | None = None
|
|
|
|
|
|
class TelegramMessage(BaseModel):
|
|
chat: TelegramChat
|
|
text: str
|
|
message_thread_id: int | None = None
|
|
|
|
|
|
class TelegramCallback(BaseModel):
|
|
message: TelegramMessage
|
|
|
|
@classmethod
|
|
def load(self, data: bytes) -> TelegramCallback:
|
|
doc = json.loads(data.decode())
|
|
if "channel_post" in doc:
|
|
# Telegram's "channel_post" key uses the same structure as "message".
|
|
# To keep the validation and view logic simple, if the payload
|
|
# contains "channel_post", copy it to "message", and proceed as usual.
|
|
doc["message"] = doc["channel_post"]
|
|
return TelegramCallback.model_validate(doc, strict=True)
|
|
|
|
|
|
@csrf_exempt
|
|
@require_POST
|
|
def telegram_bot(request: HttpRequest) -> HttpResponse:
|
|
try:
|
|
doc = TelegramCallback.load(request.body)
|
|
except ValidationError:
|
|
# We don't recognize the message format, but don't want Telegram
|
|
# retrying this over and over again, so respond with 200 OK
|
|
return HttpResponse()
|
|
except ValueError:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "/start" not in doc.message.text:
|
|
return HttpResponse()
|
|
|
|
chat = doc.message.chat
|
|
recipient = {
|
|
"id": chat.id,
|
|
"type": chat.type,
|
|
"name": chat.title or chat.username,
|
|
"thread_id": doc.message.message_thread_id,
|
|
}
|
|
|
|
invite = render_to_string(
|
|
"integrations/telegram_invite.html",
|
|
{"qs": signing.dumps(recipient)},
|
|
)
|
|
|
|
try:
|
|
Telegram.send(chat.id, doc.message.message_thread_id, invite)
|
|
except TransportError:
|
|
# Swallow the error and return HTTP 200 OK, otherwise Telegram will
|
|
# hit the webhook again and again.
|
|
pass
|
|
|
|
return HttpResponse()
|
|
|
|
|
|
@require_setting("TELEGRAM_TOKEN")
|
|
def telegram_help(request: HttpRequest) -> HttpResponse:
|
|
ctx = {
|
|
"page": "channels",
|
|
"bot_name": settings.TELEGRAM_BOT_NAME,
|
|
}
|
|
|
|
return render(request, "integrations/add_telegram.html", ctx)
|
|
|
|
|
|
@require_setting("TELEGRAM_TOKEN")
|
|
@login_required
|
|
def add_telegram(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
recipient = None
|
|
if qs := request.META["QUERY_STRING"]:
|
|
try:
|
|
recipient = signing.loads(qs, max_age=600)
|
|
assert isinstance(recipient, dict)
|
|
except signing.BadSignature:
|
|
return render(request, "bad_link.html")
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddTelegramForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
project = _get_rw_project_for_user(request, form.cleaned_data["project"])
|
|
channel = Channel(project=project, kind="telegram")
|
|
channel.value = json.dumps(recipient)
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Telegram integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"projects": request.profile.projects(),
|
|
"recipient": recipient,
|
|
"bot_name": settings.TELEGRAM_BOT_NAME,
|
|
}
|
|
|
|
return render(request, "integrations/add_telegram.html", ctx)
|
|
|
|
|
|
@require_setting("TWILIO_AUTH")
|
|
def sms_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.PhoneUpDownForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = form.get_json()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.PhoneUpDownForm(initial={"up": False})
|
|
else:
|
|
form = forms.PhoneUpDownForm(
|
|
{
|
|
"label": channel.name,
|
|
"phone": channel.phone.value,
|
|
"up": channel.phone.notify_up,
|
|
"down": channel.phone.notify_down,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": channel.project,
|
|
"form": form,
|
|
"profile": channel.project.owner_profile,
|
|
"is_new": adding,
|
|
}
|
|
return render(request, "integrations/sms_form.html", ctx)
|
|
|
|
|
|
@require_setting("TWILIO_AUTH")
|
|
@login_required
|
|
def add_sms(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="sms")
|
|
return sms_form(request, channel)
|
|
|
|
|
|
@require_setting("TWILIO_AUTH")
|
|
@login_required
|
|
def add_call(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
if request.method == "POST":
|
|
form = forms.PhoneNumberForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="call")
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = form.get_json()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.PhoneNumberForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"form": form,
|
|
"profile": project.owner_profile,
|
|
}
|
|
return render(request, "integrations/add_call.html", ctx)
|
|
|
|
|
|
@require_setting("TWILIO_USE_WHATSAPP")
|
|
def whatsapp_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.PhoneUpDownForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = form.get_json()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.PhoneUpDownForm()
|
|
else:
|
|
form = forms.PhoneUpDownForm(
|
|
{
|
|
"label": channel.name,
|
|
"phone": channel.phone.value,
|
|
"up": channel.phone.notify_up,
|
|
"down": channel.phone.notify_down,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": channel.project,
|
|
"form": form,
|
|
"profile": channel.project.owner_profile,
|
|
"is_new": adding,
|
|
}
|
|
return render(request, "integrations/whatsapp_form.html", ctx)
|
|
|
|
|
|
@require_setting("TWILIO_USE_WHATSAPP")
|
|
@login_required
|
|
def add_whatsapp(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="whatsapp")
|
|
return whatsapp_form(request, channel)
|
|
|
|
|
|
@require_setting("SIGNAL_CLI_SOCKET")
|
|
def signal_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.PhoneUpDownForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = form.get_json()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.PhoneUpDownForm()
|
|
else:
|
|
form = forms.PhoneUpDownForm(
|
|
{
|
|
"label": channel.name,
|
|
"phone": channel.phone.value,
|
|
"up": channel.phone.notify_up,
|
|
"down": channel.phone.notify_down,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": channel.project,
|
|
"form": form,
|
|
"is_new": adding,
|
|
}
|
|
return render(request, "integrations/signal_form.html", ctx)
|
|
|
|
|
|
@require_setting("SIGNAL_CLI_SOCKET")
|
|
@login_required
|
|
def add_signal(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="signal")
|
|
return signal_form(request, channel)
|
|
|
|
|
|
@require_setting("TRELLO_APP_KEY")
|
|
@login_required
|
|
def add_trello(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
if request.method == "POST":
|
|
form = forms.AddTrelloForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
channel = Channel(project=project, kind="trello")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
|
|
return_url = settings.SITE_ROOT + reverse("hc-add-trello", args=[project.code])
|
|
authorize_url = "https://trello.com/1/authorize?" + urlencode(
|
|
{
|
|
"expiration": "never",
|
|
"name": settings.SITE_NAME,
|
|
"scope": "read,write",
|
|
"response_type": "token",
|
|
"key": settings.TRELLO_APP_KEY,
|
|
"return_url": return_url,
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"authorize_url": authorize_url,
|
|
}
|
|
|
|
return render(request, "integrations/add_trello.html", ctx)
|
|
|
|
|
|
@require_setting("MATRIX_ACCESS_TOKEN")
|
|
@login_required
|
|
def add_matrix(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
if request.method == "POST":
|
|
form = forms.AddMatrixForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="matrix")
|
|
channel.value = form.cleaned_data["room_id"]
|
|
|
|
# If user supplied room alias instead of ID, use it as channel name
|
|
alias = form.cleaned_data["alias"]
|
|
if not alias.startswith("!"):
|
|
channel.name = alias
|
|
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Matrix integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddMatrixForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"form": form,
|
|
"matrix_user_id": settings.MATRIX_USER_ID,
|
|
}
|
|
return render(request, "integrations/add_matrix.html", ctx)
|
|
|
|
|
|
@require_setting("APPRISE_ENABLED")
|
|
@login_required
|
|
def add_apprise(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddAppriseForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="apprise")
|
|
channel.value = form.cleaned_data["url"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Apprise integration has been added!")
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddAppriseForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_apprise.html", ctx)
|
|
|
|
|
|
class TrelloList(BaseModel):
|
|
id: str
|
|
name: str
|
|
|
|
|
|
class TrelloBoard(BaseModel):
|
|
id: str
|
|
name: str
|
|
lists: list[TrelloList]
|
|
|
|
|
|
TrelloBoards = TypeAdapter(list[TrelloBoard])
|
|
|
|
|
|
@require_setting("TRELLO_APP_KEY")
|
|
@login_required
|
|
@require_POST
|
|
def trello_settings(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
token = request.POST.get("token", "")
|
|
|
|
url = "https://api.trello.com/1/members/me/boards"
|
|
assert settings.TRELLO_APP_KEY
|
|
params = {
|
|
"key": settings.TRELLO_APP_KEY,
|
|
"token": token,
|
|
"filter": "open",
|
|
"fields": "id,name",
|
|
"lists": "open",
|
|
"list_fields": "id,name",
|
|
}
|
|
|
|
result = curl.get(url, params)
|
|
try:
|
|
boards = TrelloBoards.validate_json(result.content)
|
|
except ValidationError:
|
|
logger.warning("Unexpected Trello API response: %s", result.content)
|
|
return render(request, "integrations/trello_settings.html", {"error": 1})
|
|
|
|
num_lists = sum(len(board.lists) for board in boards)
|
|
ctx = {"token": token, "boards": boards, "num_lists": num_lists}
|
|
return render(request, "integrations/trello_settings.html", ctx)
|
|
|
|
|
|
@require_setting("MSTEAMS_ENABLED")
|
|
@login_required
|
|
def add_msteams(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="msteamsw")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_msteams.html", ctx)
|
|
|
|
|
|
@require_setting("PROMETHEUS_ENABLED")
|
|
@login_required
|
|
def add_prometheus(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project, rw = _get_project_for_user(request, code)
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": project,
|
|
"site_scheme": urlparse(settings.SITE_ROOT).scheme,
|
|
}
|
|
return render(request, "integrations/add_prometheus.html", ctx)
|
|
|
|
|
|
@require_setting("PROMETHEUS_ENABLED")
|
|
def metrics(request: HttpRequest, code: UUID, key: str) -> HttpResponse:
|
|
if len(key) != 32:
|
|
return HttpResponseBadRequest()
|
|
|
|
q = Project.objects.filter(code=code, api_key_readonly=key)
|
|
try:
|
|
project = q.get()
|
|
except Project.DoesNotExist:
|
|
return HttpResponseForbidden()
|
|
|
|
checks = Check.objects.filter(project_id=project.id).order_by("id")
|
|
|
|
def esc(s: str) -> str:
|
|
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
|
|
|
|
def output(checks: QuerySet[Check]) -> Iterable[str]:
|
|
help = "Whether the check is currently up (1 for yes, 0 for no)."
|
|
yield f"# HELP hc_check_up {help}\n"
|
|
yield "# TYPE hc_check_up gauge\n"
|
|
|
|
TMPL = """hc_check_up{name="%s", tags="%s", unique_key="%s"} %d\n"""
|
|
for check in checks:
|
|
value = 0 if check.get_status() == "down" else 1
|
|
yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value)
|
|
|
|
yield "\n"
|
|
help = "Whether the check is currently started (1 for yes, 0 for no)."
|
|
yield f"# HELP hc_check_started {help}\n"
|
|
yield "# TYPE hc_check_started gauge\n"
|
|
TMPL = """hc_check_started{name="%s", tags="%s", unique_key="%s"} %d\n"""
|
|
for check in checks:
|
|
value = 1 if check.last_start is not None else 0
|
|
yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value)
|
|
|
|
all_tags, down_tags, num_down = set(), set(), 0
|
|
for check in checks:
|
|
all_tags.update(check.tags_list())
|
|
if check.get_status() == "down":
|
|
num_down += 1
|
|
down_tags.update(check.tags_list())
|
|
|
|
yield "\n"
|
|
help = "Whether all checks with this tag are up (1 for yes, 0 for no)."
|
|
yield f"# HELP hc_tag_up {help}\n"
|
|
yield "# TYPE hc_tag_up gauge\n"
|
|
TMPL = """hc_tag_up{tag="%s"} %d\n"""
|
|
for tag in sorted(all_tags):
|
|
value = 0 if tag in down_tags else 1
|
|
yield TMPL % (esc(tag), value)
|
|
|
|
yield "\n"
|
|
yield "# HELP hc_checks_total The total number of checks.\n"
|
|
yield "# TYPE hc_checks_total gauge\n"
|
|
yield "hc_checks_total %d\n" % len(checks)
|
|
yield "\n"
|
|
|
|
yield "# HELP hc_checks_down_total The number of checks currently down.\n"
|
|
yield "# TYPE hc_checks_down_total gauge\n"
|
|
yield "hc_checks_down_total %d\n" % num_down
|
|
|
|
return HttpResponse(output(checks), content_type="text/plain")
|
|
|
|
|
|
@require_setting("SPIKE_ENABLED")
|
|
@login_required
|
|
def add_spike(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="spike")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_spike.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_gotify(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
form = forms.AddGotifyForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=project, kind="gotify")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", project.code)
|
|
else:
|
|
form = forms.AddGotifyForm()
|
|
|
|
ctx = {"page": "channels", "project": project, "form": form}
|
|
return render(request, "integrations/add_gotify.html", ctx)
|
|
|
|
|
|
def group_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.GroupForm(request.POST, project=channel.project)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.GroupForm(project=channel.project)
|
|
else:
|
|
# Filter out unavailable channels
|
|
channels = list(channel.group_channels.values_list("code", flat=True))
|
|
form = forms.GroupForm(
|
|
{"channels": channels, "label": channel.name}, project=channel.project
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": channel.project, "form": form}
|
|
return render(request, "integrations/group_form.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_group(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="group")
|
|
return group_form(request, channel)
|
|
|
|
|
|
def ntfy_form(request: HttpRequest, channel: Channel) -> HttpResponse:
|
|
adding = channel._state.adding
|
|
if request.method == "POST":
|
|
form = forms.NtfyForm(request.POST)
|
|
if form.is_valid():
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
if adding:
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels", channel.project.code)
|
|
elif adding:
|
|
form = forms.NtfyForm()
|
|
else:
|
|
form = forms.NtfyForm(
|
|
{
|
|
"topic": channel.ntfy.topic,
|
|
"url": channel.ntfy.url,
|
|
"priority": channel.ntfy.priority,
|
|
"priority_up": channel.ntfy.priority_up,
|
|
"token": channel.ntfy.token,
|
|
}
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": channel.project, "form": form}
|
|
return render(request, "integrations/ntfy_form.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_ntfy(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse:
|
|
project = _get_rw_project_for_user(request, code)
|
|
channel = Channel(project=project, kind="ntfy")
|
|
return ntfy_form(request, channel)
|
|
|
|
|
|
@require_setting("SIGNAL_CLI_SOCKET")
|
|
@login_required
|
|
def signal_captcha(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
if not request.user.is_superuser:
|
|
return HttpResponseForbidden()
|
|
|
|
ctx = {"challenge": request.GET.get("challenge", "")}
|
|
if request.method == "POST":
|
|
challenge = request.POST.get("challenge", "")
|
|
captcha = request.POST.get("captcha", "")
|
|
if captcha.startswith("signalcaptcha://"):
|
|
captcha = captcha[16:]
|
|
|
|
payload = {
|
|
"jsonrpc": "2.0",
|
|
"method": "submitRateLimitChallenge",
|
|
"params": {"challenge": str(challenge), "captcha": captcha},
|
|
"id": str(uuid.uuid4()),
|
|
}
|
|
|
|
payload_bytes = (json.dumps(payload) + "\n").encode()
|
|
for reply_bytes in Signal._read_replies(payload_bytes):
|
|
try:
|
|
reply = json.loads(reply_bytes.decode())
|
|
except ValueError:
|
|
ctx["result"] = "submitRateLimitChallenge failed"
|
|
break
|
|
|
|
if reply.get("id") == payload["id"]:
|
|
ctx["result"] = reply_bytes.decode()
|
|
break
|
|
|
|
return render(request, "front/signal_captcha.html", ctx)
|
|
|
|
|
|
@require_setting("SIGNAL_CLI_SOCKET")
|
|
@login_required
|
|
@require_POST
|
|
def verify_signal_number(request: AuthenticatedHttpRequest) -> HttpResponse:
|
|
def render_result(result: str | None) -> HttpResponse:
|
|
return render(request, "integrations/signal_result.html", {"result": result})
|
|
|
|
# Enforce per-account rate limit (50 verifications per day)
|
|
if not TokenBucket.authorize_signal_verification(request.user):
|
|
return render_result("Verification rate limit exceeded")
|
|
|
|
form = forms.PhoneNumberForm(request.POST)
|
|
if not form.is_valid():
|
|
return render_result("Invalid phone number")
|
|
|
|
phone = form.cleaned_data["phone"]
|
|
# Enforce per-recipient rate limit (6 messages per minute)
|
|
if not TokenBucket.authorize_signal(phone):
|
|
return render_result("Verification rate limit exceeded")
|
|
|
|
try:
|
|
Signal.send(phone, f"Test message from {settings.SITE_NAME}")
|
|
except TransportError as e:
|
|
return render_result(e.message)
|
|
|
|
# Success!
|
|
return render_result(None)
|
|
|
|
|
|
def log_events(request: HttpRequest, code: UUID) -> HttpResponse:
|
|
if not request.user.is_authenticated:
|
|
return HttpResponseForbidden()
|
|
|
|
check, rw = _get_check_for_user(request, code, preload_owner_profile=True)
|
|
form = forms.LogFiltersForm(request.GET)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
if form.cleaned_data["u"]:
|
|
# We are live-loading more events
|
|
start = form.cleaned_data["u"] + td(microseconds=1)
|
|
end = now()
|
|
else:
|
|
# We're applying new filters
|
|
start = check.created
|
|
end = form.cleaned_data["end"] or now()
|
|
|
|
# clamp start to the date of the oldest visible ping
|
|
oldest_ping = check.visible_pings.order_by("n").first()
|
|
if oldest_ping:
|
|
start = max(start, oldest_ping.created)
|
|
|
|
events = _get_events(check, 1000, start=start, end=end, kinds=form.kinds())
|
|
response = render(request, "front/log_rows.html", {"events": events})
|
|
|
|
if events:
|
|
# Include a full precision timestamp of the most recent event in a
|
|
# response header. This will be used client-side for fetching live updates
|
|
# to specify "return any events after *this* point".
|
|
response["X-Last-Event-Timestamp"] = str(events[0].created.timestamp())
|
|
return response
|
|
|
|
|
|
# Forks: add custom views after this line
|