1
0
mirror of https://gitlab.com/bramw/baserow.git synced 2024-11-22 07:42:36 +00:00
bramw_baserow/backend/tests/baserow/api/jobs/test_jobs_views.py
2024-09-24 09:43:47 +00:00

507 lines
14 KiB
Python

import threading
from unittest.mock import patch
from django.urls import reverse
import pytest
from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND
from baserow.core.jobs.constants import JOB_CANCELLED
from baserow.core.jobs.models import Job
from baserow.core.jobs.registries import JobType
from baserow.core.jobs.tasks import run_async_job
from baserow.test_utils.helpers import is_dict_subset
@pytest.mark.django_db(transaction=True)
@patch("baserow.core.jobs.handler.run_async_job")
def test_create_job(mock_run_async, data_fixture, api_client):
data_fixture.register_temp_job_types()
user, token = data_fixture.create_user_and_token()
response = api_client.post(
reverse("api:jobs:list"),
{},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_REQUEST_BODY_VALIDATION"
assert response.json() == {
"error": "ERROR_REQUEST_BODY_VALIDATION",
"detail": {
"type": [{"error": "This field is required.", "code": "required"}],
},
}
response = api_client.post(
reverse("api:jobs:list"),
{
"type": "tmp_job_type_1",
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_REQUEST_BODY_VALIDATION"
assert response.json() == {
"error": "ERROR_REQUEST_BODY_VALIDATION",
"detail": {
"test_request_field": [
{"error": "This field is required.", "code": "required"}
],
},
}
response = api_client.post(
reverse("api:jobs:list"),
{
"type": "tmp_job_type_1",
"test_request_field": "test",
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_REQUEST_BODY_VALIDATION"
assert response.json() == {
"error": "ERROR_REQUEST_BODY_VALIDATION",
"detail": {
"test_request_field": [
{"error": "A valid integer is required.", "code": "invalid"}
],
},
}
response = api_client.post(
reverse("api:jobs:list"),
{
"type": "tmp_job_type_3",
"test_request_field": 1,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json()["error"] == "TEST_EXCEPTION"
response = api_client.post(
reverse("api:jobs:list"),
{
"type": "tmp_job_type_1",
"test_request_field": 1,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
job = Job.objects.all().first()
assert response.json() == {
"id": job.id,
"type": "tmp_job_type_1",
"test_field": 42,
"state": "pending",
"progress_percentage": 0,
"human_readable_error": "",
}
mock_run_async.delay.assert_called()
response = api_client.post(
reverse("api:jobs:list"),
{
"type": "tmp_job_type_1",
"test_request_field": 1,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_MAX_JOB_COUNT_EXCEEDED"
@pytest.mark.django_db
def test_list_jobs(data_fixture, api_client):
user, token = data_fixture.create_user_and_token()
job_1 = data_fixture.create_fake_job(user=user)
job_2 = data_fixture.create_fake_job(user=user, state="failed")
job_3 = data_fixture.create_fake_job()
url = reverse("api:jobs:list")
response = api_client.get(url, HTTP_AUTHORIZATION=f"JWT {token}")
job_1_json = {
"id": job_1.id,
"type": "tmp_job_type_1",
"progress_percentage": 0,
"state": "pending",
"human_readable_error": "",
"test_field": 42,
}
job_2_json = {
"id": job_2.id,
"type": "tmp_job_type_1",
"progress_percentage": 0,
"state": "failed",
"human_readable_error": "",
"test_field": 42,
}
assert response.status_code == HTTP_200_OK
assert response.json() == {"jobs": [job_2_json, job_1_json]}
valid_job_states = ",".join(["pending", "finished"])
response = api_client.get(
f"{url}?states={valid_job_states}",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {"jobs": [job_1_json]}
response = api_client.get(
f"{url}?states=!finished",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {"jobs": [job_2_json, job_1_json]}
response = api_client.get(
f"{url}?states=importing",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_QUERY_PARAMETER_VALIDATION"
assert response.json() == {
"error": "ERROR_QUERY_PARAMETER_VALIDATION",
"detail": {
"states": [
{
"error": "State importing is not a valid state. "
"Valid states are: pending, finished, failed, "
"cancelled.",
"code": "invalid",
}
],
},
}
response = api_client.get(
f"{url}?job_ids={job_2.id},{job_3.id}",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {"jobs": [job_2_json]}
response = api_client.get(
f"{url}?job_ids=invalid_job_id",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_QUERY_PARAMETER_VALIDATION"
assert response.json() == {
"error": "ERROR_QUERY_PARAMETER_VALIDATION",
"detail": {
"job_ids": [
{
"error": "Job id invalid_job_id is not a valid integer.",
"code": "invalid",
}
],
},
}
@pytest.mark.django_db
def test_get_job(data_fixture, api_client):
user, token = data_fixture.create_user_and_token()
job_1 = data_fixture.create_fake_job(user=user)
job_2 = data_fixture.create_fake_job()
response = api_client.get(
reverse(
"api:jobs:item",
kwargs={"job_id": job_2.id},
),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json()["error"] == "ERROR_JOB_DOES_NOT_EXIST"
response = api_client.get(
reverse(
"api:jobs:item",
kwargs={"job_id": job_1.id},
),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
json = response.json()
assert json == {
"id": job_1.id,
"type": "tmp_job_type_1",
"progress_percentage": 0,
"state": "pending",
"human_readable_error": "",
"test_field": 42,
}
job_1.progress_percentage = 50
job_1.state = "failed"
job_1.human_readable_error = "Wrong"
job_1.save()
response = api_client.get(
reverse(
"api:jobs:item",
kwargs={"job_id": job_1.id},
),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
json = response.json()
assert json == {
"id": job_1.id,
"type": "tmp_job_type_1",
"progress_percentage": 50,
"state": "failed",
"human_readable_error": "Wrong",
"test_field": 42,
}
@pytest.mark.django_db(transaction=True)
def test_cancel_job_running(
data_fixture,
api_client,
test_thread,
mutable_job_type_registry,
enable_locmem_testing,
):
# marker that the job started
m_start = threading.Event()
# marker for job to stop
m_set_stop = threading.Event()
# confirmation of m_set_stop
m_stop_wait = threading.Event()
# marker that job finished
m_end = threading.Event()
class IdlingJobType(JobType):
type = "idling_job_b"
model_class = Job
max_count = 1
def run(self, job, progress):
m_start.set()
progress.set_progress(1)
assert m_set_stop.wait(2)
progress.set_progress(2)
m_end.set()
progress.set_progress(3)
mutable_job_type_registry.register(IdlingJobType())
user, token = data_fixture.create_user_and_token()
with patch("baserow.core.jobs.tasks.run_async_job.delay"):
response = api_client.post(
reverse("api:jobs:list"),
{
"type": IdlingJobType.type,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
resp = response.json()
job = Job.objects.get(id=resp["id"]).specific
assert job
assert job.id
assert job.specific
assert resp == {
"id": job.id,
"type": IdlingJobType.type,
"state": "pending",
"progress_percentage": 0,
"human_readable_error": "",
}
with test_thread(run_async_job.apply, args=(job.id,)) as t:
assert job.pending, job.get_cached_state()
t.start()
assert m_start.wait(0.5)
assert job.started, job.get_cached_state()
response = api_client.post(
reverse("api:jobs:cancel", args=(job.id,)),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
resp = response.json()
assert is_dict_subset(
{"state": "cancelled", "id": job.id, "type": IdlingJobType.type}, resp
)
m_set_stop.set()
assert t.thread_stopped.wait(2)
job.refresh_from_db()
assert job.cancelled, job.get_cached_state()
assert not m_end.is_set()
assert response.status_code == HTTP_200_OK
job_data = response.json()
assert job_data["state"] == JOB_CANCELLED
@pytest.mark.django_db(transaction=True)
def test_cancel_job_pending(
data_fixture,
api_client,
test_thread,
mutable_job_type_registry,
enable_locmem_testing,
):
# marker that the job started
m_start = threading.Event()
# marker for job to stop
m_set_stop = threading.Event()
# marker that job finished
m_end = threading.Event()
class IdlingJobType(JobType):
type = "idling_job_b"
model_class = Job
max_count = 1
def run(self, job, progress):
m_start.set()
progress.set_progress(1)
assert m_set_stop.wait(1)
progress.set_progress(2)
m_end.set()
mutable_job_type_registry.register(IdlingJobType())
user, token = data_fixture.create_user_and_token()
with patch("baserow.core.jobs.tasks.run_async_job.delay"):
response = api_client.post(
reverse("api:jobs:list"),
{
"type": IdlingJobType.type,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
resp = response.json()
job = Job.objects.get(id=resp["id"]).specific
assert job
assert job.id
assert job.specific
assert resp == {
"id": job.id,
"type": IdlingJobType.type,
"state": "pending",
"progress_percentage": 0,
"human_readable_error": "",
}
with test_thread(run_async_job.apply, args=(job.id,)) as t:
assert job.pending, job.get_cached_state()
response = api_client.post(
reverse("api:jobs:cancel", args=(job.id,)),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
t.start()
assert job.cancelled, job.get_cached_state()
assert not m_start.is_set()
assert not m_end.is_set()
job_data = response.json()
assert job_data["state"] == JOB_CANCELLED
@pytest.mark.django_db(transaction=True)
def test_cancel_job_finished(
data_fixture,
api_client,
test_thread,
mutable_job_type_registry,
enable_locmem_testing,
):
# marker that the job started
m_start = threading.Event()
# marker for job to stop
m_set_stop = threading.Event()
# marker that job finished
m_end = threading.Event()
class IdlingJobType(JobType):
type = "idling_job_b"
model_class = Job
max_count = 1
def run(self, job, progress):
m_start.set()
progress.set_progress(1)
assert m_set_stop.wait(1)
progress.set_progress(2)
m_end.set()
mutable_job_type_registry.register(IdlingJobType())
user, token = data_fixture.create_user_and_token()
with patch("baserow.core.jobs.tasks.run_async_job.delay"):
response = api_client.post(
reverse("api:jobs:list"),
{
"type": IdlingJobType.type,
},
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_200_OK
resp = response.json()
job = Job.objects.get(id=resp["id"]).specific
assert job
assert job.id
assert job.specific
assert resp == {
"id": job.id,
"type": IdlingJobType.type,
"state": "pending",
"progress_percentage": 0,
"human_readable_error": "",
}
with test_thread(run_async_job.apply, args=(job.id,)) as t:
assert job.pending, job.get_cached_state()
t.start()
assert m_start.wait(0.1)
m_set_stop.set()
assert m_end.wait(0.1)
response = api_client.post(
reverse("api:jobs:cancel", args=(job.id,)),
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_400_BAD_REQUEST
resp = response.json()
assert resp.get("error") == "ERROR_JOB_NOT_CANCELLABLE"