healthchecks_healthchecks/hc/api/transports.py
Pēteris Caune e048ec4c48
Fix "class Foo(object):" -> "class Foo:"
In Python 3 these are equivalent, and shorter is better.
2024-10-29 17:57:50 +02:00

1625 lines
56 KiB
Python

from __future__ import annotations
import email
import json
import logging
import os
import socket
import time
import uuid
from collections.abc import Iterator
from email.message import EmailMessage
from typing import TYPE_CHECKING, Any, NoReturn, cast
from urllib.parse import quote, urlencode, urljoin
from django.conf import settings
from django.db import close_old_connections
from django.template.loader import render_to_string
from django.utils.html import escape
from pydantic import BaseModel, ValidationError
from hc.accounts.models import Profile
from hc.front.templatetags.hc_extras import (
absolute_site_logo_url,
fix_asterisks,
sortchecks,
)
from hc.lib import curl, emails
from hc.lib.date import format_duration
from hc.lib.html import extract_signal_styles
from hc.lib.signing import sign_bounce_id
from hc.lib.string import replace
from hc.lib.typealias import JSONDict, JSONList, JSONValue
if TYPE_CHECKING:
from hc.api.models import Channel, Check, Flip, Notification, Ping
try:
import apprise
have_apprise = True
except ImportError:
have_apprise = False
logger = logging.getLogger(__name__)
def tmpl(template_name: str, **ctx: Any) -> str:
template_path = f"integrations/{template_name}"
# \xa0 is non-breaking space. It causes SMS messages to use UCS2 encoding
# and cost twice the money.
return render_to_string(template_path, ctx).strip().replace("\xa0", " ")
def get_ping_body(ping: Ping | None, maxlen: int | None = None) -> str | None:
"""Return ping body for a given Ping object.
Does two extra things in addition to simply calling Ping.get_body():
* if body has not been uploaded to object storage yet, waits 5 seconds
and tries to fetch it again
* if body is longer than the `maxlen` argument, truncate it
"""
body = None
if ping and ping.has_body():
body = ping.get_body()
if body is None and ping.object_size:
# Body is not uploaded to object storage yet.
# Wait 5 seconds, then fetch the body again.
time.sleep(5)
body = ping.get_body()
if body and maxlen and len(body) > maxlen:
body = body[:maxlen] + "\n[truncated]"
return body
class TransportError(Exception):
def __init__(self, message: str, permanent: bool = False) -> None:
self.message = message
self.permanent = permanent
class Transport:
def __init__(self, channel: Channel):
self.channel = channel
def notify(self, flip: Flip, notification: Notification) -> None:
"""Send notification about current status of the check.
This method raises TransportError on error, and returns None
on success.
"""
raise NotImplementedError()
def is_noop(self, status: str) -> bool:
"""Return True if transport will ignore check's current status.
This method is overridden in Webhook subclass where the user can
configure webhook urls for "up" and "down" events, and both are
optional.
"""
return False
def down_checks(self, check: Check) -> list[Check] | None:
"""Return a sorted list of other checks in the same project that are down.
If there are no other hecks in the project, return None instead of empty list.
Templates can check for None to decide whether to show or not show the
"All other checks are up" note.
"""
siblings = self.channel.project.check_set.exclude(id=check.id)
if not siblings.exists():
return None
down_siblings = list(siblings.filter(status="down"))
sortchecks(down_siblings, "name")
return down_siblings
def last_ping(self, flip: Flip) -> Ping | None:
"""Return the last Ping object received before this flip."""
if not flip.owner.pk:
return None
# Sort by "created". Sorting by "id" can cause postgres to pick api_ping.id
# index (slow if the api_ping table is big)
q = flip.owner.ping_set.order_by("created")
# Make sure we're not selecting pings that occurred after the flip
q = q.filter(created__lte=flip.created)
return q.last()
class Email(Transport):
def notify(self, flip: Flip, notification: Notification) -> None:
if not self.channel.email_verified:
raise TransportError("Email not verified")
unsub_link = self.channel.get_unsub_link()
headers = {
"List-Unsubscribe": "<%s>" % unsub_link,
"List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
"X-Bounce-ID": sign_bounce_id("n.%s" % notification.code),
}
from hc.accounts.models import Profile
# If this email address has an associated account, include
# a summary of projects the account has access to
try:
profile = Profile.objects.get(user__email=self.channel.email.value)
projects = list(profile.projects())
except Profile.DoesNotExist:
projects = None
ping = self.last_ping(flip)
body = get_ping_body(ping)
subject = None
if ping is not None and ping.scheme == "email" and body:
parsed = email.message_from_string(body, policy=email.policy.SMTP)
assert isinstance(parsed, EmailMessage)
subject = parsed.get("subject", "")
ctx = {
"flip": flip,
"check": flip.owner,
"ping": ping,
"body": body,
"subject": subject,
"projects": projects,
"unsub_link": unsub_link,
}
emails.alert(self.channel.email.value, ctx, headers)
def is_noop(self, status: str) -> bool:
if status == "down":
return not self.channel.email.notify_down
else:
return not self.channel.email.notify_up
class Shell(Transport):
def prepare(self, template: str, flip: Flip) -> str:
"""Replace placeholders with actual values."""
check = flip.owner
ctx = {
"$CODE": str(check.code),
"$STATUS": flip.new_status,
"$NOW": flip.created.replace(microsecond=0).isoformat(),
"$NAME": check.name,
"$TAGS": check.tags,
}
for i, tag in enumerate(check.tags_list()):
ctx["$TAG%d" % (i + 1)] = tag
return replace(template, ctx)
def is_noop(self, status: str) -> bool:
if status == "down" and not self.channel.shell.cmd_down:
return True
if status == "up" and not self.channel.shell.cmd_up:
return True
return False
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.SHELL_ENABLED:
raise TransportError("Shell commands are not enabled")
if flip.new_status == "up":
cmd = self.channel.shell.cmd_up
elif flip.new_status == "down":
cmd = self.channel.shell.cmd_down
cmd = self.prepare(cmd, flip)
code = os.system(cmd)
if code != 0:
raise TransportError("Command returned exit code %d" % code)
class HttpTransport(Transport):
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
# Subclasses can override this method to produce a more specific message.
raise TransportError(f"Received status code {response.status_code}")
@classmethod
def _request(
cls,
method: str,
url: str,
*,
params: curl.Params,
data: curl.Data,
json: Any,
headers: curl.Headers,
auth: curl.Auth,
) -> None:
try:
r = curl.request(
method,
url,
params=params,
data=data,
json=json,
headers=headers,
auth=auth,
timeout=30,
)
if r.status_code not in (200, 201, 202, 204):
cls.raise_for_response(r)
except curl.CurlError as e:
raise TransportError(e.message)
@classmethod
def request(
cls,
method: str,
url: str,
*,
retry: bool,
params: curl.Params = None,
data: curl.Data = None,
json: Any = None,
headers: curl.Headers = None,
auth: curl.Auth = None,
) -> None:
tries_left = 3 if retry else 1
while True:
try:
return cls._request(
method,
url,
params=params,
data=data,
json=json,
headers=headers,
auth=auth,
)
except TransportError as e:
tries_left = 0 if e.permanent else tries_left - 1
# If we have no tries left then abort the retry loop by re-raising
# the exception:
if tries_left == 0:
raise e
# Convenience wrapper around self.request for making "POST" requests
@classmethod
def post(
cls,
url: str,
retry: bool = True,
*,
params: curl.Params = None,
data: curl.Data = None,
json: Any = None,
headers: curl.Headers = None,
auth: curl.Auth = None,
) -> None:
cls.request(
"post",
url,
retry=retry,
params=params,
data=data,
json=json,
headers=headers,
auth=auth,
)
class Webhook(HttpTransport):
def prepare(
self,
template: str,
flip: Flip,
urlencode: bool = False,
latin1: bool = False,
allow_ping_body: bool = False,
) -> str:
"""Replace variables with actual values."""
def safe(s: str) -> str:
return quote(s) if urlencode else s
check = flip.owner
ctx = {
"$CODE": str(check.code),
"$STATUS": flip.new_status,
"$NOW": safe(flip.created.replace(microsecond=0).isoformat()),
"$NAME_JSON": safe(json.dumps(check.name)),
"$NAME": safe(check.name),
"$SLUG": check.slug,
"$TAGS": safe(check.tags),
"$JSON": safe(json.dumps(check.to_dict())),
}
# Materialize ping body only if template refers to it.
if allow_ping_body and "$BODY" in template:
body = get_ping_body(self.last_ping(flip))
ctx["$BODY_JSON"] = json.dumps(body if body else "")
ctx["$BODY"] = body if body else ""
if "$EXITSTATUS" in template:
ctx["$EXITSTATUS"] = "-1"
lp = self.last_ping(flip)
if lp and lp.exitstatus is not None:
ctx["$EXITSTATUS"] = str(lp.exitstatus)
for i, tag in enumerate(check.tags_list()):
ctx["$TAG%d" % (i + 1)] = safe(tag)
result = replace(template, ctx)
if latin1:
# Replace non-latin-1 characters with XML character references.
result = result.encode("latin-1", "xmlcharrefreplace").decode("latin-1")
return result
def is_noop(self, status: str) -> bool:
spec = self.channel.webhook_spec(status)
if not spec.url:
return True
return False
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.WEBHOOKS_ENABLED:
raise TransportError("Webhook notifications are not enabled.")
spec = self.channel.webhook_spec(flip.new_status)
if not spec.url:
raise TransportError("Empty webhook URL")
method = spec.method.lower()
url = self.prepare(spec.url, flip, urlencode=True)
retry = True
if notification.owner is None:
# This is a test notification.
# When sending a test notification, don't retry on failures.
retry = False
body, body_bytes = spec.body, None
if body and spec.method in ("POST", "PUT"):
body = self.prepare(body, flip, allow_ping_body=True)
body_bytes = body.encode()
headers = {}
for key, value in spec.headers.items():
# Header values should contain ASCII and latin-1 only
headers[key] = self.prepare(value, flip, latin1=True)
# Give up database connection before potentially long network IO:
close_old_connections()
self.request(method, url, retry=retry, data=body_bytes, headers=headers)
class SlackFields(list[JSONValue]):
"""Helper class for preparing [{"title": ..., "value": ... }, ...] structures."""
def add(self, title: str, value: str, short: bool = True) -> None:
field: JSONDict = {"title": title, "value": value}
if short:
field["short"] = True
self.append(field)
class Slackalike(HttpTransport):
"""Base class for transports that use Slack-compatible incoming webhooks."""
def payload(self, flip: Flip) -> JSONDict:
"""Prepare JSON-serializable payload for Slack-compatible incoming webhook."""
check = flip.owner
name = check.name_then_code()
fields = SlackFields()
result: JSONDict = {
"username": settings.SITE_NAME,
"icon_url": absolute_site_logo_url(),
"attachments": [
{
"color": "good" if flip.new_status == "up" else "danger",
"fallback": f'The check "{name}" is {flip.new_status.upper()}.',
"mrkdwn_in": ["fields"],
"title": f"{name}” is {flip.new_status.upper()}.",
"title_link": check.cloaked_url(),
"fields": fields,
}
],
}
if check.desc:
fields.add("Description", check.desc, short=False)
if check.project.name:
fields.add("Project", check.project.name)
if tags := check.tags_list():
fields.add("Tags", " ".join(f"`{tag}`" for tag in tags))
if check.kind == "simple":
fields.add("Period", format_duration(check.timeout))
if check.kind in ("cron", "oncalendar"):
fields.add("Schedule", fix_asterisks(check.schedule))
fields.add("Time Zone", check.tz)
if ping := self.last_ping(flip):
fields.add("Total Pings", str(ping.n))
fields.add("Last Ping", ping.formatted_kind_created())
else:
fields.add("Total Pings", "0")
fields.add("Last Ping", "Never")
body = get_ping_body(ping, maxlen=1000)
if body and "```" not in body:
fields.add("Last Ping Body", f"```\n{body}\n```", short=False)
return result
def notify(self, flip: Flip, notification: Notification) -> None:
self.post(self.channel.slack_webhook_url, json=self.payload(flip))
class Slack(Slackalike):
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
permanent = False
if response.status_code == 404:
# If Slack returns 404, this endpoint is unlikely to ever work again
# https://api.slack.com/messaging/webhooks#handling_errors
permanent = True
elif response.status_code == 400:
if response.content == b"invalid_token":
# If Slack returns 400 with "invalid_token" in response body,
# we're using a deactivated user's token to post to a private channel.
# In theory this condition can recover (a deactivated user can be
# activated), but in practice it is unlikely to happen.
permanent = True
else:
# Log it for later inspection
logger.debug("Slack returned HTTP 400 with body: %s", response.content)
raise TransportError(message, permanent=permanent)
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.SLACK_ENABLED:
raise TransportError("Slack notifications are not enabled.")
self.post(self.channel.slack_webhook_url, json=self.payload(flip))
class Mattermost(Slackalike):
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.MATTERMOST_ENABLED:
raise TransportError("Mattermost notifications are not enabled.")
self.post(self.channel.slack_webhook_url, json=self.payload(flip))
class Discord(Slackalike):
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
# Consider 404 a permanent failure
permanent = response.status_code == 404
raise TransportError(message, permanent=permanent)
def notify(self, flip: Flip, notification: Notification) -> None:
url = self.channel.discord_webhook_url + "/slack"
self.post(url, json=self.payload(flip))
class Opsgenie(HttpTransport):
class ErrorModel(BaseModel):
message: str
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
try:
r = Opsgenie.ErrorModel.model_validate_json(response.content)
message += f' with a message: "{r.message}"'
except ValidationError:
pass
raise TransportError(message)
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.OPSGENIE_ENABLED:
raise TransportError("Opsgenie notifications are not enabled.")
headers = {
"Content-Type": "application/json",
"Authorization": "GenieKey %s" % self.channel.opsgenie.key,
}
check = flip.owner
payload: JSONDict = {
"alias": str(check.unique_key),
"source": settings.SITE_NAME,
}
if flip.new_status == "down":
ctx = {"check": check, "ping": self.last_ping(flip)}
payload["tags"] = cast(JSONValue, check.tags_list())
payload["message"] = tmpl("opsgenie_message.html", **ctx)
payload["description"] = check.desc
details: JSONDict = {}
details["Project"] = check.project.name
if ping := self.last_ping(flip):
details["Total pings"] = ping.n
details["Last ping"] = ping.formatted_kind_created()
else:
details["Total pings"] = 0
details["Last ping"] = "Never"
if check.kind == "simple":
details["Period"] = format_duration(check.timeout)
if check.kind in ("cron", "oncalendar"):
details["Schedule"] = f"<code>{check.schedule}</code>"
details["Time zone"] = check.tz
details["Full details"] = check.cloaked_url()
payload["details"] = details
url = "https://api.opsgenie.com/v2/alerts"
if self.channel.opsgenie.region == "eu":
url = "https://api.eu.opsgenie.com/v2/alerts"
if flip.new_status == "up":
url += f"/{check.unique_key}/close?identifierType=alias"
self.post(url, json=payload, headers=headers)
class PagerDuty(HttpTransport):
URL = "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.PD_ENABLED:
raise TransportError("PagerDuty notifications are not enabled.")
check = flip.owner
details: JSONDict = {
"Project": check.project.name,
}
if ping := self.last_ping(flip):
details["Total pings"] = ping.n
details["Last ping"] = ping.formatted_kind_created()
else:
details["Total pings"] = 0
details["Last ping"] = "Never"
if check.desc:
details["Description"] = check.desc
if check.tags:
details["Tags"] = ", ".join(check.tags_list())
if check.kind == "simple":
details["Period"] = format_duration(check.timeout)
if check.kind in ("cron", "oncalendar"):
details["Schedule"] = check.schedule
details["Time zone"] = check.tz
description = tmpl("pd_description.html", check=check, status=flip.new_status)
payload = {
"service_key": self.channel.pd.service_key,
"incident_key": check.unique_key,
"event_type": "trigger" if flip.new_status == "down" else "resolve",
"description": description,
"client": settings.SITE_NAME,
"client_url": check.details_url(),
"details": details,
}
self.post(self.URL, json=payload)
class PagerTree(HttpTransport):
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.PAGERTREE_ENABLED:
raise TransportError("PagerTree notifications are not enabled.")
url = self.channel.value
headers = {"Content-Type": "application/json"}
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
payload = {
"incident_key": str(flip.owner.unique_key),
"event_type": "trigger" if flip.new_status == "down" else "resolve",
"title": tmpl("pagertree_title.html", **ctx),
"description": tmpl("pagertree_description.html", **ctx),
"client": settings.SITE_NAME,
"client_url": settings.SITE_ROOT,
"tags": ",".join(flip.owner.tags_list()),
}
self.post(url, json=payload, headers=headers)
class Pushbullet(HttpTransport):
def notify(self, flip: Flip, notification: Notification) -> None:
url = "https://api.pushbullet.com/v2/pushes"
headers = {
"Access-Token": self.channel.value,
"Content-Type": "application/json",
}
text = tmpl(
"pushbullet_message.html",
check=flip.owner,
status=flip.new_status,
ping=self.last_ping(flip),
)
payload = {"type": "note", "title": settings.SITE_NAME, "body": text}
self.post(url, json=payload, headers=headers)
class Pushover(HttpTransport):
URL = "https://api.pushover.net/1/messages.json"
CANCEL_TMPL = "https://api.pushover.net/1/receipts/cancel_by_tag/%s.json"
class ErrorModel(BaseModel):
user: str = ""
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
permanent = False
if response.status_code == 400:
try:
doc = Pushover.ErrorModel.model_validate_json(response.content)
if doc.user == "invalid":
message += " (invalid user)"
permanent = True
except ValidationError:
logger.debug("Pushover HTTP 400 with body: %s", response.content)
raise TransportError(message, permanent=permanent)
def is_noop(self, status: str) -> bool:
pieces = self.channel.value.split("|")
_, prio = pieces[0], pieces[1]
# The third element, if present, is the priority for "up" events
if status == "up" and len(pieces) == 3:
prio = pieces[2]
return int(prio) == -3
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.PUSHOVER_API_TOKEN:
raise TransportError("Pushover notifications are not enabled.")
pieces = self.channel.value.split("|")
user_key, down_prio = pieces[0], pieces[1]
# The third element, if present, is the priority for "up" events
up_prio = down_prio
if len(pieces) == 3:
up_prio = pieces[2]
from hc.api.models import TokenBucket
if not TokenBucket.authorize_pushover(user_key):
raise TransportError("Rate limit exceeded")
check = flip.owner
# If down events have the emergency priority,
# send a cancel call first
if flip.new_status == "up" and down_prio == "2":
url = self.CANCEL_TMPL % check.unique_key
cancel_payload = {"token": settings.PUSHOVER_API_TOKEN}
self.post(url, data=cancel_payload)
ctx = {
"check": check,
"status": flip.new_status,
"ping": self.last_ping(flip),
"down_checks": self.down_checks(check),
}
text = tmpl("pushover_message.html", **ctx)
title = tmpl("pushover_title.html", **ctx)
prio = up_prio if flip.new_status == "up" else down_prio
payload = {
"token": settings.PUSHOVER_API_TOKEN,
"user": user_key,
"message": text,
"title": title,
"html": 1,
"priority": int(prio),
"tags": check.unique_key,
"url": check.cloaked_url(),
"url_title": f"View on {settings.SITE_NAME}",
}
# Emergency notification
if prio == "2":
payload["retry"] = settings.PUSHOVER_EMERGENCY_RETRY_DELAY
payload["expire"] = settings.PUSHOVER_EMERGENCY_EXPIRATION
self.post(self.URL, data=payload)
class RocketChat(HttpTransport):
def payload(self, flip: Flip) -> JSONDict:
check = flip.owner
url = check.cloaked_url()
color = "#5cb85c" if flip.new_status == "up" else "#d9534f"
fields = SlackFields()
result: JSONDict = {
"alias": settings.SITE_NAME,
"avatar": absolute_site_logo_url(),
"text": f"[{check.name_then_code()}]({url}) is {flip.new_status.upper()}.",
"attachments": [{"color": color, "fields": fields}],
}
if check.desc:
fields.add("Description", check.desc, short=False)
if check.project.name:
fields.add("Project", check.project.name)
if tags := check.tags_list():
fields.add("Tags", " ".join(f"`{tag}`" for tag in tags))
if check.kind == "simple":
fields.add("Period", format_duration(check.timeout))
if check.kind in ("cron", "oncalendar"):
fields.add("Schedule", fix_asterisks(check.schedule))
fields.add("Time Zone", check.tz)
if ping := self.last_ping(flip):
fields.add("Total Pings", str(ping.n))
fields.add("Last Ping", ping.formatted_kind_created())
if body_size := ping.get_body_size():
bytes_str = "byte" if body_size == 1 else "bytes"
ping_url = f"{url}#ping-{ping.n}"
text = f"{body_size} {bytes_str}, [show body]({ping_url})"
fields.add("Last Ping Body", text)
else:
fields.add("Total Pings", "0")
fields.add("Last Ping", "Never")
return result
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.ROCKETCHAT_ENABLED:
raise TransportError("Rocket.Chat notifications are not enabled.")
self.post(self.channel.value, json=self.payload(flip))
class VictorOps(HttpTransport):
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
# If the endpoint returns 404, this endpoint is unlikely to ever work again
permanent = response.status_code == 404
raise TransportError(message, permanent=permanent)
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.VICTOROPS_ENABLED:
raise TransportError("Splunk On-Call notifications are not enabled.")
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
mtype = "CRITICAL" if flip.new_status == "down" else "RECOVERY"
payload = {
"entity_id": str(flip.owner.unique_key),
"message_type": mtype,
"entity_display_name": flip.owner.name_then_code(),
"state_message": tmpl("victorops_description.html", **ctx),
"monitoring_tool": settings.SITE_NAME,
}
self.post(self.channel.value, json=payload)
class Matrix(HttpTransport):
def get_url(self) -> str:
s = quote(self.channel.value)
assert isinstance(settings.MATRIX_HOMESERVER, str)
url = settings.MATRIX_HOMESERVER
url += "/_matrix/client/r0/rooms/%s/send/m.room.message?" % s
url += urlencode({"access_token": settings.MATRIX_ACCESS_TOKEN})
return url
def notify(self, flip: Flip, notification: Notification) -> None:
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
plain = tmpl("matrix_description.html", **ctx)
formatted = tmpl("matrix_description_formatted.html", **ctx)
payload = {
"msgtype": "m.text",
"body": plain,
"format": "org.matrix.custom.html",
"formatted_body": formatted,
}
self.post(self.get_url(), json=payload)
class MigrationRequiredError(TransportError):
def __init__(self, message: str, new_chat_id: int):
super().__init__(message, permanent=True)
self.new_chat_id = new_chat_id
class Telegram(HttpTransport):
SM = f"https://api.telegram.org/bot{settings.TELEGRAM_TOKEN}/sendMessage"
class MigrationParameters(BaseModel):
migrate_to_chat_id: int
class ErrorModel(BaseModel):
description: str
parameters: Telegram.MigrationParameters | None = None
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
try:
m = Telegram.ErrorModel.model_validate_json(response.content)
except ValidationError:
raise TransportError(message)
if m.parameters:
# If the error payload contains the migrate_to_chat_id field,
# raise MigrationRequiredError, with the new chat_id included
chat_id = m.parameters.migrate_to_chat_id
raise MigrationRequiredError(m.description, chat_id)
permanent = False
message += f' with a message: "{m.description}"'
if m.description == "Forbidden: the group chat was deleted":
permanent = True
if m.description == "Forbidden: bot was blocked by the user":
permanent = True
raise TransportError(message, permanent=permanent)
@classmethod
def send(cls, chat_id: int, thread_id: int | None, text: str) -> None:
# Telegram.send is a separate method because it is also used in
# hc.front.views.telegram_bot to send invite links.
payload = {
"chat_id": chat_id,
"message_thread_id": thread_id,
"text": text,
"parse_mode": "html",
}
cls.post(cls.SM, json=payload)
def notify(self, flip: Flip, notification: Notification) -> None:
from hc.api.models import TokenBucket
if not TokenBucket.authorize_telegram(self.channel.telegram.id):
raise TransportError("Rate limit exceeded")
ping = self.last_ping(flip)
ctx = {
"check": flip.owner,
"status": flip.new_status,
"down_checks": self.down_checks(flip.owner),
"ping": ping,
# Telegram's message limit is 4096 chars, but clip body at 1000 for
# consistency
"body": get_ping_body(ping, maxlen=1000),
}
text = tmpl("telegram_message.html", **ctx)
try:
self.send(self.channel.telegram.id, self.channel.telegram.thread_id, text)
except MigrationRequiredError as e:
# Save the new chat_id, then try sending again:
self.channel.update_telegram_id(e.new_chat_id)
self.send(self.channel.telegram.id, self.channel.telegram.thread_id, text)
class Sms(HttpTransport):
URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Messages.json"
class ErrorModel(BaseModel):
code: int
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
if response.status_code == 400:
try:
doc = Sms.ErrorModel.model_validate_json(response.content, strict=True)
if doc.code == 21211:
raise TransportError("Invalid phone number", permanent=True)
except ValidationError:
pass
logger.debug("Twilio Messages HTTP 400 with body: %s", response.content)
raise TransportError(f"Received status code {response.status_code}")
def is_noop(self, status: str) -> bool:
if status == "down":
return not self.channel.phone.notify_down
else:
return not self.channel.phone.notify_up
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.TWILIO_ACCOUNT or not settings.TWILIO_AUTH:
raise TransportError("SMS notifications are not enabled")
profile = Profile.objects.for_user(self.channel.project.owner)
if not profile.authorize_sms():
profile.send_sms_limit_notice("SMS")
raise TransportError("Monthly SMS limit exceeded")
url = self.URL % settings.TWILIO_ACCOUNT
auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH)
text = tmpl(
"sms_message.html",
check=flip.owner,
status=flip.new_status,
ping=self.last_ping(flip),
site_name=settings.SITE_NAME,
)
data = {
"To": self.channel.phone.value,
"Body": text,
"StatusCallback": notification.status_url(),
"RiskCheck": "disable",
}
if settings.TWILIO_MESSAGING_SERVICE_SID:
data["MessagingServiceSid"] = settings.TWILIO_MESSAGING_SERVICE_SID
else:
assert settings.TWILIO_FROM
data["From"] = settings.TWILIO_FROM
self.post(url, data=data, auth=auth)
class Call(HttpTransport):
URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Calls.json"
class ErrorModel(BaseModel):
code: int
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
if response.status_code == 400:
try:
doc = Call.ErrorModel.model_validate_json(response.content, strict=True)
if doc.code == 21211:
raise TransportError("Invalid phone number", permanent=True)
except ValidationError:
pass
logger.debug("Twilio Calls HTTP 400 with body: %s", response.content)
raise TransportError(f"Received status code {response.status_code}")
def is_noop(self, status: str) -> bool:
return status != "down"
def notify(self, flip: Flip, notification: Notification) -> None:
if (
not settings.TWILIO_ACCOUNT
or not settings.TWILIO_AUTH
or not settings.TWILIO_FROM
):
raise TransportError("Call notifications are not enabled")
profile = Profile.objects.for_user(self.channel.project.owner)
if not profile.authorize_call():
profile.send_call_limit_notice()
raise TransportError("Monthly phone call limit exceeded")
url = self.URL % settings.TWILIO_ACCOUNT
auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH)
ctx = {"check": flip.owner, "site_name": settings.SITE_NAME}
data = {
"From": settings.TWILIO_FROM,
"To": self.channel.phone.value,
"Twiml": tmpl("call_message.html", **ctx),
"StatusCallback": notification.status_url(),
}
self.post(url, data=data, auth=auth)
class WhatsApp(HttpTransport):
URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Messages.json"
class ErrorModel(BaseModel):
code: int
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
if response.status_code == 400:
try:
doc = WhatsApp.ErrorModel.model_validate_json(
response.content, strict=True
)
if doc.code == 21211:
raise TransportError("Invalid phone number", permanent=True)
except ValidationError:
pass
logger.debug("WhatsApp HTTP 400 with body: %s", response.content)
raise TransportError(f"Received status code {response.status_code}")
def is_noop(self, status: str) -> bool:
if status == "down":
return not self.channel.phone.notify_down
else:
return not self.channel.phone.notify_up
def notify(self, flip: Flip, notification: Notification) -> None:
for key in (
"TWILIO_USE_WHATSAPP",
"TWILIO_ACCOUNT",
"TWILIO_AUTH",
"TWILIO_FROM",
"TWILIO_MESSAGING_SERVICE_SID",
"WHATSAPP_DOWN_CONTENT_SID",
"WHATSAPP_UP_CONTENT_SID",
):
if not getattr(settings, key):
raise TransportError("WhatsApp notifications are not enabled")
profile = Profile.objects.for_user(self.channel.project.owner)
if not profile.authorize_sms():
profile.send_sms_limit_notice("WhatsApp")
raise TransportError("Monthly message limit exceeded")
url = self.URL % settings.TWILIO_ACCOUNT
assert settings.TWILIO_ACCOUNT and settings.TWILIO_AUTH
auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH)
if flip.new_status == "down":
content_sid = settings.WHATSAPP_DOWN_CONTENT_SID
else:
content_sid = settings.WHATSAPP_UP_CONTENT_SID
data = {
"To": f"whatsapp:{self.channel.phone.value}",
"From": f"whatsapp:{settings.TWILIO_FROM}",
"MessagingServiceSid": settings.TWILIO_MESSAGING_SERVICE_SID,
"ContentSid": content_sid,
"ContentVariables": json.dumps({1: flip.owner.name_then_code()}),
"StatusCallback": notification.status_url(),
}
self.post(url, data=data, auth=auth)
class Trello(HttpTransport):
URL = "https://api.trello.com/1/cards"
def is_noop(self, status: str) -> bool:
return status != "down"
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.TRELLO_APP_KEY:
raise TransportError("Trello notifications are not enabled.")
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
params = {
"idList": self.channel.trello.list_id,
"name": tmpl("trello_name.html", **ctx),
"desc": tmpl("trello_desc.html", **ctx),
"key": settings.TRELLO_APP_KEY,
"token": self.channel.trello.token,
}
self.post(self.URL, params=params)
class Apprise(HttpTransport):
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.APPRISE_ENABLED or not have_apprise:
raise TransportError("Apprise is disabled and/or not installed")
a = apprise.Apprise()
check, status, ping = flip.owner, flip.new_status, self.last_ping(flip)
title = tmpl("apprise_title.html", check=check, status=status)
body = tmpl("apprise_description.html", check=check, status=status, ping=ping)
a.add(self.channel.value)
notify_type = (
apprise.NotifyType.SUCCESS if status == "up" else apprise.NotifyType.FAILURE
)
if not a.notify(body=body, title=title, notify_type=notify_type):
raise TransportError("Failed")
class MsTeams(HttpTransport):
def payload(self, flip: Flip) -> JSONDict:
check = flip.owner
name = check.name_then_code()
facts: JSONList = []
sections: JSONList = [{"text": check.desc, "facts": facts}]
result: JSONDict = {
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"title": f"{escape(name)}” is {flip.new_status.upper()}.",
"summary": f"{name}” is {flip.new_status.upper()}.",
"themeColor": "5cb85c" if flip.new_status == "up" else "d9534f",
"sections": sections,
"potentialAction": [
{
"@type": "OpenUri",
"name": f"View in {settings.SITE_NAME}",
"targets": [{"os": "default", "uri": check.cloaked_url()}],
}
],
}
if tags := check.tags_list():
formatted_tags = " ".join(f"`{tag}`" for tag in tags)
facts.append({"name": "Tags:", "value": formatted_tags})
if check.kind == "simple":
facts.append({"name": "Period:", "value": format_duration(check.timeout)})
if check.kind in ("cron", "oncalendar"):
facts.append({"name": "Schedule:", "value": fix_asterisks(check.schedule)})
facts.append({"name": "Time Zone:", "value": check.tz})
if ping := self.last_ping(flip):
facts.append({"name": "Total Pings:", "value": str(ping.n)})
facts.append({"name": "Last Ping:", "value": ping.formatted_kind_created()})
else:
facts.append({"name": "Total Pings:", "value": "0"})
facts.append({"name": "Last Ping:", "value": "Never"})
body = get_ping_body(ping, maxlen=1000)
if body and "```" not in body:
section_text = f"**Last Ping Body**:\n```\n{ body }\n```"
sections.append({"text": section_text})
return result
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.MSTEAMS_ENABLED:
raise TransportError("MS Teams notifications are not enabled.")
self.post(self.channel.value, json=self.payload(flip))
class MsTeamsWorkflow(HttpTransport):
def payload(self, flip: Flip) -> JSONDict:
check = flip.owner
name = check.name_then_code()
fields = SlackFields()
indicator = "🔴" if flip.new_status == "down" else "🟢"
result: JSONDict = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"contentUrl": None,
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"fallbackText": f"{escape(name)}” is {flip.new_status.upper()}.",
"version": "1.2",
"body": [
{
"type": "TextBlock",
"text": f"{indicator}{escape(name)}” is {flip.new_status.upper()}.",
"weight": "bolder",
"size": "medium",
"wrap": True,
"style": "heading",
},
{
"type": "FactSet",
"facts": fields,
},
],
"actions": [
{
"type": "Action.OpenUrl",
"title": f"View in {settings.SITE_NAME}",
"url": check.cloaked_url(),
}
],
},
}
],
}
if check.desc:
fields.add("Description:", check.desc.replace("\n", "\n\n"))
if check.project.name:
fields.add("Project:", check.project.name)
if tags := check.tags_list():
formatted_tags = " ".join(tags)
fields.add("Tags:", formatted_tags)
if check.kind == "simple":
fields.add("Period:", format_duration(check.timeout))
if check.kind in ("cron", "oncalendar"):
fields.add("Schedule:", fix_asterisks(check.schedule))
fields.add("Time Zone:", check.tz)
if ping := self.last_ping(flip):
fields.add("Total Pings:", str(ping.n))
fields.add("Last Ping:", ping.formatted_kind_created())
else:
fields.add("Total Pings:", "0")
fields.add("Last Ping:", "Never")
return result
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.MSTEAMS_ENABLED:
raise TransportError("MS Teams notifications are not enabled.")
self.post(self.channel.value, json=self.payload(flip))
class Zulip(HttpTransport):
class ErrorModel(BaseModel):
msg: str
@classmethod
def raise_for_response(cls, response: curl.Response) -> NoReturn:
message = f"Received status code {response.status_code}"
try:
f = Zulip.ErrorModel.model_validate_json(response.content)
message += f' with a message: "{f.msg}"'
except ValidationError:
pass
raise TransportError(message)
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.ZULIP_ENABLED:
raise TransportError("Zulip notifications are not enabled.")
topic = self.channel.zulip.topic
if not topic:
topic = tmpl("zulip_topic.html", check=flip.owner, status=flip.new_status)
url = self.channel.zulip.site + "/api/v1/messages"
auth = (self.channel.zulip.bot_email, self.channel.zulip.api_key)
content = tmpl(
"zulip_content.html",
check=flip.owner,
status=flip.new_status,
ping=self.last_ping(flip),
)
data = {
"type": self.channel.zulip.mtype,
"to": self.channel.zulip.to,
"topic": topic,
"content": content,
}
self.post(url, data=data, auth=auth)
class Spike(HttpTransport):
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.SPIKE_ENABLED:
raise TransportError("Spike notifications are not enabled.")
url = self.channel.value
headers = {"Content-Type": "application/json"}
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
payload = {
"check_id": str(flip.owner.unique_key),
"title": tmpl("spike_title.html", **ctx),
"message": tmpl("spike_description.html", **ctx),
"status": flip.new_status,
}
self.post(url, json=payload, headers=headers)
class LineNotify(HttpTransport):
URL = "https://notify-api.line.me/api/notify"
def notify(self, flip: Flip, notification: Notification) -> None:
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Bearer %s" % self.channel.linenotify_token,
}
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
}
msg = tmpl("linenotify_message.html", **ctx)
self.post(self.URL, headers=headers, params={"message": msg})
class SignalRateLimitFailure(TransportError):
def __init__(self, token: str, reply: bytes):
super().__init__("CAPTCHA proof required")
self.token = token
self.reply = reply
class Signal(Transport):
TIMEOUT = 60
class Result(BaseModel):
type: str
token: str | None = None
class Response(BaseModel):
results: list[Signal.Result]
class Data(BaseModel):
response: Signal.Response
class Error(BaseModel):
code: int
data: Signal.Data | None = None
class Reply(BaseModel):
id: str = ""
error: Signal.Error | None = None
def get_results(self) -> list[Signal.Result]:
assert self.error
if self.error.data is None:
return []
return self.error.data.response.results
def is_noop(self, status: str) -> bool:
if status == "down":
return not self.channel.phone.notify_down
else:
return not self.channel.phone.notify_up
@classmethod
def send(cls, recipient: str, message: str) -> None:
plaintext, styles = extract_signal_styles(message)
payload = {
"jsonrpc": "2.0",
"method": "send",
"params": {
"recipient": [recipient],
"message": plaintext,
"textStyle": styles,
},
"id": str(uuid.uuid4()),
}
payload_bytes = (json.dumps(payload) + "\n").encode()
for reply_bytes in cls._read_replies(payload_bytes):
try:
reply = Signal.Reply.model_validate_json(reply_bytes)
except ValidationError:
logger.error("unexpected signal-cli response: %s", reply_bytes)
raise TransportError("signal-cli call failed (unexpected response)")
if reply.id != payload["id"]:
continue
if reply.error is None:
break # success!
for result in reply.get_results():
if result.type == "UNREGISTERED_FAILURE":
raise TransportError("Recipient not found", permanent=True)
if result.type == "RATE_LIMIT_FAILURE" and result.token:
raise SignalRateLimitFailure(result.token, reply_bytes)
msg = f"signal-cli call failed ({reply.error.code})"
msg_with_reply = msg + "\n" + reply_bytes.decode()
# Include signal-cli reply in the message we log for ourselves
logger.error(msg_with_reply)
# Do not include signal-cli reply in the message we show to the user
raise TransportError(msg)
@classmethod
def _read_replies(cls, payload_bytes: bytes) -> Iterator[bytes]:
"""Send a request to signal-cli over UNIX socket. Read and yield replies.
This method:
* opens UNIX socket
* sends the request data (JSON RPC data encoded as bytes)
* reads newline-terminated responses and yields them
Individual sendall and recv operations have a timeout of 15 seconds.
This method also keeps track of total time spent in the method, and raises
an exception when the total time exceeds 15 seconds.
"""
if not settings.SIGNAL_CLI_SOCKET:
raise TransportError("Signal notifications are not enabled")
start = time.time()
address: str | tuple[str, int]
if ":" in settings.SIGNAL_CLI_SOCKET:
stype = socket.AF_INET
parts = settings.SIGNAL_CLI_SOCKET.split(":")
address = (parts[0], int(parts[1]))
else:
stype = socket.AF_UNIX
address = settings.SIGNAL_CLI_SOCKET
with socket.socket(stype, socket.SOCK_STREAM) as s:
s.settimeout(cls.TIMEOUT)
try:
s.connect(address)
s.sendall(payload_bytes)
s.shutdown(socket.SHUT_WR) # we are done sending
buffer = []
while True:
ch = s.recv(1)
buffer.append(ch)
if ch in (b"\n", b""):
yield b"".join(buffer)
buffer = []
if time.time() - start > cls.TIMEOUT:
raise TransportError("signal-cli call timed out")
except OSError as e:
msg = "signal-cli call failed (%s)" % e
# Log the exception, so any configured logging handlers can pick it up
logger.exception(msg)
# And then report it the same as other errors
raise TransportError(msg)
def notify(self, flip: Flip, notification: Notification) -> None:
if not settings.SIGNAL_CLI_SOCKET:
raise TransportError("Signal notifications are not enabled")
from hc.api.models import TokenBucket
if not TokenBucket.authorize_signal(self.channel.phone.value):
raise TransportError("Rate limit exceeded")
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
"down_checks": self.down_checks(flip.owner),
}
text = tmpl("signal_message.html", **ctx)
tries_left = 2
while True:
try:
return self.send(self.channel.phone.value, text)
except SignalRateLimitFailure as e:
self.channel.send_signal_captcha_alert(e.token, e.reply.decode())
plaintext, _ = extract_signal_styles(text)
self.channel.send_signal_rate_limited_notice(text, plaintext)
raise e
except TransportError as e:
tries_left -= 1
if e.permanent or tries_left == 0:
raise e
logger.debug("Retrying signal-cli call")
class Gotify(HttpTransport):
def notify(self, flip: Flip, notification: Notification) -> None:
base = self.channel.gotify.url
if not base.endswith("/"):
base += "/"
url = urljoin(base, "message")
url += "?" + urlencode({"token": self.channel.gotify.token})
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
"down_checks": self.down_checks(flip.owner),
}
payload = {
"title": tmpl("gotify_title.html", **ctx),
"message": tmpl("gotify_message.html", **ctx),
"extras": {
"client::display": {"contentType": "text/markdown"},
},
}
self.post(url, json=payload)
class Group(Transport):
def notify(self, flip: Flip, notification: Notification) -> None:
channels = self.channel.group_channels
# If notification's owner field is None then this is a test notification,
# and we should pass is_test=True to channel.notify() calls
is_test = notification.owner is None
error_count = 0
for channel in channels:
error = channel.notify(flip, is_test=is_test)
if error and error != "no-op":
error_count += 1
if error_count:
raise TransportError(
f"{error_count} out of {len(channels)} notifications failed"
)
class Ntfy(HttpTransport):
def priority(self, status: str) -> int:
if status == "up":
return self.channel.ntfy.priority_up
return self.channel.ntfy.priority
def is_noop(self, status: str) -> bool:
return self.priority(status) == 0
def notify(self, flip: Flip, notification: Notification) -> None:
ctx = {
"check": flip.owner,
"status": flip.new_status,
"ping": self.last_ping(flip),
"down_checks": self.down_checks(flip.owner),
}
payload = {
"topic": self.channel.ntfy.topic,
"priority": self.priority(flip.new_status),
"title": tmpl("ntfy_title.html", **ctx),
"message": tmpl("ntfy_message.html", **ctx),
"tags": ["red_circle" if flip.new_status == "down" else "green_circle"],
"actions": [
{
"action": "view",
"label": f"View on {settings.SITE_NAME}",
"url": flip.owner.cloaked_url(),
}
],
}
url = self.channel.ntfy.url
headers = {}
if self.channel.ntfy.token:
headers = {"Authorization": f"Bearer {self.channel.ntfy.token}"}
# Give up database connection before potentially long network IO:
close_old_connections()
self.post(url, headers=headers, json=payload)