mirror of
https://gitlab.com/bramw/baserow.git
synced 2024-11-25 08:47:54 +00:00
774 lines
26 KiB
Python
774 lines
26 KiB
Python
from datetime import datetime, timezone
|
|
from unittest.mock import patch
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.shortcuts import reverse
|
|
|
|
import pytest
|
|
from freezegun import freeze_time
|
|
from rest_framework.status import (
|
|
HTTP_200_OK,
|
|
HTTP_204_NO_CONTENT,
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_401_UNAUTHORIZED,
|
|
)
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
from baserow.core.handler import CoreHandler
|
|
from baserow.core.models import BlacklistedToken, Settings, UserLogEntry
|
|
from baserow.core.registries import Plugin, plugin_registry
|
|
from baserow.core.user.handler import UserHandler
|
|
from baserow.core.user.utils import generate_session_tokens_for_user
|
|
from baserow.core.utils import generate_hash
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_auth(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
|
|
class TmpPlugin(Plugin):
|
|
type = "tmp_plugin"
|
|
called = False
|
|
|
|
def user_signed_in(self, user):
|
|
self.called = True
|
|
|
|
plugin_mock = TmpPlugin()
|
|
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl", password="password", first_name="Test1", is_active=True
|
|
)
|
|
|
|
assert not user.last_login
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"), {"password": "password"}, format="json"
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["email"] == ["This field is required."]
|
|
|
|
# accept username for backward compatibility
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"username": "invalid_mail", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["username"] == ["Enter a valid email address."]
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "invalid_mail", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["email"] == ["Enter a valid email address."]
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "no_existing@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert (
|
|
response_json["detail"] == "No active account found with the given credentials."
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@test.nl", "password": "wrong_password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert (
|
|
response_json["detail"] == "No active account found with the given credentials."
|
|
)
|
|
|
|
with patch.dict(plugin_registry.registry, {"tmp": plugin_mock}):
|
|
with freeze_time("2020-01-01 12:00"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert "access_token" in response_json
|
|
assert "refresh_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is False
|
|
assert plugin_mock.called
|
|
|
|
user.refresh_from_db()
|
|
assert user.last_login == datetime(2020, 1, 1, 12, 00, tzinfo=timezone.utc)
|
|
|
|
logs = UserLogEntry.objects.all()
|
|
assert len(logs) == 1
|
|
assert logs[0].actor_id == user.id
|
|
assert logs[0].action == "SIGNED_IN"
|
|
assert logs[0].timestamp == datetime(2020, 1, 1, 12, 00, tzinfo=timezone.utc)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": " teSt@teSt.nL ", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert "access_token" in response_json
|
|
assert "refresh_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is False
|
|
|
|
user.refresh_from_db()
|
|
assert user.last_login == datetime(2020, 1, 2, 12, 0, tzinfo=timezone.utc)
|
|
|
|
data_fixture.create_user(
|
|
email="test2@test.nl", password="password", first_name="Test1", is_active=False
|
|
)
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test2@test.nl", "password": "wrong_password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert (
|
|
response_json["detail"] == "No active account found with the given credentials."
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test2@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_DEACTIVATED_USER"
|
|
assert response_json["detail"] == "User account has been disabled."
|
|
|
|
# Check that a login cancel user deletion
|
|
user_to_be_deleted = data_fixture.create_user(
|
|
email="test3@test.nl", password="password", to_be_deleted=True
|
|
)
|
|
|
|
# check that the user cannot refresh the token if set to be deleted
|
|
refresh_token = str(RefreshToken.for_user(user_to_be_deleted))
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
response_json = response.json()
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
assert response_json["detail"] == "Refresh token is expired or invalid."
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test3@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
|
|
user_to_be_deleted.refresh_from_db()
|
|
assert user_to_be_deleted.profile.to_be_deleted is False
|
|
|
|
# check that now the user can refresh the token
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
response_json = response.json()
|
|
assert "access_token" in response_json
|
|
assert "user" in response_json
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_auth_email_verification_required(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_EMAIL_VERIFICATION_REQUIRED"
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_auth_email_verification_not_required(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.RECOMMENDED
|
|
settings.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert "access_token" in response_json
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_auth_email_verification_not_required_staff(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(
|
|
email="test@example.com", password="password", is_staff=True
|
|
)
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert "access_token" in response_json
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_password_auth_disabled(api_client, data_fixture):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password="test"
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@localhost", "password": "test"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_AUTH_PROVIDER_DISABLED",
|
|
"detail": "Authentication provider is disabled.",
|
|
}
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_password_auth_disabled_superadmin(api_client, data_fixture):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password="test", is_staff=True
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@localhost", "password": "test"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_200_OK
|
|
response_json = response.json()
|
|
assert "access_token" in response_json
|
|
assert "refresh_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_refresh(api_client, data_fixture):
|
|
class TmpPlugin(Plugin):
|
|
type = "tmp_plugin"
|
|
called = False
|
|
|
|
def user_signed_in(self, user):
|
|
self.called = True
|
|
|
|
plugin_mock = TmpPlugin()
|
|
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl", password="password", first_name="Test1"
|
|
)
|
|
refresh_token = str(RefreshToken.for_user(user))
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"), {"token": "WRONG_TOKEN"}, format="json"
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
response_json = response.json()
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
assert response_json["detail"] == "Refresh token is expired or invalid."
|
|
|
|
# DEPRECATED: "token" as body param is deprecated, use "refresh_token" instead
|
|
with patch.dict(plugin_registry.registry, {"tmp": plugin_mock}):
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
response_json = response.json()
|
|
assert "access_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is False
|
|
assert not plugin_mock.called
|
|
|
|
with patch.dict(plugin_registry.registry, {"tmp": plugin_mock}):
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
response_json = response.json()
|
|
assert "access_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is False
|
|
assert not plugin_mock.called
|
|
|
|
with freeze_time("2019-01-01 12:00"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
json={"refresh_token": str(RefreshToken.for_user(user))},
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
response_json = response.json()
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
assert response_json["detail"] == "Refresh token is expired or invalid."
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_refresh_token_is_invalidated_after_password_change(api_client, data_fixture):
|
|
with freeze_time("2020-01-01 12:00"):
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl",
|
|
password="password",
|
|
first_name="Test1",
|
|
is_active=True,
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
with freeze_time("2020-01-01 12:01"):
|
|
UserHandler().change_password(user, "password", "test1234")
|
|
|
|
with freeze_time("2020-01-01 12:02"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_refresh_token_email_verification_required(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
|
|
# obtain refresh token
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is not possible any more
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_EMAIL_VERIFICATION_REQUIRED"
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_refresh_token_email_verification_not_enforced(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
|
|
# obtain refresh token
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.RECOMMENDED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is possible
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_refresh_token_email_verification_not_required(api_client, data_fixture):
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
|
|
# obtain refresh token
|
|
# the auth claim will not be set to password authentication
|
|
tokens = generate_session_tokens_for_user(
|
|
user, include_refresh_token=True, verified_email_claim=None
|
|
)
|
|
refresh_token = tokens["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is possible
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_refresh_token_email_verification_not_required_staff(api_client, data_fixture):
|
|
user = data_fixture.create_user(
|
|
email="test@example.com", password="password", is_staff=True
|
|
)
|
|
|
|
# obtain refresh token
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is possible
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_verify(api_client, data_fixture):
|
|
class TmpPlugin(Plugin):
|
|
type = "tmp_plugin"
|
|
called = False
|
|
|
|
def user_signed_in(self, user):
|
|
self.called = True
|
|
|
|
plugin_mock = TmpPlugin()
|
|
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl", password="password", first_name="Test1"
|
|
)
|
|
refresh_token = str(RefreshToken.for_user(user))
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"), {"token": "WRONG_TOKEN"}, format="json"
|
|
)
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
with patch.dict(plugin_registry.registry, {"tmp": plugin_mock}):
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["id"] == user.id
|
|
assert response_json["user"]["is_staff"] is False
|
|
assert not plugin_mock.called
|
|
|
|
with freeze_time("2019-01-01 12:00"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
json={"refresh_token": str(RefreshToken.for_user(user))},
|
|
)
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_verify_email_verification_required(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
|
|
# obtain refresh token
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is not possible any more
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_EMAIL_VERIFICATION_REQUIRED"
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_verify_email_verification_not_required(api_client, data_fixture):
|
|
user = data_fixture.create_user(email="test@example.com", password="password")
|
|
|
|
# obtain refresh token
|
|
# the auth claim will not be set to password authentication
|
|
tokens = generate_session_tokens_for_user(
|
|
user, include_refresh_token=True, verified_email_claim=None
|
|
)
|
|
refresh_token = tokens["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is possible
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_verify_email_verification_not_required_staff(api_client, data_fixture):
|
|
user = data_fixture.create_user(
|
|
email="test@example.com", password="password", is_staff=True
|
|
)
|
|
|
|
# obtain refresh token
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@example.com", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
# change email verification setting
|
|
settings = CoreHandler().get_settings()
|
|
settings.email_verification = Settings.EmailVerificationOptions.ENFORCED
|
|
settings.save()
|
|
|
|
profile = user.profile
|
|
profile.email_verified = False
|
|
profile.save()
|
|
|
|
# using the refresh token is possible
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_token_is_invalidated_after_password_change(api_client, data_fixture):
|
|
with freeze_time("2020-01-01 12:00"):
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl",
|
|
password="password",
|
|
first_name="Test1",
|
|
is_active=True,
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@test.nl", "password": "password"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
refresh_token = response_json["refresh_token"]
|
|
|
|
with freeze_time("2020-01-01 12:01"):
|
|
UserHandler().change_password(user, "password", "test1234")
|
|
|
|
with freeze_time("2020-01-01 12:02"):
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_blacklist(api_client, data_fixture):
|
|
user = data_fixture.create_user(
|
|
email="test@test.nl", password="password", first_name="Test1"
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_blacklist"),
|
|
{"refresh_token": "INVALID_TOKEN"},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_blacklist"),
|
|
{},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
refresh_token = RefreshToken.for_user(user)
|
|
refresh_token_str = str(RefreshToken.for_user(user))
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_blacklist"),
|
|
{"refresh_token": refresh_token_str},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
|
|
token = BlacklistedToken.objects.all().first()
|
|
|
|
assert refresh_token.payload["exp"] == token.expires_at.timestamp()
|
|
assert token.hashed_token == generate_hash(refresh_token_str)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"token": refresh_token_str},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_verify"),
|
|
{"refresh_token": refresh_token_str},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response_json["error"] == "ERROR_INVALID_REFRESH_TOKEN"
|