Resolve "Jobs get stuck when a SystemExit happens during the initialization phase (pending state)"
parent 369330e71f
commit 622ba1f078

7 changed files with 87 additions and 19 deletions
@@ -11,7 +11,7 @@ from django.utils import timezone
 from baserow.core.utils import Progress

 from .cache import job_progress_key
-from .constants import JOB_FAILED
+from .constants import JOB_FAILED, JOB_PENDING
 from .exceptions import JobDoesNotExist, MaxJobCountExceeded
 from .models import Job
 from .registries import job_type_registry
@@ -174,7 +174,25 @@ class JobHandler:
             run_async_job(job.id)
             job.refresh_from_db()
         else:
-            transaction.on_commit(lambda: run_async_job.delay(job.id))
+
+            # This wrapper ensures the job doesn't stay in the pending state if
+            # something goes wrong during the delay call. This is related to the
+            # redis connection failure that causes gunicorn to call sys.exit(1).
+            def call_async_job_safe():
+                try:
+                    run_async_job.delay(job.id)
+                except BaseException as e:
+                    job.refresh_from_db()
+                    if job.state == JOB_PENDING:
+                        job.state = JOB_FAILED
+                        job.error = str(e)
+                        job.human_readable_error = (
+                            f"Something went wrong during the job({job.id}) execution."
+                        )
+                        job.save()
+                    raise
+
+            transaction.on_commit(call_async_job_safe)

         return job
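Both this wrapper and the `except BaseException` change in the Celery task further down rely on the same detail of Python's exception hierarchy: `SystemExit`, which `sys.exit()` raises, derives from `BaseException` but not from `Exception`, so a plain `except Exception` clause never sees it. A minimal standalone illustration (plain Python, not Baserow code):

```python
import sys

# SystemExit subclasses BaseException directly, bypassing Exception.
assert issubclass(SystemExit, BaseException)
assert not issubclass(SystemExit, Exception)

try:
    sys.exit(1)
except Exception:
    print("never reached: except Exception does not catch SystemExit")
except BaseException as e:
    # This is the branch that actually runs, so job state can be
    # patched up here before re-raising.
    print(f"caught {e!r}")
```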
@@ -201,7 +219,7 @@ class JobHandler:

         (
             Job.objects.filter(created_on__lte=limit_date)
-            .is_running()
+            .is_pending_or_running()
             .update(
                 state=JOB_FAILED,
                 human_readable_error=(
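The cleanup query now also expires jobs that never left the pending state. The queryset helpers themselves are not part of this diff; a hypothetical sketch of what they plausibly look like, assuming a `state` CharField and lowercase state constants (both assumptions, not taken from this commit):

```python
from django.db import models

# Assumed lowercase state constants mirroring baserow.core.jobs.constants.
JOB_PENDING = "pending"
JOB_STARTED = "started"


class JobQuerySet(models.QuerySet):
    """Hypothetical sketch of the helpers chained in the cleanup above."""

    def is_running(self):
        return self.filter(state=JOB_STARTED)

    def is_pending_or_running(self):
        # The widened filter is what lets the cleanup expire jobs that got
        # stuck before a worker ever picked them up.
        return self.filter(state__in=(JOB_PENDING, JOB_STARTED))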
@@ -1,7 +1,6 @@
 from datetime import timedelta

 from django.conf import settings
-from django.db import transaction

 from baserow.config.celery import app
 from baserow.core.jobs.registries import job_type_registry
@@ -25,11 +24,10 @@ def run_async_job(self, job_id: int):

     from .cache import job_progress_key

-    with transaction.atomic():
-        job = Job.objects.get(id=job_id).specific
-        job_type = job_type_registry.get_by_model(job)
-        job.state = JOB_STARTED
-        job.save(update_fields=("state",))
+    job = Job.objects.get(id=job_id).specific
+    job_type = job_type_registry.get_by_model(job)
+    job.state = JOB_STARTED
+    job.save(update_fields=("state",))

     try:
         with job_type.transaction_atomic_context(job):
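Dropping the `transaction.atomic()` wrapper means the `JOB_STARTED` write is committed on its own instead of hanging on an enclosing transaction, so a crash later in the task can no longer roll the row back to pending. A self-contained analogy of that rollback failure mode, using sqlite3 rather than Django (illustrative only, not Baserow code):

```python
import sqlite3

# When the state write lives inside a larger transaction, a crash before
# COMMIT rolls it back and the row silently reverts to "pending".
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT)")
con.execute("INSERT INTO job (state) VALUES ('pending')")
con.commit()

try:
    con.execute("UPDATE job SET state = 'started' WHERE id = 1")
    raise SystemExit(1)  # simulated crash before the transaction commits
except SystemExit:
    con.rollback()  # the aborted transaction discards the update

state = con.execute("SELECT state FROM job WHERE id = 1").fetchone()[0]
assert state == "pending"  # the "started" write was lost
```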
@@ -39,7 +37,7 @@ def run_async_job(self, job_id: int):
         # Don't override the other properties that have been set during the
         # progress update.
         job.save(update_fields=("state",))
-    except Exception as e:
+    except BaseException as e:  # We also want to catch the SystemExit exception here.
         error = f"Something went wrong during the {job_type.type} job execution."

         exception_mapping = {
@@ -70,10 +68,10 @@ def run_async_job(self, job_id: int):
         # Allow a job_type to modify job after an error
         job_type.on_error(job.specific, e)

-        raise e
+        raise
     finally:
         # Delete the import job cached entry because the transaction has been committed
-        # and the AirtableImportJob entry now contains the latest data.
+        # and the Job entry now contains the latest data.
         cache.delete(job_progress_key(job.id))
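The `raise e` to bare `raise` swap is an idiomatic cleanup: both re-raise the active exception in Python 3, but `raise e` adds an extra traceback entry pointing at the re-raise site, while bare `raise` propagates the exception exactly as it was. A small comparison (plain Python):

```python
import traceback


def fail():
    raise ValueError("boom")


def reraise_bare():
    try:
        fail()
    except ValueError:
        raise  # re-raises the in-flight exception unchanged


def reraise_bound():
    try:
        fail()
    except ValueError as e:
        raise e  # the traceback also records this re-raise line


for fn in (reraise_bare, reraise_bound):
    try:
        fn()
    except ValueError:
        print(f"--- {fn.__name__} ---")
        traceback.print_exc()  # reraise_bound shows the extra entry
```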
@@ -114,6 +114,28 @@ def test_run_task_with_exception(mock_get_by_model, data_fixture):
     )


+@pytest.mark.django_db(transaction=True)
+@patch("baserow.core.jobs.registries.JobTypeRegistry.get_by_model")
+def test_run_task_with_system_exit(mock_get_by_model, data_fixture):
+    job_type = TmpCustomJobType()
+    # Simulate a SystemExit during the run.
+    job_type.run = Mock(side_effect=SystemExit(-1))
+    mock_get_by_model.return_value = job_type
+
+    job = data_fixture.create_fake_job()
+
+    with pytest.raises(SystemExit):
+        run_async_job(job.id)
+
+    job.refresh_from_db()
+    assert job.state == JOB_FAILED
+    assert job.error == "-1"
+    assert (
+        job.human_readable_error
+        == "Something went wrong during the custom_job_type job execution."
+    )
+
+
 @pytest.mark.django_db(transaction=True)
 @patch("baserow.core.jobs.registries.JobTypeRegistry.get_by_model")
 def test_run_task_failing_time_limit(mock_get_by_model, data_fixture):
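A side note on the `job.error == "-1"` assertions in these tests: `sys.exit(-1)` raises `SystemExit(-1)`, and the handlers store `str(e)`, so the exit code round-trips into the error field. A quick check (plain Python):

```python
# str() of a single-argument exception is the str() of its argument,
# which is why the tests expect the literal string "-1".
assert str(SystemExit(-1)) == "-1"
assert str(SystemExit("boom")) == "boom"
```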
@@ -185,14 +207,14 @@ def test_cleanup_file_import_job(storage_mock, data_fixture, settings):

     assert Job.objects.count() == 8
     assert Job.objects.is_running().count() == 2
-    assert Job.objects.is_finished().count() == 4
-    assert Job.objects.is_pending_or_running().count() == 4
+    assert Job.objects.is_finished().count() == 5
+    assert Job.objects.is_pending_or_running().count() == 3

     # Should delete the job that has been automatically expired by the previous cleanup
     with freeze_time(now):
         clean_up_jobs()

-    assert Job.objects.count() == 6
+    assert Job.objects.count() == 5
     assert Job.objects.is_running().count() == 2
     assert Job.objects.is_finished().count() == 2
-    assert Job.objects.is_pending_or_running().count() == 4
+    assert Job.objects.is_pending_or_running().count() == 3
@@ -1,3 +1,4 @@
+import sys
 from unittest.mock import patch

 import pytest
@@ -25,6 +26,26 @@ def test_create_and_start_job(mock_run_async_job, data_fixture):
     assert args[0][0] == job.id


+@pytest.mark.django_db(transaction=True)
+@patch("baserow.core.jobs.handler.run_async_job")
+def test_create_and_start_job_with_system_exit(mock_run_async_job, data_fixture):
+    data_fixture.register_temp_job_types()
+
+    user = data_fixture.create_user()
+
+    # Simulate a SystemExit during the delay call.
+    mock_run_async_job.delay.side_effect = lambda x: sys.exit(-1)
+
+    with pytest.raises(SystemExit):
+        JobHandler().create_and_start_job(user, "tmp_job_type_1")
+
+    job = Job.objects.first()
+    assert job.user_id == user.id
+    assert job.progress_percentage == 0
+    assert job.state == "failed"
+    assert job.error == "-1"
+
+
 @pytest.mark.django_db
 def test_exceeding_max_job_count(data_fixture):
     data_fixture.register_temp_job_types()
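The `transaction=True` marker matters here: by default pytest-django wraps each test in a transaction that is rolled back, so `transaction.on_commit` hooks such as `call_async_job_safe` never fire; with `transaction=True` the commit really happens and the hook runs. An alternative approach, sketched here under the assumption of pytest-django >= 4.4 (not part of this diff), is the `django_capture_on_commit_callbacks` fixture:

```python
from django.db import transaction


# Sketch: capture and execute on_commit callbacks without a real commit.
def test_on_commit_callback_fires(db, django_capture_on_commit_callbacks):
    fired = []

    with django_capture_on_commit_callbacks(execute=True) as callbacks:
        transaction.on_commit(lambda: fired.append(True))

    assert fired == [True]       # the hook ran when the capture exited
    assert len(callbacks) == 1   # and it was recorded
```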
@@ -22,7 +22,7 @@ python3 -m pip install -r requirements.txt

 ### Add a new entry
 ```shell
-python3 src/changelog.py add
+./src/changelog.py add
 ```
 The command will ask you for the required information to create a new changelog entry.
@@ -33,7 +33,7 @@ your workflow.

 ### Make a release
 ```shell
-python3 src/changelog.py release <name-of-the-release>
+./src/changelog.py release <name-of-the-release>
 ```

 The command will do the following:
@@ -45,7 +45,7 @@ The command will do the following:
 ## Additional commands
 ### Purge
 ```shell
-python3 src/changelog.py purge
+./src/changelog.py purge
 ```

 This command will delete:
@@ -0,0 +1,7 @@
+{
+    "type": "bug",
+    "message": "Fix stuck jobs when error occurred while in pending state",
+    "issue_number": 1615,
+    "bullet_points": [],
+    "created_at": "2023-03-03"
+}
changelog/src/changelog.py (Normal file → Executable file, 2 additions)
@@ -1,3 +1,5 @@
+#!/bin/env python3
+
 import os
 import shutil
 from pathlib import Path