0
0
Fork 0
mirror of https://github.com/healthchecks/healthchecks.git synced 2025-04-07 14:15:34 +00:00
healthchecks_healthchecks/hc/front/views.py
2022-09-09 14:16:17 +03:00

2245 lines
69 KiB
Python

from collections import defaultdict
from datetime import timedelta as td
import email
import json
import os
import re
from secrets import token_urlsafe
import sqlite3
from urllib.parse import urlencode, urlparse
from cron_descriptor import ExpressionDescriptor
from cronsim.cronsim import CronSim, CronSimError
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core import signing
from django.core.exceptions import PermissionDenied
from django.db.models import Count, F
from django.http import (
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
JsonResponse,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import get_template, render_to_string
from django.urls import reverse
from django.utils.timezone import now
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from hc.accounts.models import Project, Member
from hc.api.models import (
DEFAULT_GRACE,
DEFAULT_TIMEOUT,
MAX_DELTA,
Channel,
Check,
Ping,
Notification,
)
from hc.api.transports import Telegram, TransportError
from hc.front.decorators import require_setting
from hc.front import forms
from hc.front.schemas import telegram_callback
from hc.front.templatetags.hc_extras import (
num_down_title,
down_title,
sortchecks,
site_hostname,
)
from hc.lib import curl, jsonschema
from hc.lib.badges import get_badge_url
from hc.lib.tz import all_timezones
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
VALID_SORT_VALUES = ("name", "-name", "last_ping", "-last_ping", "created")
STATUS_TEXT_TMPL = get_template("front/log_status_text.html")
LAST_PING_TMPL = get_template("front/last_ping_cell.html")
EVENTS_TMPL = get_template("front/details_events.html")
DOWNTIMES_TMPL = get_template("front/details_downtimes.html")
def _tags_statuses(checks):
tags, down, grace, num_down = {}, {}, {}, 0
for check in checks:
status = check.get_status()
if status == "down":
num_down += 1
for tag in check.tags_list():
down[tag] = "down"
elif status == "grace":
for tag in check.tags_list():
grace[tag] = "grace"
else:
for tag in check.tags_list():
tags[tag] = "up"
tags.update(grace)
tags.update(down)
return tags, num_down
def _get_check_for_user(request, code):
"""Return specified check if current user has access to it."""
assert request.user.is_authenticated
check = get_object_or_404(Check.objects.select_related("project"), code=code)
if request.user.is_superuser:
return check, True
if request.user.id == check.project.owner_id:
return check, True
membership = get_object_or_404(Member, project=check.project, user=request.user)
return check, membership.is_rw
def _get_rw_check_for_user(request, code):
check, rw = _get_check_for_user(request, code)
if not rw:
raise PermissionDenied
return check
def _get_channel_for_user(request, code):
"""Return specified channel if current user has access to it."""
assert request.user.is_authenticated
channel = get_object_or_404(Channel.objects.select_related("project"), code=code)
if request.user.is_superuser:
return channel, True
if request.user.id == channel.project.owner_id:
return channel, True
membership = get_object_or_404(Member, project=channel.project, user=request.user)
return channel, membership.is_rw
def _get_rw_channel_for_user(request, code):
channel, rw = _get_channel_for_user(request, code)
if not rw:
raise PermissionDenied
return channel
def _get_project_for_user(request, project_code):
"""Check access, return (project, rw) tuple."""
project = get_object_or_404(Project, code=project_code)
if request.user.is_superuser:
return project, True
if request.user.id == project.owner_id:
return project, True
membership = get_object_or_404(Member, project=project, user=request.user)
return project, membership.is_rw
def _get_rw_project_for_user(request, project_code):
"""Check access, return (project, rw) tuple."""
project, rw = _get_project_for_user(request, project_code)
if not rw:
raise PermissionDenied
return project
def _refresh_last_active_date(profile):
"""Update last_active_date if it is more than a day old."""
if profile.last_active_date is None or (now() - profile.last_active_date).days > 0:
profile.last_active_date = now()
profile.save()
def _get_referer_qs(request):
parsed = urlparse(request.META.get("HTTP_REFERER", ""))
if parsed.query:
return "?" + parsed.query
return ""
@login_required
def my_checks(request, code):
_refresh_last_active_date(request.profile)
project, rw = _get_project_for_user(request, code)
if request.GET.get("sort") in VALID_SORT_VALUES:
request.profile.sort = request.GET["sort"]
request.profile.save()
if request.GET.get("urls") in ("uuid", "slug") and rw:
project.show_slugs = request.GET["urls"] == "slug"
project.save()
if request.session.get("last_project_id") != project.id:
request.session["last_project_id"] = project.id
q = Check.objects.filter(project=project)
q = q.select_related("project")
checks = list(q.prefetch_related("channel_set"))
sortchecks(checks, request.profile.sort)
tags_statuses, num_down = _tags_statuses(checks)
pairs = list(tags_statuses.items())
pairs.sort(key=lambda pair: pair[0].lower())
channels = Channel.objects.filter(project=project)
channels = list(channels.order_by("created"))
hidden_checks = set()
# Hide checks that don't match selected tags:
selected_tags = set(request.GET.getlist("tag", []))
if selected_tags:
for check in checks:
if not selected_tags.issubset(check.tags_list()):
hidden_checks.add(check)
# Hide checks that don't match the search string:
search = request.GET.get("search", "")
if search:
for check in checks:
search_key = "%s\n%s" % (check.name.lower(), check.code)
if search not in search_key:
hidden_checks.add(check)
# Figure out which checks have ambiguous ping URLs
seen, ambiguous = set(), set()
if project.show_slugs:
for check in checks:
if check.slug and check.slug in seen:
ambiguous.add(check.slug)
else:
seen.add(check.slug)
# Do we need to show the "Last Duration" header?
show_last_duration = False
for check in checks:
if check.clamped_last_duration():
show_last_duration = True
break
ctx = {
"page": "checks",
"rw": rw,
"checks": checks,
"channels": channels,
"num_down": num_down,
"tags": pairs,
"ping_endpoint": settings.PING_ENDPOINT,
"timezones": all_timezones,
"project": project,
"num_available": project.num_checks_available(),
"sort": request.profile.sort,
"selected_tags": selected_tags,
"search": search,
"hidden_checks": hidden_checks,
"ambiguous": ambiguous,
"show_last_duration": show_last_duration,
}
return render(request, "front/my_checks.html", ctx)
@login_required
def status(request, code):
_get_project_for_user(request, code)
checks = list(Check.objects.filter(project__code=code))
details = []
for check in checks:
ctx = {"check": check}
details.append(
{
"code": str(check.code),
"status": check.get_status(),
"last_ping": LAST_PING_TMPL.render(ctx).strip(),
"started": check.last_start is not None,
}
)
tags_statuses, num_down = _tags_statuses(checks)
return JsonResponse(
{"details": details, "tags": tags_statuses, "title": num_down_title(num_down)}
)
@login_required
@require_POST
def switch_channel(request, code, channel_code):
check = _get_rw_check_for_user(request, code)
channel = get_object_or_404(Channel, code=channel_code)
if channel.project_id != check.project_id:
return HttpResponseBadRequest()
if request.POST.get("state") == "on":
channel.checks.add(check)
else:
channel.checks.remove(check)
return HttpResponse()
def _get_project_summary(profile):
statuses = defaultdict(lambda: {"status": "up", "started": False})
q = profile.checks_from_all_projects()
q = q.annotate(project_code=F("project__code"))
for check in q:
summary = statuses[check.project_code]
if check.last_start:
summary["started"] = True
if summary["status"] != "down":
status = check.get_status()
if status == "down" or (status == "grace" and summary["status"] == "up"):
summary["status"] = status
return statuses
def index(request):
if not request.user.is_authenticated:
return redirect("hc-login")
summary = _get_project_summary(request.profile)
if "refresh" in request.GET:
return JsonResponse({str(k): v for k, v in summary.items()})
q = request.profile.projects()
q = q.annotate(n_checks=Count("check", distinct=True))
q = q.annotate(n_channels=Count("channel", distinct=True))
q = q.annotate(owner_email=F("owner__email"))
projects = list(q)
for project in projects:
project.overall_status = summary[project.code]["status"]
project.any_started = summary[project.code]["started"]
# Primary sort key: projects with overall_status=down go first
# Secondary sort key: project's name
projects.sort(key=lambda p: (p.overall_status != "down", p.name))
ctx = {
"page": "projects",
"projects": projects,
"last_project_id": request.session.get("last_project_id"),
}
return render(request, "front/projects.html", ctx)
def dashboard(request):
return render(request, "front/dashboard.html", {})
def _replace_placeholders(doc, html):
if doc.startswith("self_hosted"):
return html
replaces = {
"{{ default_timeout }}": str(int(DEFAULT_TIMEOUT.total_seconds())),
"{{ default_grace }}": str(int(DEFAULT_GRACE.total_seconds())),
"SITE_NAME": settings.SITE_NAME,
"SITE_ROOT": settings.SITE_ROOT,
"SITE_HOSTNAME": site_hostname(),
"SITE_SCHEME": urlparse(settings.SITE_ROOT).scheme,
"PING_ENDPOINT": settings.PING_ENDPOINT,
"PING_URL": settings.PING_ENDPOINT + "your-uuid-here",
"PING_BODY_LIMIT": str(settings.PING_BODY_LIMIT or 100),
"IMG_URL": os.path.join(settings.STATIC_URL, "img/docs"),
}
for placeholder, value in replaces.items():
html = html.replace(placeholder, value)
return html
def serve_doc(request, doc="introduction"):
# Filenames in /templates/docs/ consist of lowercase letters and underscores,
# -- make sure we don't accept anything else
if not re.match(r"^[a-z_]+$", doc):
raise Http404("not found")
path = os.path.join(settings.BASE_DIR, "templates/docs", doc + ".html")
if not os.path.exists(path):
raise Http404("not found")
with open(path, "r", encoding="utf-8") as f:
content = f.read()
content = _replace_placeholders(doc, content)
ctx = {
"page": "docs",
"section": doc,
"content": content,
"first_line": content.split("\n")[0],
}
return render(request, "front/docs_single.html", ctx)
@csrf_exempt
def docs_search(request):
form = forms.SearchForm(request.GET)
if not form.is_valid():
ctx = {"results": []}
return render(request, "front/docs_search.html", ctx)
query = """
SELECT slug, title, snippet(docs, 2, '<span>', '</span>', '&hellip;', 10)
FROM docs
WHERE docs MATCH ?
ORDER BY bm25(docs, 2.0, 10.0, 1.0)
LIMIT 8
"""
q = form.cleaned_data["q"]
con = sqlite3.connect(os.path.join(settings.BASE_DIR, "search.db"))
cur = con.cursor()
res = cur.execute(query, (q,))
ctx = {"results": res.fetchall()}
return render(request, "front/docs_search.html", ctx)
def docs_cron(request):
return render(request, "front/docs_cron.html", {})
@require_POST
@login_required
def add_check(request, code):
project = _get_rw_project_for_user(request, code)
if project.num_checks_available() <= 0:
return HttpResponseBadRequest()
form = forms.AddCheckForm(request.POST)
if not form.is_valid():
return HttpResponseBadRequest()
check = Check(project=project)
check.set_name_slug(form.cleaned_data["name"])
check.tags = form.cleaned_data["tags"]
check.kind = form.cleaned_data["kind"]
check.timeout = form.cleaned_data["timeout"]
check.schedule = form.cleaned_data["schedule"]
check.tz = form.cleaned_data["tz"]
check.grace = form.cleaned_data["grace"]
check.save()
check.assign_all_channels()
url = reverse("hc-checks", args=[project.code])
url += _get_referer_qs(request) # Preserve selected tags and search
return redirect(url)
@require_POST
@login_required
def update_name(request, code):
check = _get_rw_check_for_user(request, code)
form = forms.NameTagsForm(request.POST)
if form.is_valid():
check.set_name_slug(form.cleaned_data["name"])
check.tags = form.cleaned_data["tags"]
check.desc = form.cleaned_data["desc"]
check.save()
if "/details/" in request.META.get("HTTP_REFERER", ""):
return redirect("hc-details", code)
url = reverse("hc-checks", args=[check.project.code])
url += _get_referer_qs(request) # Preserve selected tags and search
return redirect(url)
@require_POST
@login_required
def filtering_rules(request, code):
check = _get_rw_check_for_user(request, code)
form = forms.FilteringRulesForm(request.POST)
if form.is_valid():
check.filter_subject = form.cleaned_data["filter_subject"]
check.filter_body = form.cleaned_data["filter_body"]
check.success_kw = form.cleaned_data["success_kw"]
check.failure_kw = form.cleaned_data["failure_kw"]
check.methods = form.cleaned_data["methods"]
check.manual_resume = form.cleaned_data["manual_resume"]
check.save()
return redirect("hc-details", code)
@require_POST
@login_required
def update_timeout(request, code):
check = _get_rw_check_for_user(request, code)
kind = request.POST.get("kind")
if kind == "simple":
form = forms.TimeoutForm(request.POST)
if not form.is_valid():
return HttpResponseBadRequest()
check.kind = "simple"
check.timeout = form.cleaned_data["timeout"]
check.grace = form.cleaned_data["grace"]
elif kind == "cron":
form = forms.CronForm(request.POST)
if not form.is_valid():
return HttpResponseBadRequest()
check.kind = "cron"
check.schedule = form.cleaned_data["schedule"]
check.tz = form.cleaned_data["tz"]
check.grace = td(minutes=form.cleaned_data["grace"])
check.alert_after = check.going_down_after()
if check.status == "up" and check.alert_after < now():
# Checks can flip from "up" to "down" state as a result of changing check's
# schedule. We don't want to send notifications when changing schedule
# interactively in the web UI. So we update the `alert_after` and `status`
# fields, and create a Flip object here the same way as `sendalerts` would do,
# but without sending an actual alert.
#
# We need to create the Flip object because otherwise the calculation
# in Check.downtimes() will come out wrong (when this check later comes up,
# we will have no record of when it went down).
check.create_flip("down", mark_as_processed=True)
check.alert_after = None
check.status = "down"
check.save()
if "/details/" in request.META.get("HTTP_REFERER", ""):
return redirect("hc-details", code)
url = reverse("hc-checks", args=[check.project.code])
url += _get_referer_qs(request) # Preserve selected tags and search
return redirect(url)
@require_POST
def cron_preview(request):
schedule = request.POST.get("schedule", "")
tz = request.POST.get("tz")
ctx = {"tz": tz, "dates": []}
if tz not in all_timezones:
ctx["bad_tz"] = True
return render(request, "front/cron_preview.html", ctx)
now_local = now().astimezone(ZoneInfo(tz))
try:
it = CronSim(schedule, now_local)
for i in range(0, 6):
ctx["dates"].append(next(it))
except CronSimError:
ctx["bad_schedule"] = True
if ctx["dates"]:
try:
descriptor = ExpressionDescriptor(schedule, use_24hour_time_format=True)
ctx["desc"] = descriptor.get_description()
except:
# We assume the schedule is valid if cronsim accepts it.
# If cron-descriptor throws an exception, don't show the description
# to the user.
pass
return render(request, "front/cron_preview.html", ctx)
def validate_schedule(request):
schedule = request.GET.get("schedule", "")
result = True
try:
CronSim(schedule, now())
except CronSimError:
result = False
return JsonResponse({"result": result})
@login_required
def ping_details(request, code, n=None):
check, rw = _get_check_for_user(request, code)
q = Ping.objects.filter(owner=check)
if n:
q = q.filter(n=n)
try:
ping = q.latest("created")
except Ping.DoesNotExist:
return render(request, "front/ping_details_not_found.html")
body = ping.get_body()
ctx = {"check": check, "ping": ping, "plain": None, "html": None, "body": body}
if ping.scheme == "email":
parsed = email.message_from_string(body, policy=email.policy.SMTP)
ctx["subject"] = parsed.get("subject", "")
plain_mime_part = parsed.get_body(("plain",))
if plain_mime_part:
ctx["plain"] = plain_mime_part.get_content()
html_mime_part = parsed.get_body(("html",))
if html_mime_part:
ctx["html"] = html_mime_part.get_content()
return render(request, "front/ping_details.html", ctx)
@login_required
def ping_body(request, code, n):
check, rw = _get_check_for_user(request, code)
ping = get_object_or_404(Ping, owner=check, n=n)
body = ping.get_body()
if not body:
raise Http404("not found")
response = HttpResponse(body, content_type="application/octet-stream")
filename = "%s-%s" % (check.code, ping.n)
response["Content-Disposition"] = 'attachment; filename="%s"' % filename
return response
@require_POST
@login_required
def pause(request, code):
check = _get_rw_check_for_user(request, code)
# Track the status change for correct downtime calculation in Check.downtimes()
check.create_flip("paused", mark_as_processed=True)
check.status = "paused"
check.last_start = None
check.alert_after = None
check.save()
# After pausing a check we must check if all checks are up,
# and Profile.next_nag_date needs to be cleared out:
check.project.update_next_nag_dates()
# Don't redirect after an AJAX request:
if request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest":
return HttpResponse()
return redirect("hc-details", code)
@require_POST
@login_required
def resume(request, code):
check = _get_rw_check_for_user(request, code)
if check.status != "paused":
return HttpResponseBadRequest()
check.create_flip("new", mark_as_processed=True)
check.status = "new"
check.last_start = None
check.last_ping = None
check.alert_after = None
check.save()
return redirect("hc-details", code)
@require_POST
@login_required
def remove_check(request, code):
check = _get_rw_check_for_user(request, code)
project = check.project
check.delete()
return redirect("hc-checks", project.code)
def _get_min(check):
min_n = check.n_pings - check.project.owner_profile.ping_log_limit
ping = check.ping_set.filter(n__gt=min_n).first()
dt = ping.created if ping else now()
return (dt - td(hours=24)).replace(minute=0, second=0)
def _get_total(check, start, end):
ping_log_limit = check.project.owner_profile.ping_log_limit
pings = check.ping_set.filter(created__gte=start, created__lte=end)
return min(pings.count(), ping_log_limit)
def _get_events(check, page_limit, start=None, end=None):
min_n = check.n_pings - check.project.owner_profile.ping_log_limit
pings = check.ping_set.filter(n__gt=min_n).order_by("-id")
if start and end:
pings = pings.filter(created__gte=start, created__lte=end)
pings = list(pings[:page_limit])
last_start = None
for ping in reversed(pings):
if ping.kind == "start":
last_start = ping.created
elif ping.kind in (None, "", "fail") and last_start:
delta = ping.created - last_start
last_start = None
if delta < MAX_DELTA:
setattr(ping, "delta", delta)
alerts = Notification.objects.select_related("channel")
alerts = alerts.filter(owner=check, check_status="down")
if start and end:
alerts = alerts.filter(created__gte=start, created__lte=end)
elif len(pings):
cutoff = pings[-1].created
alerts = alerts.filter(created__gt=cutoff)
else:
alerts = []
events = pings + list(alerts)
events.sort(key=lambda el: el.created, reverse=True)
return events
@login_required
def log(request, code):
check, rw = _get_check_for_user(request, code)
form = forms.SeekForm(request.GET)
smin, smax = _get_min(check), now()
if form.is_valid():
start = form.cleaned_data["start"]
end = form.cleaned_data["end"]
else:
start = smin
end = smax
ctx = {
"page": "log",
"project": check.project,
"check": check,
"min": smin,
"max": smax,
"start": start,
"end": end,
"events": _get_events(check, 1000, start=start, end=end),
"num_total": _get_total(check, start, end),
}
return render(request, "front/log.html", ctx)
@login_required
def details(request, code):
_refresh_last_active_date(request.profile)
check, rw = _get_check_for_user(request, code)
if request.GET.get("urls") in ("uuid", "slug") and rw:
check.project.show_slugs = request.GET["urls"] == "slug"
check.project.save()
channels = Channel.objects.filter(project=check.project)
channels = list(channels.order_by("created"))
all_tags = set()
q = Check.objects.filter(project=check.project).exclude(tags="")
for tags in q.values_list("tags", flat=True):
all_tags.update(tags.split(" "))
ctx = {
"page": "details",
"project": check.project,
"check": check,
"rw": rw,
"channels": channels,
"enabled_channels": list(check.channel_set.all()),
"timezones": all_timezones,
"downtimes": check.downtimes(months=3),
"is_copied": "copied" in request.GET,
"all_tags": " ".join(sorted(all_tags)),
}
return render(request, "front/details.html", ctx)
@login_required
def uncloak(request, unique_key):
for check in request.profile.checks_from_all_projects().only("code"):
if check.unique_key == unique_key:
return redirect("hc-details", check.code)
raise Http404("not found")
@login_required
def transfer(request, code):
check = _get_rw_check_for_user(request, code)
if request.method == "POST":
target_project = _get_rw_project_for_user(request, request.POST["project"])
if target_project.num_checks_available() <= 0:
return HttpResponseBadRequest()
check.project = target_project
check.save()
check.assign_all_channels()
messages.success(request, "Check transferred successfully!")
return redirect("hc-details", code)
ctx = {"check": check}
return render(request, "front/transfer_modal.html", ctx)
@require_POST
@login_required
def copy(request, code):
check = _get_rw_check_for_user(request, code)
if check.project.num_checks_available() <= 0:
return HttpResponseBadRequest()
new_name = check.name + " (copy)"
# Make sure we don't exceed the 100 character db field limit:
if len(new_name) > 100:
new_name = check.name[:90] + "... (copy)"
copied = Check(project=check.project)
copied.set_name_slug(new_name)
copied.desc, copied.tags = check.desc, check.tags
copied.filter_subject = check.filter_subject
copied.filter_body = check.filter_body
copied.success_kw = check.success_kw
copied.failure_kw = check.failure_kw
copied.methods = check.methods
copied.manual_resume = check.manual_resume
copied.kind = check.kind
copied.timeout, copied.grace = check.timeout, check.grace
copied.schedule, copied.tz = check.schedule, check.tz
copied.save()
copied.channel_set.add(*check.channel_set.all())
url = reverse("hc-details", args=[copied.code])
return redirect(url + "?copied")
@login_required
def status_single(request, code):
check, rw = _get_check_for_user(request, code)
status = check.get_status()
events = _get_events(check, 20)
updated = "1"
if len(events):
updated = str(events[0].created.timestamp())
doc = {
"status": status,
"status_text": STATUS_TEXT_TMPL.render({"check": check, "rw": rw}),
"title": down_title(check),
"updated": updated,
"started": check.last_start is not None,
}
if updated != request.GET.get("u"):
doc["events"] = EVENTS_TMPL.render({"check": check, "events": events})
doc["downtimes"] = DOWNTIMES_TMPL.render({"downtimes": check.downtimes(3)})
return JsonResponse(doc)
@login_required
def badges(request, code):
project, rw = _get_project_for_user(request, code)
tags = set()
for check in Check.objects.filter(project=project):
tags.update(check.tags_list())
sorted_tags = sorted(tags, key=lambda s: s.lower())
sorted_tags.append("*") # For the "overall status" badge
key = project.badge_key
urls = []
for tag in sorted_tags:
urls.append(
{
"tag": tag,
"svg": get_badge_url(key, tag),
"svg3": get_badge_url(key, tag, with_late=True),
"json": get_badge_url(key, tag, fmt="json"),
"json3": get_badge_url(key, tag, fmt="json", with_late=True),
"shields": get_badge_url(key, tag, fmt="shields"),
"shields3": get_badge_url(key, tag, fmt="shields", with_late=True),
}
)
ctx = {
"have_tags": len(urls) > 1,
"page": "badges",
"project": project,
"badges": urls,
}
return render(request, "front/badges.html", ctx)
@login_required
def channels(request, code):
project, rw = _get_project_for_user(request, code)
if request.method == "POST":
if not rw:
return HttpResponseForbidden()
code = request.POST["channel"]
try:
channel = Channel.objects.get(code=code)
except Channel.DoesNotExist:
return HttpResponseBadRequest()
if channel.project_id != project.id:
return HttpResponseForbidden()
new_checks = []
for key in request.POST:
if key.startswith("check-"):
code = key[6:]
try:
check = Check.objects.get(code=code)
except Check.DoesNotExist:
return HttpResponseBadRequest()
if check.project_id != project.id:
return HttpResponseForbidden()
new_checks.append(check)
channel.checks.set(new_checks)
return redirect("hc-channels", project.code)
channels = Channel.objects.filter(project=project)
channels = channels.order_by("created")
channels = channels.annotate(n_checks=Count("checks"))
ctx = {
"page": "channels",
"rw": rw,
"project": project,
"profile": project.owner_profile,
"channels": channels,
"enable_apprise": settings.APPRISE_ENABLED is True,
"enable_call": bool(settings.TWILIO_AUTH),
"enable_discord": bool(settings.DISCORD_CLIENT_ID),
"enable_linenotify": bool(settings.LINENOTIFY_CLIENT_ID),
"enable_matrix": bool(settings.MATRIX_ACCESS_TOKEN),
"enable_mattermost": settings.MATTERMOST_ENABLED is True,
"enable_msteams": settings.MSTEAMS_ENABLED is True,
"enable_opsgenie": settings.OPSGENIE_ENABLED is True,
"enable_pagertree": settings.PAGERTREE_ENABLED is True,
"enable_pd": settings.PD_ENABLED is True,
"enable_prometheus": settings.PROMETHEUS_ENABLED is True,
"enable_pushbullet": bool(settings.PUSHBULLET_CLIENT_ID),
"enable_pushover": bool(settings.PUSHOVER_API_TOKEN),
"enable_shell": settings.SHELL_ENABLED is True,
"enable_signal": bool(settings.SIGNAL_CLI_SOCKET),
"enable_slack": settings.SLACK_ENABLED is True,
"enable_slack_btn": bool(settings.SLACK_CLIENT_ID),
"enable_sms": bool(settings.TWILIO_AUTH),
"enable_spike": settings.SPIKE_ENABLED is True,
"enable_telegram": bool(settings.TELEGRAM_TOKEN),
"enable_trello": bool(settings.TRELLO_APP_KEY),
"enable_victorops": settings.VICTOROPS_ENABLED is True,
"enable_webhooks": settings.WEBHOOKS_ENABLED is True,
"enable_whatsapp": settings.TWILIO_USE_WHATSAPP,
"enable_zulip": settings.ZULIP_ENABLED is True,
"use_payments": settings.USE_PAYMENTS,
}
return render(request, "front/channels.html", ctx)
@login_required
def channel_checks(request, code):
channel = _get_rw_channel_for_user(request, code)
assigned = set(channel.checks.values_list("code", flat=True).distinct())
checks = Check.objects.filter(project=channel.project).order_by("created")
ctx = {"checks": checks, "assigned": assigned, "channel": channel}
return render(request, "front/channel_checks.html", ctx)
@require_POST
@login_required
def update_channel_name(request, code):
channel = _get_rw_channel_for_user(request, code)
form = forms.ChannelNameForm(request.POST)
if form.is_valid():
channel.name = form.cleaned_data["name"]
channel.save()
return redirect("hc-channels", channel.project.code)
def verify_email(request, code, token):
channel = get_object_or_404(Channel, code=code)
if channel.make_token() == token:
channel.email_verified = True
channel.save()
return render(request, "front/verify_email_success.html")
return render(request, "bad_link.html")
@csrf_exempt
def unsubscribe_email(request, code, signed_token):
ctx = {}
# Some email servers open links in emails to check for malicious content.
# To work around this, on GET requests we serve a confirmation form.
# If the signature is at least 5 minutes old, we also include JS code to
# auto-submit the form.
signer = signing.TimestampSigner(salt="alerts")
# First, check the signature without looking at the timestamp:
try:
token = signer.unsign(signed_token)
except signing.BadSignature:
return render(request, "bad_link.html")
# Then, check if timestamp is older than 5 minutes:
try:
signer.unsign(signed_token, max_age=300)
except signing.SignatureExpired:
ctx["autosubmit"] = True
channel = get_object_or_404(Channel, code=code, kind="email")
if channel.make_token() != token:
return render(request, "bad_link.html")
if request.method != "POST":
return render(request, "accounts/unsubscribe_submit.html", ctx)
channel.delete()
return render(request, "front/unsubscribe_success.html")
@require_POST
@login_required
def send_test_notification(request, code):
channel, rw = _get_channel_for_user(request, code)
dummy = Check(name="TEST", status="down", project=channel.project)
dummy.last_ping = now() - td(days=1)
dummy.n_pings = 42
# Delete all older test notifications for this channel
Notification.objects.filter(channel=channel, owner=None).delete()
# Send the test notification
error = channel.notify(dummy, is_test=True)
if error == "no-op":
# This channel may be configured to send "up" notifications only.
dummy.status = "up"
error = channel.notify(dummy, is_test=True)
if error:
messages.warning(request, "Could not send a test notification. %s." % error)
else:
messages.success(request, "Test notification sent!")
return redirect("hc-channels", channel.project.code)
@require_POST
@login_required
def remove_channel(request, code):
channel = _get_rw_channel_for_user(request, code)
project = channel.project
channel.delete()
return redirect("hc-channels", project.code)
@login_required
def email_form(request, channel=None, code=None):
"""Add email integration or edit an existing email integration."""
is_new = channel is None
if is_new:
project = _get_rw_project_for_user(request, code)
channel = Channel(project=project, kind="email")
if request.method == "POST":
form = forms.EmailForm(request.POST)
if form.is_valid():
if channel.disabled or form.cleaned_data["value"] != channel.email_value:
channel.disabled = False
if not settings.EMAIL_USE_VERIFICATION:
# In self-hosted setting, administator can set
# EMAIL_USE_VERIFICATION=False to disable email verification
channel.email_verified = True
elif form.cleaned_data["value"] == request.user.email:
# If the user is adding *their own* address
# we skip the verification step
channel.email_verified = True
else:
channel.email_verified = False
channel.value = form.get_value()
channel.save()
if is_new:
channel.assign_all_checks()
if not channel.email_verified:
channel.send_verify_link()
return redirect("hc-channels", channel.project.code)
elif is_new:
form = forms.EmailForm()
else:
form = forms.EmailForm(
{
"value": channel.email_value,
"up": channel.email_notify_up,
"down": channel.email_notify_down,
}
)
ctx = {
"page": "channels",
"project": channel.project,
"use_verification": settings.EMAIL_USE_VERIFICATION,
"form": form,
"is_new": is_new,
}
return render(request, "integrations/email_form.html", ctx)
@login_required
def edit_channel(request, code):
channel = _get_rw_channel_for_user(request, code)
if channel.kind == "email":
return email_form(request, channel=channel)
if channel.kind == "webhook":
return webhook_form(request, channel=channel)
if channel.kind == "sms":
return sms_form(request, channel=channel)
if channel.kind == "signal":
return signal_form(request, channel=channel)
if channel.kind == "whatsapp":
return whatsapp_form(request, channel=channel)
return HttpResponseBadRequest()
@require_setting("WEBHOOKS_ENABLED")
@login_required
def webhook_form(request, channel=None, code=None):
is_new = channel is None
if is_new:
project = _get_rw_project_for_user(request, code)
channel = Channel(project=project, kind="webhook")
if request.method == "POST":
form = forms.WebhookForm(request.POST)
if form.is_valid():
channel.name = form.cleaned_data["name"]
channel.value = form.get_value()
channel.save()
if is_new:
channel.assign_all_checks()
return redirect("hc-channels", channel.project.code)
elif is_new:
form = forms.WebhookForm()
else:
def flatten(d):
return "\n".join("%s: %s" % pair for pair in d.items())
doc = json.loads(channel.value)
doc["headers_down"] = flatten(doc["headers_down"])
doc["headers_up"] = flatten(doc["headers_up"])
doc["name"] = channel.name
form = forms.WebhookForm(doc)
ctx = {
"page": "channels",
"project": channel.project,
"form": form,
"is_new": is_new,
}
return render(request, "integrations/webhook_form.html", ctx)
@require_setting("SHELL_ENABLED")
@login_required
def add_shell(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddShellForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="shell")
channel.value = form.get_value()
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddShellForm()
ctx = {
"page": "channels",
"project": project,
"form": form,
}
return render(request, "integrations/add_shell.html", ctx)
@require_setting("PD_ENABLED")
@login_required
def add_pd(request, code):
project = _get_rw_project_for_user(request, code)
# Simple Install Flow
if settings.PD_APP_ID:
state = token_urlsafe()
redirect_url = settings.SITE_ROOT + reverse("hc-add-pd-complete")
redirect_url += "?" + urlencode({"state": state})
install_url = "https://app.pagerduty.com/install/integration?" + urlencode(
{"app_id": settings.PD_APP_ID, "redirect_url": redirect_url, "version": "2"}
)
ctx = {"page": "channels", "project": project, "install_url": install_url}
request.session["pagerduty"] = (state, str(project.code))
return render(request, "integrations/add_pd_simple.html", ctx)
if request.method == "POST":
form = forms.AddPdForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="pd")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddPdForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_pd.html", ctx)
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
@login_required
def add_pd_complete(request):
if "pagerduty" not in request.session:
return HttpResponseBadRequest()
state, code = request.session.pop("pagerduty")
if request.GET.get("state") != state:
return HttpResponseForbidden()
project = _get_rw_project_for_user(request, code)
doc = json.loads(request.GET["config"])
for item in doc["integration_keys"]:
channel = Channel(kind="pd", project=project)
channel.name = item["name"]
channel.value = json.dumps(
{"service_key": item["integration_key"], "account": doc["account"]["name"]}
)
channel.save()
channel.assign_all_checks()
messages.success(request, "The PagerDuty integration has been added!")
return redirect("hc-channels", project.code)
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
def pd_help(request):
ctx = {"page": "channels"}
return render(request, "integrations/add_pd_simple.html", ctx)
@require_setting("PAGERTREE_ENABLED")
@login_required
def add_pagertree(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="pagertree")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_pagertree.html", ctx)
@require_setting("SLACK_ENABLED")
@login_required
def add_slack(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="slack")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {
"page": "channels",
"form": form,
}
return render(request, "integrations/add_slack.html", ctx)
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
def slack_help(request):
ctx = {"page": "channels"}
return render(request, "integrations/add_slack_btn.html", ctx)
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_btn(request, code):
project = _get_rw_project_for_user(request, code)
state = token_urlsafe()
authorize_url = "https://slack.com/oauth/v2/authorize?" + urlencode(
{
"scope": "incoming-webhook",
"client_id": settings.SLACK_CLIENT_ID,
"state": state,
}
)
ctx = {
"project": project,
"page": "channels",
"authorize_url": authorize_url,
}
request.session["add_slack"] = (state, str(project.code))
return render(request, "integrations/add_slack_btn.html", ctx)
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_complete(request):
if "add_slack" not in request.session:
return HttpResponseForbidden()
state, code = request.session.pop("add_slack")
project = _get_rw_project_for_user(request, code)
if request.GET.get("error") == "access_denied":
messages.warning(request, "Slack setup was cancelled.")
return redirect("hc-channels", project.code)
if request.GET.get("state") != state:
return HttpResponseForbidden()
data = {
"client_id": settings.SLACK_CLIENT_ID,
"client_secret": settings.SLACK_CLIENT_SECRET,
"code": request.GET.get("code"),
}
result = curl.post("https://slack.com/api/oauth.v2.access", data)
doc = result.json()
if doc.get("ok"):
channel = Channel(kind="slack", project=project)
channel.value = result.text
channel.save()
channel.assign_all_checks()
messages.success(request, "The Slack integration has been added!")
else:
s = doc.get("error")
messages.warning(request, "Error message from slack: %s" % s)
return redirect("hc-channels", project.code)
@require_setting("MATTERMOST_ENABLED")
@login_required
def add_mattermost(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="mattermost")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {"page": "channels", "form": form, "project": project}
return render(request, "integrations/add_mattermost.html", ctx)
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet(request, code):
project = _get_rw_project_for_user(request, code)
state = token_urlsafe()
authorize_url = "https://www.pushbullet.com/authorize?" + urlencode(
{
"client_id": settings.PUSHBULLET_CLIENT_ID,
"redirect_uri": settings.SITE_ROOT + reverse(add_pushbullet_complete),
"response_type": "code",
"state": state,
}
)
ctx = {
"page": "channels",
"project": project,
"authorize_url": authorize_url,
}
request.session["add_pushbullet"] = (state, str(project.code))
return render(request, "integrations/add_pushbullet.html", ctx)
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet_complete(request):
if "add_pushbullet" not in request.session:
return HttpResponseForbidden()
state, code = request.session.pop("add_pushbullet")
project = _get_rw_project_for_user(request, code)
if request.GET.get("error") == "access_denied":
messages.warning(request, "Pushbullet setup was cancelled.")
return redirect("hc-channels", project.code)
if request.GET.get("state") != state:
return HttpResponseForbidden()
data = {
"client_id": settings.PUSHBULLET_CLIENT_ID,
"client_secret": settings.PUSHBULLET_CLIENT_SECRET,
"code": request.GET.get("code"),
"grant_type": "authorization_code",
}
result = curl.post("https://api.pushbullet.com/oauth2/token", data)
doc = result.json()
if "access_token" in doc:
channel = Channel(kind="pushbullet", project=project)
channel.value = doc["access_token"]
channel.save()
channel.assign_all_checks()
messages.success(request, "The Pushbullet integration has been added!")
else:
messages.warning(request, "Something went wrong")
return redirect("hc-channels", project.code)
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord(request, code):
project = _get_rw_project_for_user(request, code)
state = token_urlsafe()
auth_url = "https://discordapp.com/api/oauth2/authorize?" + urlencode(
{
"client_id": settings.DISCORD_CLIENT_ID,
"scope": "webhook.incoming",
"redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
"response_type": "code",
"state": state,
}
)
ctx = {"page": "channels", "project": project, "authorize_url": auth_url}
request.session["add_discord"] = (state, str(project.code))
return render(request, "integrations/add_discord.html", ctx)
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord_complete(request):
if "add_discord" not in request.session:
return HttpResponseForbidden()
state, code = request.session.pop("add_discord")
project = _get_rw_project_for_user(request, code)
if request.GET.get("error") == "access_denied":
messages.warning(request, "Discord setup was cancelled.")
return redirect("hc-channels", project.code)
if request.GET.get("state") != state:
return HttpResponseForbidden()
data = {
"client_id": settings.DISCORD_CLIENT_ID,
"client_secret": settings.DISCORD_CLIENT_SECRET,
"code": request.GET.get("code"),
"grant_type": "authorization_code",
"redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
}
result = curl.post("https://discordapp.com/api/oauth2/token", data)
doc = result.json()
if "access_token" in doc:
channel = Channel(kind="discord", project=project)
channel.value = result.text
channel.save()
channel.assign_all_checks()
messages.success(request, "The Discord integration has been added!")
else:
messages.warning(request, "Something went wrong.")
return redirect("hc-channels", project.code)
@require_setting("PUSHOVER_API_TOKEN")
def pushover_help(request):
ctx = {"page": "channels"}
return render(request, "integrations/add_pushover_help.html", ctx)
@require_setting("PUSHOVER_API_TOKEN")
@login_required
def add_pushover(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
state = token_urlsafe()
failure_url = settings.SITE_ROOT + reverse("hc-channels", args=[project.code])
success_url = (
settings.SITE_ROOT
+ reverse("hc-add-pushover", args=[project.code])
+ "?"
+ urlencode(
{
"state": state,
"prio": request.POST.get("po_priority", "0"),
"prio_up": request.POST.get("po_priority_up", "0"),
}
)
)
subscription_url = (
settings.PUSHOVER_SUBSCRIPTION_URL
+ "?"
+ urlencode({"success": success_url, "failure": failure_url})
)
request.session["pushover"] = state
return redirect(subscription_url)
# Handle successful subscriptions
if "pushover_user_key" in request.GET:
if "pushover" not in request.session:
return HttpResponseForbidden()
state = request.session.pop("pushover")
if request.GET.get("state") != state:
return HttpResponseForbidden()
if request.GET.get("pushover_unsubscribed") == "1":
# Unsubscription: delete all Pushover channels for this project
Channel.objects.filter(project=project, kind="po").delete()
return redirect("hc-channels", project.code)
form = forms.AddPushoverForm(request.GET)
if not form.is_valid():
return HttpResponseBadRequest()
channel = Channel(project=project, kind="po")
channel.value = form.get_value()
channel.save()
channel.assign_all_checks()
messages.success(request, "The Pushover integration has been added!")
return redirect("hc-channels", project.code)
# Show Integration Settings form
ctx = {
"page": "channels",
"project": project,
"po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY),
"po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION),
}
return render(request, "integrations/add_pushover.html", ctx)
@require_setting("OPSGENIE_ENABLED")
@login_required
def add_opsgenie(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddOpsgenieForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="opsgenie")
v = {"region": form.cleaned_data["region"], "key": form.cleaned_data["key"]}
channel.value = json.dumps(v)
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddOpsgenieForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_opsgenie.html", ctx)
@require_setting("VICTOROPS_ENABLED")
@login_required
def add_victorops(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="victorops")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_victorops.html", ctx)
@require_setting("ZULIP_ENABLED")
@login_required
def add_zulip(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddZulipForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="zulip")
channel.value = form.get_value()
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddZulipForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_zulip.html", ctx)
@csrf_exempt
@require_POST
def telegram_bot(request):
try:
doc = json.loads(request.body.decode())
if "channel_post" in doc:
# Telegram's "channel_post" key uses the same structure as "message".
# To keep the JSON schema and the view logic simple, if the payload
# contains "channel_post", copy it to "message", and proceed as usual.
doc["message"] = doc["channel_post"]
jsonschema.validate(doc, telegram_callback)
except ValueError:
return HttpResponseBadRequest()
except jsonschema.ValidationError:
# We don't recognize the message format, but don't want Telegram
# retrying this over and over again, so respond with 200 OK
return HttpResponse()
if "/start" not in doc["message"]["text"]:
return HttpResponse()
chat = doc["message"]["chat"]
name = max(chat.get("title", ""), chat.get("username", ""))
invite = render_to_string(
"integrations/telegram_invite.html",
{"qs": signing.dumps((chat["id"], chat["type"], name))},
)
try:
Telegram.send(chat["id"], invite)
except TransportError:
# Swallow the error and return HTTP 200 OK, otherwise Telegram will
# hit the webhook again and again.
pass
return HttpResponse()
@require_setting("TELEGRAM_TOKEN")
def telegram_help(request):
ctx = {
"page": "channels",
"bot_name": settings.TELEGRAM_BOT_NAME,
}
return render(request, "integrations/add_telegram.html", ctx)
@require_setting("TELEGRAM_TOKEN")
@login_required
def add_telegram(request):
chat_id, chat_type, chat_name = None, None, None
qs = request.META["QUERY_STRING"]
if qs:
try:
chat_id, chat_type, chat_name = signing.loads(qs, max_age=600)
except signing.BadSignature:
return render(request, "bad_link.html")
if request.method == "POST":
project = _get_rw_project_for_user(request, request.POST.get("project"))
channel = Channel(project=project, kind="telegram")
channel.value = json.dumps(
{"id": chat_id, "type": chat_type, "name": chat_name}
)
channel.save()
channel.assign_all_checks()
messages.success(request, "The Telegram integration has been added!")
return redirect("hc-channels", project.code)
ctx = {
"page": "channels",
"projects": request.profile.projects(),
"chat_id": chat_id,
"chat_type": chat_type,
"chat_name": chat_name,
"bot_name": settings.TELEGRAM_BOT_NAME,
}
return render(request, "integrations/add_telegram.html", ctx)
@require_setting("TWILIO_AUTH")
@login_required
def sms_form(request, channel=None, code=None):
is_new = channel is None
if is_new:
project = _get_rw_project_for_user(request, code)
channel = Channel(project=project, kind="sms")
if request.method == "POST":
form = forms.PhoneUpDownForm(request.POST)
if form.is_valid():
channel.name = form.cleaned_data["label"]
channel.value = form.get_json()
channel.save()
if is_new:
channel.assign_all_checks()
return redirect("hc-channels", channel.project.code)
elif is_new:
form = forms.PhoneUpDownForm(initial={"up": False})
else:
form = forms.PhoneUpDownForm(
{
"label": channel.name,
"phone": channel.phone_number,
"up": channel.sms_notify_up,
"down": channel.sms_notify_down,
}
)
ctx = {
"page": "channels",
"project": channel.project,
"form": form,
"profile": channel.project.owner_profile,
"is_new": is_new,
}
return render(request, "integrations/sms_form.html", ctx)
@require_setting("TWILIO_AUTH")
@login_required
def add_call(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.PhoneNumberForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="call")
channel.name = form.cleaned_data["label"]
channel.value = form.get_json()
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.PhoneNumberForm()
ctx = {
"page": "channels",
"project": project,
"form": form,
"profile": project.owner_profile,
}
return render(request, "integrations/add_call.html", ctx)
@require_setting("TWILIO_USE_WHATSAPP")
@login_required
def whatsapp_form(request, channel=None, code=None):
is_new = channel is None
if is_new:
project = _get_rw_project_for_user(request, code)
channel = Channel(project=project, kind="whatsapp")
if request.method == "POST":
form = forms.PhoneUpDownForm(request.POST)
if form.is_valid():
channel.name = form.cleaned_data["label"]
channel.value = form.get_json()
channel.save()
if is_new:
channel.assign_all_checks()
return redirect("hc-channels", channel.project.code)
elif is_new:
form = forms.PhoneUpDownForm()
else:
form = forms.PhoneUpDownForm(
{
"label": channel.name,
"phone": channel.phone_number,
"up": channel.whatsapp_notify_up,
"down": channel.whatsapp_notify_down,
}
)
ctx = {
"page": "channels",
"project": channel.project,
"form": form,
"profile": channel.project.owner_profile,
"is_new": is_new,
}
return render(request, "integrations/whatsapp_form.html", ctx)
@require_setting("SIGNAL_CLI_SOCKET")
@login_required
def signal_form(request, channel=None, code=None):
is_new = channel is None
if is_new:
project = _get_rw_project_for_user(request, code)
channel = Channel(project=project, kind="signal")
if request.method == "POST":
form = forms.PhoneUpDownForm(request.POST)
if form.is_valid():
channel.name = form.cleaned_data["label"]
channel.value = form.get_json()
channel.save()
if is_new:
channel.assign_all_checks()
return redirect("hc-channels", channel.project.code)
elif is_new:
form = forms.PhoneUpDownForm()
else:
form = forms.PhoneUpDownForm(
{
"label": channel.name,
"phone": channel.phone_number,
"up": channel.signal_notify_up,
"down": channel.signal_notify_down,
}
)
ctx = {
"page": "channels",
"project": channel.project,
"form": form,
"is_new": is_new,
}
return render(request, "integrations/signal_form.html", ctx)
@require_setting("TRELLO_APP_KEY")
@login_required
def add_trello(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddTrelloForm(request.POST)
if not form.is_valid():
return HttpResponseBadRequest()
channel = Channel(project=project, kind="trello")
channel.value = form.get_value()
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
return_url = settings.SITE_ROOT + reverse("hc-add-trello", args=[project.code])
authorize_url = "https://trello.com/1/authorize?" + urlencode(
{
"expiration": "never",
"name": settings.SITE_NAME,
"scope": "read,write",
"response_type": "token",
"key": settings.TRELLO_APP_KEY,
"return_url": return_url,
}
)
ctx = {
"page": "channels",
"project": project,
"authorize_url": authorize_url,
}
return render(request, "integrations/add_trello.html", ctx)
@require_setting("MATRIX_ACCESS_TOKEN")
@login_required
def add_matrix(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddMatrixForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="matrix")
channel.value = form.cleaned_data["room_id"]
# If user supplied room alias instead of ID, use it as channel name
alias = form.cleaned_data["alias"]
if not alias.startswith("!"):
channel.name = alias
channel.save()
channel.assign_all_checks()
messages.success(request, "The Matrix integration has been added!")
return redirect("hc-channels", project.code)
else:
form = forms.AddMatrixForm()
ctx = {
"page": "channels",
"project": project,
"form": form,
"matrix_user_id": settings.MATRIX_USER_ID,
}
return render(request, "integrations/add_matrix.html", ctx)
@require_setting("APPRISE_ENABLED")
@login_required
def add_apprise(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddAppriseForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="apprise")
channel.value = form.cleaned_data["url"]
channel.save()
channel.assign_all_checks()
messages.success(request, "The Apprise integration has been added!")
return redirect("hc-channels", project.code)
else:
form = forms.AddAppriseForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_apprise.html", ctx)
@require_setting("TRELLO_APP_KEY")
@login_required
@require_POST
def trello_settings(request):
token = request.POST.get("token")
url = "https://api.trello.com/1/members/me/boards"
params = {
"key": settings.TRELLO_APP_KEY,
"token": token,
"filter": "open",
"fields": "id,name",
"lists": "open",
"list_fields": "id,name",
}
boards = curl.get(url, params=params).json()
num_lists = sum(len(board["lists"]) for board in boards)
ctx = {"token": token, "boards": boards, "num_lists": num_lists}
return render(request, "integrations/trello_settings.html", ctx)
@require_setting("MSTEAMS_ENABLED")
@login_required
def add_msteams(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="msteams")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_msteams.html", ctx)
@require_setting("PROMETHEUS_ENABLED")
@login_required
def add_prometheus(request, code):
project, rw = _get_project_for_user(request, code)
ctx = {
"page": "channels",
"project": project,
"site_scheme": urlparse(settings.SITE_ROOT).scheme,
}
return render(request, "integrations/add_prometheus.html", ctx)
@require_setting("PROMETHEUS_ENABLED")
def metrics(request, code, key):
if len(key) != 32:
return HttpResponseBadRequest()
q = Project.objects.filter(code=code, api_key_readonly=key)
try:
project = q.get()
except Project.DoesNotExist:
return HttpResponseForbidden()
checks = Check.objects.filter(project_id=project.id).order_by("id")
def esc(s):
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def output(checks):
yield "# HELP hc_check_up Whether the check is currently up (1 for yes, 0 for no).\n"
yield "# TYPE hc_check_up gauge\n"
TMPL = """hc_check_up{name="%s", tags="%s", unique_key="%s"} %d\n"""
for check in checks:
value = 0 if check.get_status() == "down" else 1
yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value)
yield "\n"
yield "# HELP hc_check_started Whether the check is currently started (1 for yes, 0 for no).\n"
yield "# TYPE hc_check_started gauge\n"
TMPL = """hc_check_started{name="%s", tags="%s", unique_key="%s"} %d\n"""
for check in checks:
value = 1 if check.last_start is not None else 0
yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value)
tags_statuses, num_down = _tags_statuses(checks)
yield "\n"
yield "# HELP hc_tag_up Whether all checks with this tag are up (1 for yes, 0 for no).\n"
yield "# TYPE hc_tag_up gauge\n"
TMPL = """hc_tag_up{tag="%s"} %d\n"""
for tag in sorted(tags_statuses):
value = 0 if tags_statuses[tag] == "down" else 1
yield TMPL % (esc(tag), value)
yield "\n"
yield "# HELP hc_checks_total The total number of checks.\n"
yield "# TYPE hc_checks_total gauge\n"
yield "hc_checks_total %d\n" % len(checks)
yield "\n"
yield "# HELP hc_checks_down_total The number of checks currently down.\n"
yield "# TYPE hc_checks_down_total gauge\n"
yield "hc_checks_down_total %d\n" % num_down
return HttpResponse(output(checks), content_type="text/plain")
@require_setting("SPIKE_ENABLED")
@login_required
def add_spike(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddUrlForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="spike")
channel.value = form.cleaned_data["value"]
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddUrlForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_spike.html", ctx)
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify(request, code):
project = _get_rw_project_for_user(request, code)
state = token_urlsafe()
authorize_url = " https://notify-bot.line.me/oauth/authorize?" + urlencode(
{
"client_id": settings.LINENOTIFY_CLIENT_ID,
"redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
"response_type": "code",
"state": state,
"scope": "notify",
}
)
ctx = {
"page": "channels",
"project": project,
"authorize_url": authorize_url,
}
request.session["add_linenotify"] = (state, str(project.code))
return render(request, "integrations/add_linenotify.html", ctx)
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify_complete(request):
if "add_linenotify" not in request.session:
return HttpResponseForbidden()
state, code = request.session.pop("add_linenotify")
if request.GET.get("state") != state:
return HttpResponseForbidden()
project = _get_rw_project_for_user(request, code)
if request.GET.get("error") == "access_denied":
messages.warning(request, "LINE Notify setup was cancelled.")
return redirect("hc-channels", project.code)
# Exchange code for access token
data = {
"grant_type": "authorization_code",
"code": request.GET.get("code"),
"redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
"client_id": settings.LINENOTIFY_CLIENT_ID,
"client_secret": settings.LINENOTIFY_CLIENT_SECRET,
}
result = curl.post("https://notify-bot.line.me/oauth/token", data)
doc = result.json()
if doc.get("status") != 200:
messages.warning(request, "Something went wrong.")
return redirect("hc-channels", project.code)
# Fetch notification target's name, will use it as channel name:
token = doc["access_token"]
result = curl.get(
"https://notify-api.line.me/api/status",
headers={"Authorization": "Bearer %s" % token},
)
doc = result.json()
channel = Channel(kind="linenotify", project=project)
channel.name = doc.get("target")
channel.value = token
channel.save()
channel.assign_all_checks()
messages.success(request, "The LINE Notify integration has been added!")
return redirect("hc-channels", project.code)
@login_required
def add_gotify(request, code):
project = _get_rw_project_for_user(request, code)
if request.method == "POST":
form = forms.AddGotifyForm(request.POST)
if form.is_valid():
channel = Channel(project=project, kind="gotify")
channel.value = form.get_value()
channel.save()
channel.assign_all_checks()
return redirect("hc-channels", project.code)
else:
form = forms.AddGotifyForm()
ctx = {"page": "channels", "project": project, "form": form}
return render(request, "integrations/add_gotify.html", ctx)
# Forks: add custom views after this line