mirror of
https://gitlab.com/bramw/baserow.git
synced 2024-11-22 07:42:36 +00:00
1199 lines
39 KiB
Python
Executable File
1199 lines
39 KiB
Python
Executable File
import os
|
|
from unittest.mock import patch
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.shortcuts import reverse
|
|
|
|
import pytest
|
|
from freezegun import freeze_time
|
|
from rest_framework.status import (
|
|
HTTP_200_OK,
|
|
HTTP_204_NO_CONTENT,
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_401_UNAUTHORIZED,
|
|
HTTP_404_NOT_FOUND,
|
|
)
|
|
from rest_framework_simplejwt.settings import api_settings as jwt_settings
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
from baserow.api.user.registries import UserDataType, user_data_registry
|
|
from baserow.contrib.database.models import Database, Table
|
|
from baserow.core.handler import CoreHandler
|
|
from baserow.core.models import Workspace, WorkspaceUser
|
|
from baserow.core.user.handler import UserHandler
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_create_user(client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
valid_password = "thisIsAValidPassword"
|
|
short_password = "short"
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test1", "email": "test@test.nl", "password": valid_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
user = User.objects.get(email="test@test.nl")
|
|
assert user.first_name == "Test1"
|
|
assert user.email == "test@test.nl"
|
|
assert user.password != ""
|
|
assert "password" not in response_json["user"]
|
|
assert response_json["user"]["username"] == "test@test.nl"
|
|
assert response_json["user"]["first_name"] == "Test1"
|
|
assert response_json["user"]["is_staff"] is True
|
|
assert response_json["user"]["id"] == user.id
|
|
|
|
# Test profile properties
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1Bis",
|
|
"email": "test1bis@test.nl",
|
|
"password": valid_password,
|
|
"language": "fr",
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
user = User.objects.get(email="test1bis@test.nl")
|
|
assert user.profile.language == "fr"
|
|
assert response_json["user"]["language"] == "fr"
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test1", "email": "test@test.nl", "password": valid_password},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == 400
|
|
assert response_failed.json()["error"] == "ERROR_EMAIL_ALREADY_EXISTS"
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test1", "email": " teSt@teST.nl ", "password": valid_password},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == 400
|
|
assert response_failed.json()["error"] == "ERROR_EMAIL_ALREADY_EXISTS"
|
|
|
|
too_long_name = "x" * 151
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": too_long_name,
|
|
"email": "new@example.com ",
|
|
"password": valid_password,
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == 400
|
|
assert response_failed.json()["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_failed.json()["detail"] == {
|
|
"name": [
|
|
{
|
|
"code": "max_length",
|
|
"error": "Ensure this field has no more than 150 characters.",
|
|
}
|
|
]
|
|
}
|
|
|
|
data_fixture.update_settings(allow_new_signups=False)
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test1", "email": "test10@test.nl", "password": valid_password},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == 400
|
|
assert response_failed.json()["error"] == "ERROR_DISABLED_SIGNUP"
|
|
data_fixture.update_settings(allow_new_signups=True)
|
|
|
|
response_failed_2 = client.post(
|
|
reverse("api:user:index"), {"email": "test"}, format="json"
|
|
)
|
|
assert response_failed_2.status_code == 400
|
|
|
|
long_password = "x" * 256
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test2", "email": "test2@test.nl", "password": long_password},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
user = User.objects.get(email="test2@test.nl")
|
|
assert user.check_password(long_password)
|
|
|
|
long_password = "x" * 257
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test2", "email": "test2@test.nl", "password": long_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["password"][0]["code"] == "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["password"][0]["error"]
|
|
== "This password is too long. It must not exceed 256 characters."
|
|
)
|
|
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test2", "email": "random@test.nl", "password": short_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["password"][0]["code"] == "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["password"][0]["error"]
|
|
== "This password is too short. It must contain at least 8 characters."
|
|
)
|
|
|
|
# Test profile attribute errors
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test20@test.nl",
|
|
"password": valid_password,
|
|
"language": "invalid",
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response_failed.json()
|
|
assert response_failed.status_code == 400
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_json["detail"]["language"][0]["code"] == "invalid_language"
|
|
assert response_json["detail"]["language"][0]["error"] == (
|
|
"Only the following language keys are "
|
|
f"valid: {','.join([l[0] for l in settings.LANGUAGES])}"
|
|
)
|
|
|
|
# Test username with maximum length
|
|
long_username = "x" * 150
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": long_username,
|
|
"email": "longusername@email.com",
|
|
"password": "password",
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
user = User.objects.get(email="longusername@email.com")
|
|
assert user.first_name == long_username
|
|
|
|
# Test username with length exceeding the maximum
|
|
even_longer_username = "x" * 200
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": even_longer_username,
|
|
"email": "evenlongerusername@email.com",
|
|
"password": "password",
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response_failed.status_code == 400
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_user_account(data_fixture, api_client):
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost.nl", language="en", first_name="Nikolas"
|
|
)
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{
|
|
"first_name": "NewOriginalName",
|
|
"language": "fr",
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
|
|
assert response.status_code == HTTP_200_OK
|
|
assert response_json["first_name"] == "NewOriginalName"
|
|
assert response_json["language"] == "fr"
|
|
assert response_json["completed_onboarding"] is False
|
|
|
|
user.refresh_from_db()
|
|
assert user.first_name == "NewOriginalName"
|
|
assert user.profile.language == "fr"
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == 400
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_json["detail"]["non_field_errors"][0]["code"] == "invalid"
|
|
assert response_json["detail"]["non_field_errors"][0]["error"] == (
|
|
"At least one of the fields first_name, language, email_notification_frequency,"
|
|
" completed_onboarding must be provided."
|
|
)
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{
|
|
"language": "invalid",
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == 400
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_json["detail"]["language"][0]["code"] == "invalid_language"
|
|
assert response_json["detail"]["language"][0]["error"] == (
|
|
"Only the following language keys are "
|
|
f"valid: {','.join([l[0] for l in settings.LANGUAGES])}"
|
|
)
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{
|
|
"email_notification_frequency": "invalid",
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == 400
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["email_notification_frequency"][0]["code"]
|
|
== "invalid_choice"
|
|
)
|
|
assert response_json["detail"]["email_notification_frequency"][0]["error"] == (
|
|
'"invalid" is not a valid choice.'
|
|
)
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{
|
|
"email_notification_frequency": "daily",
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == 200
|
|
assert response_json["first_name"] == "NewOriginalName"
|
|
assert response_json["language"] == "fr"
|
|
assert response_json["email_notification_frequency"] == "daily"
|
|
|
|
response = api_client.patch(
|
|
reverse("api:user:account"),
|
|
{
|
|
"completed_onboarding": True,
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == 200
|
|
assert response_json["first_name"] == "NewOriginalName"
|
|
assert response_json["language"] == "fr"
|
|
assert response_json["email_notification_frequency"] == "daily"
|
|
assert response_json["completed_onboarding"] is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_create_user_with_invitation(data_fixture, client):
|
|
data_fixture.create_password_provider()
|
|
core_handler = CoreHandler()
|
|
valid_password = "thisIsAValidPassword"
|
|
invitation = data_fixture.create_workspace_invitation(email="test0@test.nl")
|
|
signer = core_handler.get_workspace_invitation_signer()
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test@test.nl",
|
|
"password": valid_password,
|
|
"workspace_invitation_token": "INVALID",
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_failed.json()["error"] == "BAD_TOKEN_SIGNATURE"
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test0@test.nl",
|
|
"password": valid_password,
|
|
"workspace_invitation_token": f"{signer.dumps(invitation.id)}2",
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_failed.json()["error"] == "BAD_TOKEN_SIGNATURE"
|
|
assert User.objects.all().count() == 1
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test@test.nl",
|
|
"password": valid_password,
|
|
"workspace_invitation_token": signer.dumps(99999),
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == HTTP_404_NOT_FOUND
|
|
assert response_failed.json()["error"] == "ERROR_GROUP_INVITATION_DOES_NOT_EXIST"
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test@test.nl",
|
|
"password": valid_password,
|
|
"workspace_invitation_token": signer.dumps(invitation.id),
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_failed.json()["error"] == "ERROR_GROUP_INVITATION_EMAIL_MISMATCH"
|
|
assert User.objects.all().count() == 1
|
|
|
|
response_failed = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test0@test.nl",
|
|
"password": valid_password,
|
|
"workspace_invitation_token": signer.dumps(invitation.id),
|
|
},
|
|
format="json",
|
|
)
|
|
assert response_failed.status_code == HTTP_200_OK
|
|
assert User.objects.all().count() == 2
|
|
assert Workspace.objects.all().count() == 1
|
|
assert Workspace.objects.all().first().id == invitation.workspace_id
|
|
assert WorkspaceUser.objects.all().count() == 2
|
|
assert Database.objects.all().count() == 0
|
|
assert Table.objects.all().count() == 0
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_create_user_with_template(data_fixture, client):
|
|
data_fixture.create_password_provider()
|
|
old_templates = settings.APPLICATION_TEMPLATES_DIR
|
|
valid_password = "thisIsAValidPassword"
|
|
settings.APPLICATION_TEMPLATES_DIR = os.path.join(
|
|
settings.BASE_DIR, "../../../tests/templates"
|
|
)
|
|
template = data_fixture.create_template(slug="example-template")
|
|
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test0@test.nl",
|
|
"password": valid_password,
|
|
"template_id": -1,
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_json["detail"]["template_id"][0]["code"] == "does_not_exist"
|
|
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test0@test.nl",
|
|
"password": valid_password,
|
|
"template_id": "random",
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert response_json["detail"]["template_id"][0]["code"] == "incorrect_type"
|
|
|
|
response = client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test0@test.nl",
|
|
"password": valid_password,
|
|
"template_id": template.id,
|
|
},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
assert Workspace.objects.all().count() == 2
|
|
assert WorkspaceUser.objects.all().count() == 1
|
|
# We expect the example template to be installed
|
|
assert Database.objects.all().count() == 1, Database.objects.values_list(
|
|
"name", flat=True
|
|
)
|
|
assert Database.objects.all().first().name == "Event marketing"
|
|
assert Table.objects.all().count() == 2
|
|
|
|
settings.APPLICATION_TEMPLATES_DIR = old_templates
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
def test_send_reset_password_email(data_fixture, client, mailoutbox):
|
|
data_fixture.create_password_provider()
|
|
data_fixture.create_user(email="test@localhost.nl")
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"), {}, format="json"
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": "unknown@localhost.nl", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
assert response.status_code == 204
|
|
assert len(mailoutbox) == 0
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": "test@localhost.nl", "base_url": "http://test.nl"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_HOSTNAME_IS_NOT_ALLOWED"
|
|
assert len(mailoutbox) == 0
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": "test@localhost.nl", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
assert response.status_code == 204
|
|
assert len(mailoutbox) == 1
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": " teST@locAlhost.nl ", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
assert response.status_code == 204
|
|
assert len(mailoutbox) == 2
|
|
|
|
email = mailoutbox[0]
|
|
assert "test@localhost.nl" in email.to
|
|
assert email.body.index("http://localhost:3000")
|
|
|
|
user = data_fixture.create_user(is_staff=True)
|
|
CoreHandler().update_settings(user, allow_reset_password=False)
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": " teST@locAlhost.nl ", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_send_reset_password_email_password_auth_disabled(
|
|
api_client, data_fixture, mailoutbox
|
|
):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
data_fixture.create_user(email="test@localhost.nl")
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": "test@localhost.nl", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_AUTH_PROVIDER_DISABLED",
|
|
"detail": "Authentication provider is disabled.",
|
|
}
|
|
assert len(mailoutbox) == 0
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
def test_send_reset_password_email_password_auth_disabled_staff(
|
|
api_client, data_fixture, mailoutbox
|
|
):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
data_fixture.create_user(email="test@localhost.nl", is_staff=True)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:send_reset_password_email"),
|
|
{"email": "test@localhost.nl", "base_url": "http://localhost:3000"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
assert len(mailoutbox) == 1
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_password_reset(data_fixture, client):
|
|
user = data_fixture.create_user(email="test@localhost")
|
|
handler = UserHandler()
|
|
valid_password = "thisIsAValidPassword"
|
|
short_password = "short"
|
|
long_password = (
|
|
"Bgvmt95en6HGJZ9Xz0F8xysQ6eYgo2Y54YzRPxxv10b5n16F4rZ6YH4ulonocwiFK6970KiAxoYhU"
|
|
"LYA3JFDPIQGj5gMZZl25M46sO810Zd3nyBg699a2TDMJdHG7hAAi0YeDnuHuabyBawnb4962OQ1OO"
|
|
"f1MxzFyNWG7NR2X6MZQL5G1V61x56lQTXbvK1AG1IPM87bQ3YAtIBtGT2vK3Wd83q3he5ezMtUfzK"
|
|
"2ibj0WWhf86DyQB4EHRUJjYcBiI78iEJv5hcu33X2I345YosO66cTBWK45SqJEDudrCOq"
|
|
)
|
|
signer = handler.get_reset_password_signer()
|
|
|
|
response = client.post(reverse("api:user:reset_password"), {}, format="json")
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": "test", "password": valid_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "BAD_TOKEN_SIGNATURE"
|
|
|
|
with freeze_time("2020-01-01 12:00"):
|
|
token = signer.dumps(user.id)
|
|
|
|
with freeze_time("2020-01-04 12:00"):
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": valid_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "EXPIRED_TOKEN_SIGNATURE"
|
|
|
|
with freeze_time("2020-01-01 12:00"):
|
|
token = signer.dumps(9999)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": valid_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_USER_NOT_FOUND"
|
|
|
|
with freeze_time("2020-01-01 12:00"):
|
|
token = signer.dumps(user.id)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": valid_password},
|
|
format="json",
|
|
)
|
|
assert response.status_code == 204
|
|
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_password)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
token = signer.dumps(user.id)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": short_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["password"][0]["code"]
|
|
== "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["password"][0]["error"]
|
|
== "This password is too short. It must contain at least 8 characters."
|
|
)
|
|
|
|
user.refresh_from_db()
|
|
assert not user.check_password(short_password)
|
|
|
|
with freeze_time("2020-01-02 12:00"):
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": long_password},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["password"][0]["code"]
|
|
== "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["password"][0]["error"]
|
|
== "This password is too long. It must not exceed 256 characters."
|
|
)
|
|
|
|
user.refresh_from_db()
|
|
assert not user.check_password(long_password)
|
|
|
|
user = data_fixture.create_user(is_staff=True)
|
|
CoreHandler().update_settings(user, allow_reset_password=False)
|
|
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": long_password},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
def test_password_reset_email_verified_email(data_fixture, client, mailoutbox):
|
|
data_fixture.create_password_provider()
|
|
user = data_fixture.create_user()
|
|
handler = UserHandler()
|
|
signer = handler.get_reset_password_signer()
|
|
|
|
with freeze_time("2020-01-01 12:00"):
|
|
token = signer.dumps(user.id)
|
|
|
|
response = client.post(
|
|
reverse("api:user:reset_password"),
|
|
{"token": token, "password": "newpassword"},
|
|
format="json",
|
|
)
|
|
assert response.status_code == 204
|
|
|
|
user.refresh_from_db()
|
|
assert user.profile.email_verified is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_change_password(data_fixture, client):
|
|
data_fixture.create_password_provider()
|
|
valid_old_password = "thisIsAValidPassword"
|
|
valid_new_password = "thisIsAValidNewPassword"
|
|
short_password = "short"
|
|
long_password = (
|
|
"Bgvmt95en6HGJZ9Xz0F8xysQ6eYgo2Y54YzRPxxv10b5n16F4rZ6YH4ulonocwiFK6970KiAxoYhU"
|
|
"LYA3JFDPIQGj5gMZZl25M46sO810Zd3nyBg699a2TDMJdHG7hAAi0YeDnuHuabyBawnb4962OQ1OO"
|
|
"f1MxzFyNWG7NR2X6MZQL5G1V61x56lQTXbvK1AG1IPM87bQ3YAtIBtGT2vK3Wd83q3he5ezMtUfzK"
|
|
"2ibj0WWhf86DyQB4EHRUJjYcBiI78iEJv5hcu33X2I345YosO66cTBWK45SqJEDudrCOq"
|
|
)
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password=valid_old_password
|
|
)
|
|
|
|
response = client.post(
|
|
reverse("api:user:change_password"),
|
|
{},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
|
|
response = client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": "INCORRECT", "new_password": valid_new_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_INVALID_OLD_PASSWORD"
|
|
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_old_password)
|
|
|
|
response = client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": valid_old_password, "new_password": short_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["new_password"][0]["code"]
|
|
== "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["new_password"][0]["error"]
|
|
== "This password is too short. It must contain at least 8 characters."
|
|
)
|
|
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_old_password)
|
|
|
|
response = client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": valid_old_password, "new_password": long_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_REQUEST_BODY_VALIDATION"
|
|
assert (
|
|
response_json["detail"]["new_password"][0]["code"]
|
|
== "password_validation_failed"
|
|
)
|
|
assert (
|
|
response_json["detail"]["new_password"][0]["error"]
|
|
== "This password is too long. It must not exceed 256 characters."
|
|
)
|
|
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_old_password)
|
|
|
|
response = client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": valid_old_password, "new_password": valid_new_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
assert response.status_code == 204
|
|
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_new_password)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_change_password_auth_disabled(api_client, data_fixture):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
valid_old_password = "thisIsAValidPassword"
|
|
valid_new_password = "thisIsAValidNewPassword"
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password=valid_old_password
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": valid_old_password, "new_password": valid_new_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_AUTH_PROVIDER_DISABLED",
|
|
"detail": "Authentication provider is disabled.",
|
|
}
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_change_password_auth_disabled_staff(api_client, data_fixture):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
valid_old_password = "thisIsAValidPassword"
|
|
valid_new_password = "thisIsAValidNewPassword"
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password=valid_old_password, is_staff=True
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:change_password"),
|
|
{"old_password": valid_old_password, "new_password": valid_new_password},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
user.refresh_from_db()
|
|
assert user.check_password(valid_new_password)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_dashboard(data_fixture, client):
|
|
user, token = data_fixture.create_user_and_token(email="test@localhost")
|
|
workspace_1 = data_fixture.create_workspace(name="Test1")
|
|
workspace_2 = data_fixture.create_workspace()
|
|
invitation_1 = data_fixture.create_workspace_invitation(
|
|
workspace=workspace_1, email="test@localhost"
|
|
)
|
|
data_fixture.create_workspace_invitation(
|
|
workspace=workspace_1, email="test2@localhost"
|
|
)
|
|
data_fixture.create_workspace_invitation(
|
|
workspace=workspace_2, email="test3@localhost"
|
|
)
|
|
|
|
response = client.get(
|
|
reverse("api:user:dashboard"), format="json", HTTP_AUTHORIZATION=f"JWT {token}"
|
|
)
|
|
response_json = response.json()
|
|
assert len(response_json["workspace_invitations"]) == 1
|
|
assert response_json["workspace_invitations"][0]["id"] == invitation_1.id
|
|
assert response_json["workspace_invitations"][0]["email"] == invitation_1.email
|
|
assert response_json["workspace_invitations"][0]["invited_by"] == (
|
|
invitation_1.invited_by.first_name
|
|
)
|
|
assert response_json["workspace_invitations"][0]["workspace"] == "Test1"
|
|
assert response_json["workspace_invitations"][0]["message"] == invitation_1.message
|
|
assert "created_on" in response_json["workspace_invitations"][0]
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_additional_user_data(api_client, data_fixture):
|
|
data_fixture.create_password_provider()
|
|
|
|
class TmpUserDataType(UserDataType):
|
|
type = "type"
|
|
|
|
def get_user_data(self, user, request) -> dict:
|
|
return True
|
|
|
|
plugin_mock = TmpUserDataType()
|
|
with patch.dict(user_data_registry.registry, {"tmp": plugin_mock}):
|
|
response = api_client.post(
|
|
reverse("api:user:index"),
|
|
{
|
|
"name": "Test1",
|
|
"email": "test@test.nl",
|
|
"password": "thisIsAValidPassword",
|
|
"authenticate": True,
|
|
},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert response_json["tmp"] is True
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_auth"),
|
|
{"email": "test@test.nl", "password": "thisIsAValidPassword"},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert response_json["tmp"] is True
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": response_json["refresh_token"]},
|
|
format="json",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_200_OK
|
|
assert response_json["tmp"] is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_schedule_user_deletion(client, data_fixture):
|
|
valid_password = "aValidPassword"
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password=valid_password, is_staff=True
|
|
)
|
|
|
|
response = client.post(
|
|
reverse("api:user:schedule_account_deletion"),
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
response_json = response.json()
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
assert response_json["error"] == "ERROR_USER_IS_LAST_ADMIN"
|
|
|
|
# Creates a new staff user
|
|
data_fixture.create_user(email="test2@localhost", is_staff=True)
|
|
|
|
response = client.post(
|
|
reverse("api:user:schedule_account_deletion"),
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
|
|
user.refresh_from_db()
|
|
assert user.profile.to_be_deleted is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_token_error_if_user_deleted_or_disabled(api_client, data_fixture):
|
|
user, _ = data_fixture.create_user_and_token(
|
|
email="test@localhost", password="test"
|
|
)
|
|
refresh_token = data_fixture.generate_refresh_token(user)
|
|
|
|
# deactivate user and see that token is no longer valid
|
|
user.is_active = False
|
|
user.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_INVALID_REFRESH_TOKEN",
|
|
"detail": "Refresh token is expired or invalid.",
|
|
}
|
|
|
|
# reactivate the user
|
|
user.is_active = True
|
|
user.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
# schedule the user for deletion and see that token is no longer valid again
|
|
user.profile.to_be_deleted = True
|
|
user.profile.save()
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": refresh_token},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_INVALID_REFRESH_TOKEN",
|
|
"detail": "Refresh token is expired or invalid.",
|
|
}
|
|
|
|
# remove the user_id from the token
|
|
token_object = RefreshToken(refresh_token)
|
|
del token_object.payload[jwt_settings.USER_ID_CLAIM]
|
|
response = api_client.post(
|
|
reverse("api:user:token_refresh"),
|
|
{"refresh_token": str(token_object)},
|
|
format="json",
|
|
)
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_INVALID_REFRESH_TOKEN",
|
|
"detail": "Refresh token is expired or invalid.",
|
|
}
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_create_user_password_auth_disabled(api_client, data_fixture):
|
|
data_fixture.create_password_provider(enabled=False)
|
|
user, token = data_fixture.create_user_and_token(
|
|
email="test@localhost", password="test"
|
|
)
|
|
|
|
response = api_client.post(
|
|
reverse("api:user:index"),
|
|
{"name": "Test1", "email": "test@test.nl", "password": "thisIsAValidPassword"},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_401_UNAUTHORIZED
|
|
assert response.json() == {
|
|
"error": "ERROR_AUTH_PROVIDER_DISABLED",
|
|
"detail": "Authentication provider is disabled.",
|
|
}
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_email_address_with_login(client, data_fixture):
|
|
user = data_fixture.create_user()
|
|
token = UserHandler().create_email_verification_token(user)
|
|
|
|
response = client.post(
|
|
reverse("api:user:verify_email"),
|
|
{"token": token},
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert "token" in response_json
|
|
assert "refresh_token" in response_json
|
|
assert "user" in response_json
|
|
assert response_json["user"]["username"] == user.email
|
|
assert response.status_code == HTTP_200_OK
|
|
user.refresh_from_db()
|
|
assert user.profile.email_verified is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_email_address_no_login(client, data_fixture):
|
|
user, jwt_token = data_fixture.create_user_and_token()
|
|
token = UserHandler().create_email_verification_token(user)
|
|
|
|
response = client.post(
|
|
reverse("api:user:verify_email"),
|
|
{"token": token},
|
|
HTTP_AUTHORIZATION=f"JWT {jwt_token}",
|
|
format="json",
|
|
)
|
|
|
|
response_json = response.json()
|
|
assert response_json["email"] == user.email
|
|
assert response.status_code == HTTP_200_OK
|
|
user.refresh_from_db()
|
|
assert user.profile.email_verified is True
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_email_address_already_verified(client, data_fixture):
|
|
user = data_fixture.create_user()
|
|
profile = user.profile
|
|
profile.email_verified = True
|
|
profile.save()
|
|
token = UserHandler().create_email_verification_token(user)
|
|
|
|
response = client.post(
|
|
reverse("api:user:verify_email"),
|
|
{"token": token},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_email_address_inactive_user(client, data_fixture):
|
|
user = data_fixture.create_user(is_active=False)
|
|
token = UserHandler().create_email_verification_token(user)
|
|
|
|
response = client.post(
|
|
reverse("api:user:verify_email"),
|
|
{"token": token},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
def test_send_verify_email_address(client, data_fixture, mailoutbox):
|
|
user, token = data_fixture.create_user_and_token()
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_verify_email"),
|
|
{
|
|
"email": user.email,
|
|
},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
assert len(mailoutbox) == 1
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
def test_send_verify_email_address_user_not_found(client, data_fixture, mailoutbox):
|
|
response = client.post(
|
|
reverse("api:user:send_verify_email"),
|
|
{
|
|
"email": "doesntexist@example.com",
|
|
},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
assert len(mailoutbox) == 0
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_send_verify_email_address_already_verified(client, data_fixture, mailoutbox):
|
|
user, token = data_fixture.create_user_and_token()
|
|
profile = user.profile
|
|
profile.email_verified = True
|
|
profile.save()
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_verify_email"),
|
|
{
|
|
"email": user.email,
|
|
},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
assert len(mailoutbox) == 0
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_send_verify_email_address_inactive_user(client, data_fixture, mailoutbox):
|
|
user, token = data_fixture.create_user_and_token(is_active=False)
|
|
|
|
response = client.post(
|
|
reverse("api:user:send_verify_email"),
|
|
{
|
|
"email": user.email,
|
|
},
|
|
format="json",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
assert len(mailoutbox) == 0
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@patch("baserow.core.user.handler.share_onboarding_details_with_baserow")
|
|
def test_share_onboarding_details_with_baserow(mock_task, client, data_fixture):
|
|
data_fixture.update_settings(instance_id="1")
|
|
user, token = data_fixture.create_user_and_token()
|
|
|
|
response = client.post(
|
|
reverse("api:user:share_onboarding_details_with_baserow"),
|
|
{
|
|
"team": "Marketing",
|
|
"role": "CEO",
|
|
"size": "11 - 50",
|
|
"country": "The Netherlands",
|
|
},
|
|
format="json",
|
|
HTTP_AUTHORIZATION=f"JWT {token}",
|
|
)
|
|
|
|
assert response.status_code == HTTP_204_NO_CONTENT
|
|
|
|
mock_task.delay.assert_called_with(
|
|
email=user.email,
|
|
team="Marketing",
|
|
role="CEO",
|
|
size="11 - 50",
|
|
country="The Netherlands",
|
|
)
|