0
0
Fork 0
mirror of https://github.com/healthchecks/healthchecks.git synced 2025-04-11 15:51:19 +00:00

Improve type hints in management commands

This commit is contained in:
Pēteris Caune 2023-10-18 16:15:01 +03:00
parent e8be347d1a
commit 1aec03dfc6
No known key found for this signature in database
GPG key ID: E28D7679E9A9EDE2
8 changed files with 48 additions and 30 deletions

View file

@ -2,29 +2,25 @@ from __future__ import annotations
import signal
import time
from argparse import ArgumentParser
from types import FrameType
from typing import Any
from django.core.management.base import BaseCommand
from django.db.models import Q
from django.utils.timezone import now
from hc.accounts.models import NO_NAG, Profile
from hc.api.models import Check
def num_pinged_checks(profile):
q = Check.objects.filter(user_id=profile.user.id)
q = q.filter(last_ping__isnull=False)
return q.count()
class Command(BaseCommand):
help = "Send due monthly reports and nags"
tmpl = "Sent monthly report to %s"
def pause(self):
def pause(self) -> None:
time.sleep(3)
def add_arguments(self, parser):
def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument(
"--loop",
action="store_true",
@ -94,12 +90,12 @@ class Command(BaseCommand):
return True
def on_signal(self, signum, frame):
def on_signal(self, signum: int, frame: FrameType | None) -> None:
desc = signal.strsignal(signum)
self.stdout.write(f"{desc}, finishing...\n")
self.shutdown = True
def handle(self, loop=False, *args, **options):
def handle(self, loop: bool, **options: Any) -> str:
self.shutdown = False
signal.signal(signal.SIGTERM, self.on_signal)
signal.signal(signal.SIGINT, self.on_signal)

View file

@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
from django.urls import reverse
@ -12,7 +14,7 @@ SETWEBHOOK_TMPL = "https://api.telegram.org/bot%s/setWebhook"
class Command(BaseCommand):
help = "Set up telegram bot's webhook address"
def handle(self, *args, **options):
def handle(self, **options: Any) -> str:
if settings.TELEGRAM_TOKEN is None:
return "Abort: settings.TELEGRAM_TOKEN is not set"

View file

@ -4,8 +4,13 @@ import email
import email.policy
import re
import time
from argparse import ArgumentParser
from email.message import EmailMessage
from io import TextIOBase
from typing import Any
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Envelope, Session
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from django.db import connection
@ -18,7 +23,7 @@ RE_UUID = re.compile(
)
def _match(subject, keywords):
def _match(subject: str, keywords: str) -> bool:
for s in keywords.split(","):
s = s.strip()
if s and s in subject:
@ -27,24 +32,26 @@ def _match(subject, keywords):
return False
def _to_text(message, with_subject, with_body):
def _to_text(message: EmailMessage, with_subject: bool, with_body: bool) -> str:
chunks = []
if with_subject:
chunks.append(message.get("subject", ""))
if with_body:
plain_mime_part = message.get_body(("plain",))
if plain_mime_part:
assert isinstance(plain_mime_part, EmailMessage)
chunks.append(plain_mime_part.get_content())
html_mime_part = message.get_body(("html",))
if html_mime_part:
assert isinstance(html_mime_part, EmailMessage)
html = html_mime_part.get_content()
chunks.append(html2text(html))
return "\n".join(chunks)
def _process_message(remote_addr, mailfrom, mailto, data):
def _process_message(remote_addr: str, mailfrom: str, mailto: str, data: bytes) -> str:
to_parts = mailto.split("@")
code = to_parts[0]
@ -66,6 +73,7 @@ def _process_message(remote_addr, mailfrom, mailto, data):
data_str = data.decode(errors="replace")
# Specify policy, the default policy does not decode encoded headers:
message = email.message_from_string(data_str, policy=email.policy.SMTP)
assert isinstance(message, EmailMessage)
text = _to_text(message, check.filter_subject, check.filter_body)
action = "ign"
@ -83,11 +91,14 @@ def _process_message(remote_addr, mailfrom, mailto, data):
class PingHandler:
def __init__(self, stdout):
def __init__(self, stdout: TextIOBase) -> None:
self.stdout = stdout
self.process_message = sync_to_async(_process_message)
async def handle_DATA(self, server, session, envelope):
async def handle_DATA(
self, server: SMTP, session: Session, envelope: Envelope
) -> str:
assert session.peer
remote_addr = session.peer[0]
mailfrom = envelope.mail_from
data = envelope.content
@ -101,7 +112,7 @@ class PingHandler:
class Command(BaseCommand):
help = "Listen for ping emails"
def add_arguments(self, parser):
def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument(
"--host", help="ip address to listen on, default 0.0.0.0", default="0.0.0.0"
)
@ -109,7 +120,7 @@ class Command(BaseCommand):
"--port", help="port to listen on, default 25", type=int, default=25
)
def handle(self, host, port, *args, **options):
def handle(self, host: str, port: int, **options: Any) -> None:
handler = PingHandler(self.stdout)
controller = Controller(handler, hostname=host, port=port)
print(f"Starting SMTP listener on {host}:{port} ...")

View file

@ -2,6 +2,8 @@ from __future__ import annotations
import json
import uuid
from argparse import ArgumentParser
from typing import Any
from django.core.management.base import BaseCommand
@ -11,15 +13,14 @@ from hc.api.transports import Signal
class Command(BaseCommand):
help = "Submit Signal rate-limit challenge."
def add_arguments(self, parser):
def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument("challenge", help="challenge token from Signal")
parser.add_argument(
"captcha",
help="solved CAPTCHA from https://signalcaptchas.org/challenge/generate.html",
)
def handle(self, challenge, captcha, *args, **options):
def handle(self, challenge: str, captcha: str, **options: Any) -> str:
payload = {
"jsonrpc": "2.0",
"method": "submitRateLimitChallenge",
@ -28,7 +29,7 @@ class Command(BaseCommand):
}
payload_bytes = (json.dumps(payload) + "\n").encode()
for reply_bytes in Signal(None)._read_replies(payload_bytes):
for reply_bytes in Signal._read_replies(payload_bytes):
try:
reply = json.loads(reply_bytes.decode())
except ValueError:
@ -36,3 +37,4 @@ class Command(BaseCommand):
if reply.get("id") == payload["id"]:
return reply_bytes.decode()
return ""

View file

@ -1113,7 +1113,7 @@ class Signal(Transport):
raise TransportError(f"signal-cli call failed ({code})")
@classmethod
def _read_replies(self, payload_bytes: bytes) -> Iterator[bytes]:
def _read_replies(cls, payload_bytes: bytes) -> Iterator[bytes]:
"""Send a request to signal-cli over UNIX socket. Read and yield replies.
This method:

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import sqlite3
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
@ -12,7 +13,7 @@ from hc.lib.html import html2text
class Command(BaseCommand):
help = "Renders Markdown to HTML"
def handle(self, *args, **options):
def handle(self, **options: Any) -> None:
con = sqlite3.connect(settings.BASE_DIR / "search.db")
cur = con.cursor()
cur.execute("DROP TABLE IF EXISTS docs")

View file

@ -1,9 +1,14 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from django.core.management.base import BaseCommand
if TYPE_CHECKING:
from pygments import lexers
def _process(name, lexer):
def _process(name: str, lexer: lexers.Lexer) -> None:
from pygments import highlight
from pygments.formatters import HtmlFormatter
@ -19,8 +24,7 @@ def _process(name, lexer):
class Command(BaseCommand):
help = "Compiles snippets with Pygments"
def handle(self, *args, **options):
def handle(self, **options: Any) -> None:
try:
from pygments import lexers
except ImportError:

View file

@ -1,6 +1,8 @@
from __future__ import annotations
from importlib.util import find_spec
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
@ -9,7 +11,7 @@ from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Renders Markdown to HTML"
def handle(self, *args, **options):
def handle(self, **options: Any) -> None:
for pkg in ("markdown", "pygments"):
if find_spec(pkg) is None:
self.stdout.write(f"This command requires the {pkg} package.")
@ -30,7 +32,7 @@ class Command(BaseCommand):
"codehilite": {"css_class": "highlight", "startinline": True}
}
def process_directory(path):
def process_directory(path: Path) -> None:
for src_path in path.glob("*.md"):
print(f"Rendering {src_path.name}")