0
0
Fork 0
mirror of https://github.com/healthchecks/healthchecks.git synced 2025-04-06 21:58:48 +00:00
healthchecks_healthchecks/hc/accounts/middleware.py
Pēteris Caune e048ec4c48
Fix "class Foo(object):" -> "class Foo:"
In Python 3 these are equivalent, and shorter is better.
2024-10-29 17:57:50 +02:00

83 lines
3.1 KiB
Python

from __future__ import annotations
from typing import Callable
from django.conf import settings
from django.contrib import auth
from django.contrib.auth.models import User
from django.core.exceptions import MiddlewareNotUsed
from django.http import HttpRequest, HttpResponse
from hc.accounts.models import Profile
MiddlewareFunc = Callable[[HttpRequest], HttpResponse]
class TeamAccessMiddleware:
def __init__(self, get_response: MiddlewareFunc) -> None:
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated:
return self.get_response(request)
setattr(request, "profile", Profile.objects.for_user(request.user))
return self.get_response(request)
class CustomHeaderMiddleware:
"""
Middleware for utilizing Web-server-provided authentication.
If request.user is not authenticated, then this middleware:
- looks for an email address in request.META[settings.REMOTE_USER_HEADER]
- looks up and automatically logs in the user with a matching email
"""
def __init__(self, get_response: MiddlewareFunc) -> None:
if not settings.REMOTE_USER_HEADER:
raise MiddlewareNotUsed()
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
assert settings.REMOTE_USER_HEADER
# Make sure AuthenticationMiddleware is installed
assert hasattr(request, "user")
email = request.META.get(settings.REMOTE_USER_HEADER)
if not email:
# If specified header doesn't exist or is empty then log out any
# authenticated user and return
if request.user.is_authenticated:
auth.logout(request)
return self.get_response(request)
# The email address from the external authentication system may be in
# in upper case or mixed case. Convert it to lower case, as we do
# elsewhere in the system (when registering new users, when inviting users
# into projects, when changing email address) to avoid naming conflicts.
email = email.lower()
# If the user is already authenticated and that user is the user we are
# getting passed in the headers, then the correct user is already
# persisted in the session and we don't need to continue.
if request.user.is_authenticated:
if request.user.email == email:
return self.get_response(request)
else:
# An authenticated user is associated with the request, but
# it does not match the authorized user in the header.
auth.logout(request)
# We are seeing this user for the first time in this session, attempt
# to authenticate the user.
if user := auth.authenticate(request, remote_user_email=email):
assert isinstance(user, User)
# User is valid. Set request.user and persist user in the session
# by logging the user in.
request.user = user
auth.login(request, user)
return self.get_response(request)