mirror of
https://gitlab.com/bramw/baserow.git
synced 2025-04-07 14:25:37 +00:00
Periodic field update tasks Improvements
This commit is contained in:
parent
32a9ed3395
commit
d3fba862ab
8 changed files with 210 additions and 86 deletions
backend/src/baserow/contrib/database
fields
management/commands
search
views
|
@ -18,6 +18,7 @@ from baserow.contrib.database.fields.dependencies.exceptions import (
|
|||
from baserow.contrib.database.fields.field_cache import FieldCache
|
||||
from baserow.contrib.database.fields.models import Field, LinkRowField
|
||||
from baserow.contrib.database.fields.registries import FieldType, field_type_registry
|
||||
from baserow.contrib.database.table.models import Table
|
||||
from baserow.core.db import specific_iterator
|
||||
from baserow.core.handler import CoreHandler
|
||||
from baserow.core.models import Workspace
|
||||
|
@ -112,7 +113,8 @@ class FieldDependencyHandler:
|
|||
}
|
||||
relationship_table = FieldDependency._meta.db_table
|
||||
linkrowfield_table = LinkRowField._meta.db_table
|
||||
field_table = Field._meta.db_table
|
||||
fields_db_table_name = Field._meta.db_table
|
||||
tables_db_table_name = Table._meta.db_table
|
||||
|
||||
if associated_relations_changed:
|
||||
associated_relations_changed_query = f"""
|
||||
|
@ -128,34 +130,33 @@ class FieldDependencyHandler:
|
|||
associated_relations_changed_query = ""
|
||||
|
||||
if database_id_prefilter:
|
||||
# Override the `relationship_table` with a CTE that only selects the
|
||||
# dependencies within the provided database. This will significantly speed
|
||||
# up performance because it doesn't need to filter through all the
|
||||
# dependencies. This improvement is noticeable on large Baserow instances
|
||||
# with many users.
|
||||
# Filters tbe dependencies to only include fields in the provided database.
|
||||
# This will significantly speed up performance because it doesn't need to
|
||||
# filter through all the dependencies. This improvement is noticeable on
|
||||
# large Baserow instances with many users.
|
||||
database_prefilter_query = f"""
|
||||
WITH {relationship_table} AS (
|
||||
SELECT database_fielddependency.id,
|
||||
database_fielddependency.dependant_id,
|
||||
database_fielddependency.dependency_id,
|
||||
database_fielddependency.via_id
|
||||
FROM database_fielddependency
|
||||
INNER JOIN database_field
|
||||
ON ( database_fielddependency.dependant_id =
|
||||
database_field.id )
|
||||
INNER JOIN database_table
|
||||
ON ( database_field.table_id = database_table.id )
|
||||
WHERE database_table.database_id = %(database_id)s
|
||||
)
|
||||
""" # nosec b608
|
||||
INNER JOIN {fields_db_table_name}
|
||||
ON ( {relationship_table}.dependant_id = {fields_db_table_name}.id )
|
||||
INNER JOIN {tables_db_table_name}
|
||||
ON ( {fields_db_table_name}.table_id = {tables_db_table_name}.id )
|
||||
WHERE {tables_db_table_name}.database_id = %(database_id)s
|
||||
"""
|
||||
else:
|
||||
database_prefilter_query = ""
|
||||
|
||||
relationship_table_cte = f"""
|
||||
SELECT {relationship_table}.id,
|
||||
{relationship_table}.dependant_id,
|
||||
{relationship_table}.dependency_id,
|
||||
{relationship_table}.via_id
|
||||
FROM {relationship_table} {database_prefilter_query}
|
||||
""" # nosec b608
|
||||
|
||||
# Raw query that traverses through the dependencies, and will find the
|
||||
# dependants of the provided fields ids recursively.
|
||||
raw_query = f"""
|
||||
WITH RECURSIVE traverse(id, dependency_ids, via_ids, depth) AS (
|
||||
{database_prefilter_query}
|
||||
WITH relationship_table_cte AS ( {relationship_table_cte} )
|
||||
SELECT
|
||||
first.dependant_id,
|
||||
first.dependency_id::text,
|
||||
|
@ -176,14 +177,14 @@ class FieldDependencyHandler:
|
|||
ELSE ''
|
||||
END as via_id,
|
||||
1
|
||||
FROM {relationship_table} AS first
|
||||
LEFT OUTER JOIN {relationship_table} AS second
|
||||
FROM relationship_table_cte AS first
|
||||
LEFT OUTER JOIN relationship_table_cte AS second
|
||||
ON first.dependant_id = second.dependency_id
|
||||
LEFT OUTER JOIN {linkrowfield_table} as linkrowfield
|
||||
ON first.via_id = linkrowfield.field_ptr_id
|
||||
LEFT OUTER JOIN {field_table} as dependant
|
||||
LEFT OUTER JOIN {fields_db_table_name} as dependant
|
||||
ON first.dependant_id = dependant.id
|
||||
LEFT OUTER JOIN {field_table} as dependency
|
||||
LEFT OUTER JOIN {fields_db_table_name} as dependency
|
||||
ON first.dependency_id = dependency.id
|
||||
WHERE
|
||||
first.dependency_id = ANY(%(pks)s)
|
||||
|
@ -198,8 +199,8 @@ class FieldDependencyHandler:
|
|||
coalesce(concat_ws('|', via_ids::text, via_id::text), ''),
|
||||
traverse.depth + 1
|
||||
FROM traverse
|
||||
INNER JOIN {relationship_table}
|
||||
ON {relationship_table}.dependency_id = traverse.id
|
||||
INNER JOIN relationship_table_cte
|
||||
ON relationship_table_cte.dependency_id = traverse.id
|
||||
WHERE 1 = 1
|
||||
-- LIMITING_FK_EDGES_CLAUSE_2
|
||||
-- DISALLOWED_ANCESTORS_NODES_CLAUSE_2
|
||||
|
@ -214,7 +215,7 @@ class FieldDependencyHandler:
|
|||
field.table_id,
|
||||
MAX(depth) as depth
|
||||
FROM traverse
|
||||
LEFT OUTER JOIN {field_table} as field
|
||||
LEFT OUTER JOIN {fields_db_table_name} as field
|
||||
ON traverse.id = field.id
|
||||
WHERE depth <= %(max_depth)s
|
||||
GROUP BY traverse.id, traverse.via_ids, field.content_type_id, field.name, field.table_id
|
||||
|
@ -222,7 +223,6 @@ class FieldDependencyHandler:
|
|||
""" # nosec b608
|
||||
|
||||
queryset = FieldDependency.objects.raw(raw_query, query_parameters)
|
||||
|
||||
link_row_field_content_type = ContentType.objects.get_for_model(LinkRowField)
|
||||
fields_to_fetch = set()
|
||||
fields_in_cache = {}
|
||||
|
|
|
@ -5085,6 +5085,8 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
|
|||
field_cache: "Optional[FieldCache]" = None,
|
||||
via_path_to_starting_table: Optional[List[LinkRowField]] = None,
|
||||
already_updated_fields: Optional[List[Field]] = None,
|
||||
skip_search_updates: bool = False,
|
||||
database_id: Optional[int] = None,
|
||||
):
|
||||
from baserow.contrib.database.fields.dependencies.update_collector import (
|
||||
FieldUpdateCollector,
|
||||
|
@ -5113,15 +5115,18 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
|
|||
|
||||
for update_collector in update_collectors.values():
|
||||
updated_fields |= set(
|
||||
update_collector.apply_updates_and_get_updated_fields(field_cache)
|
||||
update_collector.apply_updates_and_get_updated_fields(
|
||||
field_cache, skip_search_updates=skip_search_updates
|
||||
)
|
||||
)
|
||||
|
||||
all_dependent_fields_grouped_by_depth = FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
|
||||
fields,
|
||||
field_cache,
|
||||
associated_relations_changed=False,
|
||||
# We can't provide the `database_id_prefilter` here because the fields
|
||||
# can belong in different databases.
|
||||
all_dependent_fields_grouped_by_depth = (
|
||||
FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
|
||||
fields,
|
||||
field_cache,
|
||||
associated_relations_changed=False,
|
||||
database_id_prefilter=database_id,
|
||||
)
|
||||
)
|
||||
for dependant_fields_group in all_dependent_fields_grouped_by_depth:
|
||||
for table_id, dependant_field in dependant_fields_group:
|
||||
|
@ -5136,7 +5141,9 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
|
|||
via_path_to_starting_table,
|
||||
)
|
||||
updated_fields |= set(
|
||||
update_collector.apply_updates_and_get_updated_fields(field_cache)
|
||||
update_collector.apply_updates_and_get_updated_fields(
|
||||
field_cache, skip_search_updates=skip_search_updates
|
||||
)
|
||||
)
|
||||
|
||||
update_collector.send_force_refresh_signals_for_all_updated_tables()
|
||||
|
|
|
@ -1311,6 +1311,8 @@ class FieldType(
|
|||
field_cache: "Optional[FieldCache]" = None,
|
||||
via_path_to_starting_table: Optional[List[LinkRowField]] = None,
|
||||
already_updated_fields: Optional[List[Field]] = None,
|
||||
skip_search_updates: bool = False,
|
||||
database_id: Optional[int] = None,
|
||||
):
|
||||
"""
|
||||
This method is called periodically for all the fields of the same type
|
||||
|
@ -1327,6 +1329,10 @@ class FieldType(
|
|||
:param already_updated_fields: A list of fields that have already been updated
|
||||
before. That can happen because it was a dependency of another field for
|
||||
example.
|
||||
:param skip_search_updates: If True, search index updates should be skipped.
|
||||
:param database_id: The id of the database that the fields belong to. If not
|
||||
provided, it means fields might belong to different databases and so we
|
||||
should not assume they belong to the same database.
|
||||
"""
|
||||
|
||||
return already_updated_fields
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import itertools
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
from typing import Optional, Type
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
from django.db.models import Q, QuerySet
|
||||
from django.db.models import OuterRef, Q, QuerySet, Subquery
|
||||
|
||||
from loguru import logger
|
||||
from opentelemetry import trace
|
||||
|
@ -14,9 +14,11 @@ from baserow.config.celery import app
|
|||
from baserow.contrib.database.fields.periodic_field_update_handler import (
|
||||
PeriodicFieldUpdateHandler,
|
||||
)
|
||||
from baserow.contrib.database.fields.registries import field_type_registry
|
||||
from baserow.contrib.database.fields.registries import FieldType, field_type_registry
|
||||
from baserow.contrib.database.search.handler import SearchHandler
|
||||
from baserow.contrib.database.table.models import RichTextFieldMention
|
||||
from baserow.contrib.database.views.handler import ViewSubscriptionHandler
|
||||
from baserow.contrib.database.views.models import View, ViewSubscription
|
||||
from baserow.core.models import Workspace
|
||||
from baserow.core.telemetry.utils import add_baserow_trace_attrs, baserow_trace
|
||||
|
||||
|
@ -83,7 +85,7 @@ def run_periodic_fields_updates(
|
|||
|
||||
@baserow_trace(tracer)
|
||||
def _run_periodic_field_type_update_per_workspace(
|
||||
field_type_instance, workspace: Workspace, update_now=True
|
||||
field_type_instance: Type[FieldType], workspace: Workspace, update_now: bool = True
|
||||
):
|
||||
qs = field_type_instance.get_fields_needing_periodic_update()
|
||||
if qs is None:
|
||||
|
@ -93,45 +95,77 @@ def _run_periodic_field_type_update_per_workspace(
|
|||
workspace.refresh_now()
|
||||
add_baserow_trace_attrs(update_now=update_now, workspace_id=workspace.id)
|
||||
|
||||
all_updated_fields = []
|
||||
|
||||
fields = (
|
||||
qs.filter(
|
||||
table__database__workspace_id=workspace.id,
|
||||
table__trashed=False,
|
||||
table__database__trashed=False,
|
||||
table__trashed=False,
|
||||
)
|
||||
.select_related("table")
|
||||
.prefetch_related("table__view_set")
|
||||
.order_by("table__database_id")
|
||||
)
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
all_updated_fields = _run_periodic_field_update(
|
||||
fields, field_type_instance, all_updated_fields
|
||||
)
|
||||
except Exception:
|
||||
tb = traceback.format_exc()
|
||||
field_ids = ", ".join(str(field.id) for field in fields)
|
||||
logger.error(
|
||||
"Failed to periodically update {field_ids} because of: \n{tb}",
|
||||
field_ids=field_ids,
|
||||
tb=tb,
|
||||
|
||||
# Grouping by database will allow us to pass the `database_id` to the update
|
||||
# function so recreating the dependency tree will be faster.
|
||||
for database_id, field_group in itertools.groupby(
|
||||
fields, key=lambda f: f.table.database_id
|
||||
):
|
||||
fields_in_db = list(field_group)
|
||||
database_updated_fields = []
|
||||
try:
|
||||
with transaction.atomic():
|
||||
database_updated_fields = field_type_instance.run_periodic_update(
|
||||
fields_in_db,
|
||||
already_updated_fields=database_updated_fields,
|
||||
skip_search_updates=True,
|
||||
database_id=database_id,
|
||||
)
|
||||
except Exception:
|
||||
tb = traceback.format_exc()
|
||||
field_ids = ", ".join(str(field.id) for field in fields_in_db)
|
||||
logger.error(
|
||||
"Failed to periodically update {field_ids} because of: \n{tb}",
|
||||
field_ids=field_ids,
|
||||
tb=tb,
|
||||
)
|
||||
else:
|
||||
# Update tsv columns and notify views of the changes.
|
||||
SearchHandler().all_fields_values_changed_or_created(
|
||||
database_updated_fields
|
||||
)
|
||||
|
||||
updated_table_ids = list(
|
||||
{field.table_id for field in database_updated_fields}
|
||||
)
|
||||
notify_table_views_updates.delay(updated_table_ids)
|
||||
|
||||
|
||||
@app.task(bind=True)
|
||||
def notify_table_views_updates(self, table_ids):
|
||||
"""
|
||||
Notifies the views of the provided tables that their data has been updated. For
|
||||
performance reasons, we fetch all the views with subscriptions in one go and group
|
||||
them by table id so we can notify only the views that need to be notified.
|
||||
|
||||
:param table_ids: The ids of the tables that have been updated.
|
||||
"""
|
||||
|
||||
subquery = ViewSubscription.objects.filter(view_id=OuterRef("id")).values("view_id")
|
||||
views_need_notify = (
|
||||
View.objects.filter(
|
||||
table_id__in=table_ids,
|
||||
id=Subquery(subquery),
|
||||
)
|
||||
.select_related("table")
|
||||
.order_by("table_id")
|
||||
)
|
||||
|
||||
from baserow.contrib.database.views.handler import ViewSubscriptionHandler
|
||||
|
||||
# After a successful periodic update of all fields, we would need to update the
|
||||
# search index for all of them in one function per table to avoid ending up in a
|
||||
# deadlock because rows are updated simultaneously.
|
||||
fields_per_table = defaultdict(list)
|
||||
for field in all_updated_fields:
|
||||
fields_per_table[field.table_id].append(field)
|
||||
for _, fields in fields_per_table.items():
|
||||
SearchHandler().entire_field_values_changed_or_created(fields[0].table, fields)
|
||||
|
||||
for _, views_group in itertools.groupby(
|
||||
views_need_notify, key=lambda v: v.table_id
|
||||
):
|
||||
with transaction.atomic():
|
||||
ViewSubscriptionHandler().notify_table_views_updates(
|
||||
fields[0].table.view_set.all()
|
||||
ViewSubscriptionHandler.notify_table_views(
|
||||
[view.id for view in views_group]
|
||||
)
|
||||
|
||||
|
||||
|
@ -145,14 +179,6 @@ def delete_mentions_marked_for_deletion(self):
|
|||
).delete()
|
||||
|
||||
|
||||
@baserow_trace(tracer)
|
||||
def _run_periodic_field_update(fields, field_type_instance, all_updated_fields):
|
||||
with transaction.atomic():
|
||||
return field_type_instance.run_periodic_update(
|
||||
fields, already_updated_fields=all_updated_fields
|
||||
)
|
||||
|
||||
|
||||
@app.on_after_finalize.connect
|
||||
def setup_periodic_tasks(sender, **kwargs):
|
||||
sender.add_periodic_task(
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from django.core.management.base import BaseCommand
|
||||
from django.db import transaction
|
||||
|
||||
from baserow.contrib.database.fields.tasks import run_periodic_fields_updates
|
||||
|
||||
|
@ -21,7 +20,6 @@ class Command(BaseCommand):
|
|||
help="If set to true all the workspaces will be updated with the current time otherwise the previous `now` value will be used.",
|
||||
)
|
||||
|
||||
@transaction.atomic
|
||||
def handle(self, *args, **options):
|
||||
run_periodic_fields_updates(
|
||||
options["workspace_id"], not options["dont_update_now"]
|
||||
|
|
|
@ -755,6 +755,38 @@ class SearchHandler(
|
|||
updated_fields=updated_fields,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def all_fields_values_changed_or_created(cls, updated_fields: List["Field"]):
|
||||
"""
|
||||
Called when field values for a table have been changed or created for an entire
|
||||
field column at once. This is more efficient than calling
|
||||
`entire_field_values_changed_or_created` for each table individually when
|
||||
multiple tables have had field values changed. Please, make sure to
|
||||
select_related the table for the "updated_fields" to avoid N+1 queries.
|
||||
|
||||
:param updated_fields: If only some fields have had values changed then the
|
||||
search vector update can be optimized by providing those here.
|
||||
"""
|
||||
|
||||
from baserow.contrib.database.search.tasks import (
|
||||
async_update_multiple_fields_tsvector_columns,
|
||||
)
|
||||
from baserow.contrib.database.tasks import (
|
||||
enqueue_task_on_commit_swallowing_any_exceptions,
|
||||
)
|
||||
|
||||
searchable_updated_fields_ids = [
|
||||
field.id for field in updated_fields if field.table.tsvectors_are_supported
|
||||
]
|
||||
|
||||
if searchable_updated_fields_ids:
|
||||
enqueue_task_on_commit_swallowing_any_exceptions(
|
||||
lambda: async_update_multiple_fields_tsvector_columns.delay(
|
||||
field_ids=searchable_updated_fields_ids,
|
||||
update_tsvs_for_changed_rows_only=False,
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _trigger_async_tsvector_task_if_needed(
|
||||
cls,
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import itertools
|
||||
import traceback
|
||||
from typing import List, Optional
|
||||
|
||||
from django.conf import settings
|
||||
|
@ -5,9 +7,11 @@ from django.conf import settings
|
|||
from loguru import logger
|
||||
|
||||
from baserow.config.celery import app
|
||||
from baserow.contrib.database.fields.models import Field
|
||||
from baserow.contrib.database.search.exceptions import (
|
||||
PostgresFullTextSearchDisabledException,
|
||||
)
|
||||
from baserow.contrib.database.table.models import Table
|
||||
|
||||
|
||||
@app.task(
|
||||
|
@ -31,12 +35,63 @@ def async_update_tsvector_columns(
|
|||
"""
|
||||
|
||||
from baserow.contrib.database.search.handler import SearchHandler
|
||||
from baserow.contrib.database.table.handler import TableHandler
|
||||
|
||||
table = TableHandler().get_table(table_id)
|
||||
try:
|
||||
table = Table.objects_and_trash.get(id=table_id)
|
||||
except Table.DoesNotExist:
|
||||
logger.warning(
|
||||
f"Could not find table with id {table_id} for updating tsvector columns."
|
||||
)
|
||||
return
|
||||
try:
|
||||
SearchHandler.update_tsvector_columns_locked(
|
||||
table, update_tsvs_for_changed_rows_only, field_ids_to_restrict_update_to
|
||||
)
|
||||
except PostgresFullTextSearchDisabledException:
|
||||
logger.debug(f"Postgres full-text search is disabled.")
|
||||
|
||||
|
||||
@app.task(
|
||||
queue="export",
|
||||
time_limit=settings.CELERY_SEARCH_UPDATE_HARD_TIME_LIMIT,
|
||||
)
|
||||
def async_update_multiple_fields_tsvector_columns(
|
||||
field_ids: List[int],
|
||||
update_tsvs_for_changed_rows_only: bool,
|
||||
):
|
||||
"""
|
||||
Responsible for asynchronously updating the `tsvector` columns for all the fields
|
||||
provided.
|
||||
|
||||
:param field_ids: The fields we'd like to update the tsvectors for.
|
||||
:param update_tsvs_for_changed_rows_only: By default we will only update rows on the
|
||||
table which have changed since the last search update. If set to `False`, we
|
||||
will index all cells that match the other parameters.
|
||||
"""
|
||||
|
||||
from baserow.contrib.database.search.handler import SearchHandler
|
||||
|
||||
fields = (
|
||||
Field.objects_and_trash.filter(id__in=field_ids)
|
||||
.select_related("table")
|
||||
.order_by("table_id")
|
||||
)
|
||||
for _, field_group in itertools.groupby(fields, lambda f: f.table_id):
|
||||
table_fields = list(field_group)
|
||||
table = table_fields[0].table
|
||||
try:
|
||||
SearchHandler.update_tsvector_columns_locked(
|
||||
table, update_tsvs_for_changed_rows_only, [f.id for f in table_fields]
|
||||
)
|
||||
except PostgresFullTextSearchDisabledException:
|
||||
logger.debug(f"Postgres full-text search is disabled.")
|
||||
break
|
||||
except Exception:
|
||||
tb = traceback.format_exc()
|
||||
field_ids = ", ".join(str(field.id) for field in field_group)
|
||||
logger.error(
|
||||
"Failed to update tsvector columns for fields {field_ids} in table {table_id} because of: \n{tb}.",
|
||||
field_ids=field_ids,
|
||||
table_id=table.id,
|
||||
tb=tb,
|
||||
)
|
||||
|
|
|
@ -3936,7 +3936,7 @@ class ViewSubscriptionHandler:
|
|||
for _, view_group in itertools.groupby(views, key=lambda f: f.table):
|
||||
view_ids = [v.id for v in view_group]
|
||||
if view_ids:
|
||||
cls._notify_table_views_updates(view_ids)
|
||||
cls.notify_table_views(view_ids)
|
||||
|
||||
@classmethod
|
||||
def notify_table_views_updates(
|
||||
|
@ -3955,10 +3955,10 @@ class ViewSubscriptionHandler:
|
|||
view__in=views
|
||||
).values_list("view_id", flat=True)
|
||||
if view_ids_with_subscribers:
|
||||
cls._notify_table_views_updates(view_ids_with_subscribers, model)
|
||||
cls.notify_table_views(view_ids_with_subscribers, model)
|
||||
|
||||
@classmethod
|
||||
def _notify_table_views_updates(
|
||||
def notify_table_views(
|
||||
cls, view_ids: list[int], model: GeneratedTableModel | None = None
|
||||
):
|
||||
"""
|
||||
|
|
Loading…
Add table
Reference in a new issue