
Merge branch 'per-database-update-patch' into 'develop'

Periodic field update tasks improvements

See merge request 
Davide Silvestri 2025-03-31 11:27:23 +02:00
commit ae674ffa4e
8 changed files with 210 additions and 86 deletions
backend/src/baserow/contrib/database

View file

@@ -18,6 +18,7 @@ from baserow.contrib.database.fields.dependencies.exceptions import (
from baserow.contrib.database.fields.field_cache import FieldCache
from baserow.contrib.database.fields.models import Field, LinkRowField
from baserow.contrib.database.fields.registries import FieldType, field_type_registry
from baserow.contrib.database.table.models import Table
from baserow.core.db import specific_iterator
from baserow.core.handler import CoreHandler
from baserow.core.models import Workspace
@@ -112,7 +113,8 @@ class FieldDependencyHandler:
}
relationship_table = FieldDependency._meta.db_table
linkrowfield_table = LinkRowField._meta.db_table
field_table = Field._meta.db_table
fields_db_table_name = Field._meta.db_table
tables_db_table_name = Table._meta.db_table
if associated_relations_changed:
associated_relations_changed_query = f"""
@@ -128,34 +130,33 @@
associated_relations_changed_query = ""
if database_id_prefilter:
# Override the `relationship_table` with a CTE that only selects the
# dependencies within the provided database. This will significantly speed
# up performance because it doesn't need to filter through all the
# dependencies. This improvement is noticeable on large Baserow instances
# with many users.
# Filters the dependencies to only include fields in the provided database.
# This will significantly speed up performance because it doesn't need to
# filter through all the dependencies. This improvement is noticeable on
# large Baserow instances with many users.
database_prefilter_query = f"""
WITH {relationship_table} AS (
SELECT database_fielddependency.id,
database_fielddependency.dependant_id,
database_fielddependency.dependency_id,
database_fielddependency.via_id
FROM database_fielddependency
INNER JOIN database_field
ON ( database_fielddependency.dependant_id =
database_field.id )
INNER JOIN database_table
ON ( database_field.table_id = database_table.id )
WHERE database_table.database_id = %(database_id)s
)
""" # nosec b608
INNER JOIN {fields_db_table_name}
ON ( {relationship_table}.dependant_id = {fields_db_table_name}.id )
INNER JOIN {tables_db_table_name}
ON ( {fields_db_table_name}.table_id = {tables_db_table_name}.id )
WHERE {tables_db_table_name}.database_id = %(database_id)s
"""
else:
database_prefilter_query = ""
relationship_table_cte = f"""
SELECT {relationship_table}.id,
{relationship_table}.dependant_id,
{relationship_table}.dependency_id,
{relationship_table}.via_id
FROM {relationship_table} {database_prefilter_query}
""" # nosec b608
# Raw query that traverses the dependencies and recursively finds the
# dependants of the provided field ids.
raw_query = f"""
WITH RECURSIVE traverse(id, dependency_ids, via_ids, depth) AS (
{database_prefilter_query}
WITH relationship_table_cte AS ( {relationship_table_cte} )
SELECT
first.dependant_id,
first.dependency_id::text,
@@ -176,14 +177,14 @@ class FieldDependencyHandler:
ELSE ''
END as via_id,
1
FROM {relationship_table} AS first
LEFT OUTER JOIN {relationship_table} AS second
FROM relationship_table_cte AS first
LEFT OUTER JOIN relationship_table_cte AS second
ON first.dependant_id = second.dependency_id
LEFT OUTER JOIN {linkrowfield_table} as linkrowfield
ON first.via_id = linkrowfield.field_ptr_id
LEFT OUTER JOIN {field_table} as dependant
LEFT OUTER JOIN {fields_db_table_name} as dependant
ON first.dependant_id = dependant.id
LEFT OUTER JOIN {field_table} as dependency
LEFT OUTER JOIN {fields_db_table_name} as dependency
ON first.dependency_id = dependency.id
WHERE
first.dependency_id = ANY(%(pks)s)
@@ -198,8 +199,8 @@ class FieldDependencyHandler:
coalesce(concat_ws('|', via_ids::text, via_id::text), ''),
traverse.depth + 1
FROM traverse
INNER JOIN {relationship_table}
ON {relationship_table}.dependency_id = traverse.id
INNER JOIN relationship_table_cte
ON relationship_table_cte.dependency_id = traverse.id
WHERE 1 = 1
-- LIMITING_FK_EDGES_CLAUSE_2
-- DISALLOWED_ANCESTORS_NODES_CLAUSE_2
@@ -214,7 +215,7 @@ class FieldDependencyHandler:
field.table_id,
MAX(depth) as depth
FROM traverse
LEFT OUTER JOIN {field_table} as field
LEFT OUTER JOIN {fields_db_table_name} as field
ON traverse.id = field.id
WHERE depth <= %(max_depth)s
GROUP BY traverse.id, traverse.via_ids, field.content_type_id, field.name, field.table_id
@@ -222,7 +223,6 @@ class FieldDependencyHandler:
""" # nosec b608
queryset = FieldDependency.objects.raw(raw_query, query_parameters)
link_row_field_content_type = ContentType.objects.get_for_model(LinkRowField)
fields_to_fetch = set()
fields_in_cache = {}
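The net effect of these hunks is that the recursive traversal above reads from a single named CTE, `relationship_table_cte`, whose body optionally joins dependants through the fields and tables relations to keep only the dependencies of one database. A minimal sketch of that composition, with hypothetical shortened table names ("deps", "fields", "tables") standing in for the real `_meta.db_table` values:

# Simplified sketch of the query composition above; the table names are
# placeholders for the ones Django resolves via Model._meta.db_table.
def build_relationship_cte(database_id=None):
    # The optional prefilter joins each dependant to its table so only
    # dependencies living inside the given database survive.
    prefilter = (
        "INNER JOIN fields ON deps.dependant_id = fields.id "
        "INNER JOIN tables ON fields.table_id = tables.id "
        "WHERE tables.database_id = %(database_id)s"
        if database_id is not None
        else ""
    )
    return (
        "SELECT deps.id, deps.dependant_id, deps.dependency_id, deps.via_id "
        f"FROM deps {prefilter}"
    )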

View file

@@ -5085,6 +5085,8 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
field_cache: "Optional[FieldCache]" = None,
via_path_to_starting_table: Optional[List[LinkRowField]] = None,
already_updated_fields: Optional[List[Field]] = None,
skip_search_updates: bool = False,
database_id: Optional[int] = None,
):
from baserow.contrib.database.fields.dependencies.update_collector import (
FieldUpdateCollector,
@@ -5113,15 +5115,18 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
for update_collector in update_collectors.values():
updated_fields |= set(
update_collector.apply_updates_and_get_updated_fields(field_cache)
update_collector.apply_updates_and_get_updated_fields(
field_cache, skip_search_updates=skip_search_updates
)
)
all_dependent_fields_grouped_by_depth = FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
fields,
field_cache,
associated_relations_changed=False,
# We can't provide the `database_id_prefilter` here because the fields
# can belong to different databases.
all_dependent_fields_grouped_by_depth = (
FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
fields,
field_cache,
associated_relations_changed=False,
database_id_prefilter=database_id,
)
)
for dependant_fields_group in all_dependent_fields_grouped_by_depth:
for table_id, dependant_field in dependant_fields_group:
@@ -5136,7 +5141,9 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
via_path_to_starting_table,
)
updated_fields |= set(
update_collector.apply_updates_and_get_updated_fields(field_cache)
update_collector.apply_updates_and_get_updated_fields(
field_cache, skip_search_updates=skip_search_updates
)
)
update_collector.send_force_refresh_signals_for_all_updated_tables()
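The loop above applies updates level by level: `group_all_dependent_fields_by_level_from_fields` returns dependants grouped by their depth in the dependency graph, so a field is only recalculated after everything it depends on. A simplified sketch of that pattern (not Baserow's actual API; `refresh` is a caller-supplied callable):

from collections import defaultdict

def refresh_by_depth(dependants_with_depth, refresh):
    # Group (field, depth) pairs by depth, then update shallow levels first
    # so every dependency is fresh before its dependants are recalculated.
    levels = defaultdict(list)
    for field, depth in dependants_with_depth:
        levels[depth].append(field)
    for depth in sorted(levels):
        for field in levels[depth]:
            refresh(field)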

View file

@@ -1311,6 +1311,8 @@ class FieldType(
field_cache: "Optional[FieldCache]" = None,
via_path_to_starting_table: Optional[List[LinkRowField]] = None,
already_updated_fields: Optional[List[Field]] = None,
skip_search_updates: bool = False,
database_id: Optional[int] = None,
):
"""
This method is called periodically for all the fields of the same type
@@ -1327,6 +1329,10 @@
:param already_updated_fields: A list of fields that have already been updated
before. That can happen because it was a dependency of another field for
example.
:param skip_search_updates: If True, search index updates should be skipped.
:param database_id: The id of the database that the fields belong to. If not
provided, the fields may belong to different databases and no single
database can be assumed.
"""
return already_updated_fields
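To make the contract concrete, a hypothetical subclass could implement the hook like this; `ExampleFieldType` and `recalculate_field` are invented names, and a real field type would also have to define the registry's other abstract members:

class ExampleFieldType(FieldType):
    type = "example"

    def run_periodic_update(
        self,
        fields,
        update_collector=None,
        field_cache=None,
        via_path_to_starting_table=None,
        already_updated_fields=None,
        skip_search_updates=False,
        database_id=None,
    ):
        # Refresh every field of this type and report what was touched so
        # dependants and search updates can be handled by the caller.
        for field in fields:
            recalculate_field(field)  # hypothetical per-field refresh
        return (already_updated_fields or []) + list(fields)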

View file

@@ -1,11 +1,11 @@
import itertools
import traceback
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Optional
from typing import Optional, Type
from django.conf import settings
from django.db import transaction
from django.db.models import Q, QuerySet
from django.db.models import OuterRef, Q, QuerySet, Subquery
from loguru import logger
from opentelemetry import trace
@@ -14,9 +14,11 @@ from baserow.config.celery import app
from baserow.contrib.database.fields.periodic_field_update_handler import (
PeriodicFieldUpdateHandler,
)
from baserow.contrib.database.fields.registries import field_type_registry
from baserow.contrib.database.fields.registries import FieldType, field_type_registry
from baserow.contrib.database.search.handler import SearchHandler
from baserow.contrib.database.table.models import RichTextFieldMention
from baserow.contrib.database.views.handler import ViewSubscriptionHandler
from baserow.contrib.database.views.models import View, ViewSubscription
from baserow.core.models import Workspace
from baserow.core.telemetry.utils import add_baserow_trace_attrs, baserow_trace
@@ -83,7 +85,7 @@ def run_periodic_fields_updates(
@baserow_trace(tracer)
def _run_periodic_field_type_update_per_workspace(
field_type_instance, workspace: Workspace, update_now=True
field_type_instance: Type[FieldType], workspace: Workspace, update_now: bool = True
):
qs = field_type_instance.get_fields_needing_periodic_update()
if qs is None:
@@ -93,45 +95,77 @@ def _run_periodic_field_type_update_per_workspace(
workspace.refresh_now()
add_baserow_trace_attrs(update_now=update_now, workspace_id=workspace.id)
all_updated_fields = []
fields = (
qs.filter(
table__database__workspace_id=workspace.id,
table__trashed=False,
table__database__trashed=False,
table__trashed=False,
)
.select_related("table")
.prefetch_related("table__view_set")
.order_by("table__database_id")
)
# noinspection PyBroadException
try:
all_updated_fields = _run_periodic_field_update(
fields, field_type_instance, all_updated_fields
)
except Exception:
tb = traceback.format_exc()
field_ids = ", ".join(str(field.id) for field in fields)
logger.error(
"Failed to periodically update {field_ids} because of: \n{tb}",
field_ids=field_ids,
tb=tb,
# Grouping by database allows us to pass the `database_id` to the update
# function so that recreating the dependency tree is faster.
for database_id, field_group in itertools.groupby(
fields, key=lambda f: f.table.database_id
):
fields_in_db = list(field_group)
database_updated_fields = []
try:
with transaction.atomic():
database_updated_fields = field_type_instance.run_periodic_update(
fields_in_db,
already_updated_fields=database_updated_fields,
skip_search_updates=True,
database_id=database_id,
)
except Exception:
tb = traceback.format_exc()
field_ids = ", ".join(str(field.id) for field in fields_in_db)
logger.error(
"Failed to periodically update {field_ids} because of: \n{tb}",
field_ids=field_ids,
tb=tb,
)
else:
# Update tsv columns and notify views of the changes.
SearchHandler().all_fields_values_changed_or_created(
database_updated_fields
)
updated_table_ids = list(
{field.table_id for field in database_updated_fields}
)
notify_table_views_updates.delay(updated_table_ids)
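Note that `itertools.groupby` only groups consecutive items, which is why the queryset above is ordered by `table__database_id` before grouping. A standalone illustration:

import itertools

# Pairs must already be sorted by the grouping key.
rows = [(1, "a"), (1, "b"), (2, "c")]  # (database_id, field) pairs
for database_id, group in itertools.groupby(rows, key=lambda r: r[0]):
    print(database_id, [field for _, field in group])
# 1 ['a', 'b']
# 2 ['c']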
@app.task(bind=True)
def notify_table_views_updates(self, table_ids):
"""
Notifies the views of the provided tables that their data has been updated. For
performance reasons, all the views with subscriptions are fetched in one go and
grouped by table id, so only the views that have subscribers are notified.
:param table_ids: The ids of the tables that have been updated.
"""
subquery = ViewSubscription.objects.filter(view_id=OuterRef("id")).values("view_id")
views_need_notify = (
View.objects.filter(
table_id__in=table_ids,
id=Subquery(subquery),
)
.select_related("table")
.order_by("table_id")
)
from baserow.contrib.database.views.handler import ViewSubscriptionHandler
# After a successful periodic update of all fields, we need to update the
# search index for all of them, one call per table, to avoid ending up in a
# deadlock because rows are updated simultaneously.
fields_per_table = defaultdict(list)
for field in all_updated_fields:
fields_per_table[field.table_id].append(field)
for _, fields in fields_per_table.items():
SearchHandler().entire_field_values_changed_or_created(fields[0].table, fields)
for _, views_group in itertools.groupby(
views_need_notify, key=lambda v: v.table_id
):
with transaction.atomic():
ViewSubscriptionHandler().notify_table_views_updates(
fields[0].table.view_set.all()
ViewSubscriptionHandler.notify_table_views(
[view.id for view in views_group]
)
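The `id=Subquery(...)` filter above keeps only views that have at least one subscription. An equivalent formulation, shown here only as a sketch (assuming Django 3.0+, where `Exists` can be used directly in `filter`), reads a little more explicitly:

from django.db.models import Exists, OuterRef

views_need_notify = (
    View.objects.filter(table_id__in=table_ids)
    .filter(Exists(ViewSubscription.objects.filter(view_id=OuterRef("id"))))
    .select_related("table")
    .order_by("table_id")
)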
@@ -145,14 +179,6 @@ def delete_mentions_marked_for_deletion(self):
).delete()
@baserow_trace(tracer)
def _run_periodic_field_update(fields, field_type_instance, all_updated_fields):
with transaction.atomic():
return field_type_instance.run_periodic_update(
fields, already_updated_fields=all_updated_fields
)
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(

View file

@@ -1,5 +1,4 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from baserow.contrib.database.fields.tasks import run_periodic_fields_updates
@@ -21,7 +20,6 @@ class Command(BaseCommand):
help="If set to true, all the workspaces will be updated with the current time; otherwise the previous `now` value will be used.",
)
@transaction.atomic
def handle(self, *args, **options):
run_periodic_fields_updates(
options["workspace_id"], not options["dont_update_now"]

View file

@@ -755,6 +755,38 @@ class SearchHandler(
updated_fields=updated_fields,
)
@classmethod
def all_fields_values_changed_or_created(cls, updated_fields: List["Field"]):
"""
Called when field values have been changed or created for entire field columns
at once. This is more efficient than calling
`entire_field_values_changed_or_created` for each table individually when
multiple tables have had field values changed. Please make sure to
select_related the table for the `updated_fields` to avoid N+1 queries.
:param updated_fields: If only some fields have had values changed, the
search vector update can be optimized by providing those here.
"""
from baserow.contrib.database.search.tasks import (
async_update_multiple_fields_tsvector_columns,
)
from baserow.contrib.database.tasks import (
enqueue_task_on_commit_swallowing_any_exceptions,
)
searchable_updated_fields_ids = [
field.id for field in updated_fields if field.table.tsvectors_are_supported
]
if searchable_updated_fields_ids:
enqueue_task_on_commit_swallowing_any_exceptions(
lambda: async_update_multiple_fields_tsvector_columns.delay(
field_ids=searchable_updated_fields_ids,
update_tsvs_for_changed_rows_only=False,
)
)
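A minimal usage sketch of the new helper (the ids are hypothetical, and `Field` is assumed imported): `select_related("table")` matters because the `tsvectors_are_supported` check above touches `field.table` for every field.

# Hypothetical ids; without select_related this would be N+1 queries.
updated_fields = list(
    Field.objects.filter(id__in=[1, 2, 3]).select_related("table")
)
SearchHandler.all_fields_values_changed_or_created(updated_fields)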
@classmethod
def _trigger_async_tsvector_task_if_needed(
cls,

View file

@@ -1,3 +1,5 @@
import itertools
import traceback
from typing import List, Optional
from django.conf import settings
@@ -5,9 +7,11 @@
from loguru import logger
from baserow.config.celery import app
from baserow.contrib.database.fields.models import Field
from baserow.contrib.database.search.exceptions import (
PostgresFullTextSearchDisabledException,
)
from baserow.contrib.database.table.models import Table
@app.task(
@@ -31,12 +35,63 @@
"""
from baserow.contrib.database.search.handler import SearchHandler
from baserow.contrib.database.table.handler import TableHandler
table = TableHandler().get_table(table_id)
try:
table = Table.objects_and_trash.get(id=table_id)
except Table.DoesNotExist:
logger.warning(
f"Could not find table with id {table_id} for updating tsvector columns."
)
return
try:
SearchHandler.update_tsvector_columns_locked(
table, update_tsvs_for_changed_rows_only, field_ids_to_restrict_update_to
)
except PostgresFullTextSearchDisabledException:
logger.debug("Postgres full-text search is disabled.")
@app.task(
queue="export",
time_limit=settings.CELERY_SEARCH_UPDATE_HARD_TIME_LIMIT,
)
def async_update_multiple_fields_tsvector_columns(
field_ids: List[int],
update_tsvs_for_changed_rows_only: bool,
):
"""
Responsible for asynchronously updating the `tsvector` columns for all the fields
provided.
:param field_ids: The fields we'd like to update the tsvectors for.
:param update_tsvs_for_changed_rows_only: By default we will only update rows on the
table which have changed since the last search update. If set to `False`, we
will index all cells that match the other parameters.
"""
from baserow.contrib.database.search.handler import SearchHandler
fields = (
Field.objects_and_trash.filter(id__in=field_ids)
.select_related("table")
.order_by("table_id")
)
for _, field_group in itertools.groupby(fields, lambda f: f.table_id):
table_fields = list(field_group)
table = table_fields[0].table
try:
SearchHandler.update_tsvector_columns_locked(
table, update_tsvs_for_changed_rows_only, [f.id for f in table_fields]
)
except PostgresFullTextSearchDisabledException:
logger.debug("Postgres full-text search is disabled.")
break
except Exception:
tb = traceback.format_exc()
field_ids = ", ".join(str(field.id) for field in table_fields)
logger.error(
"Failed to update tsvector columns for fields {field_ids} in table {table_id} because of: \n{tb}.",
field_ids=field_ids,
table_id=table.id,
tb=tb,
)
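A hedged usage sketch for the new task, mirroring how `all_fields_values_changed_or_created` enqueues it (the field ids are hypothetical):

# Enqueue a full, non-incremental tsvector refresh for fields that may span
# several tables; the task groups them by table_id internally.
async_update_multiple_fields_tsvector_columns.delay(
    field_ids=[42, 43, 99],
    update_tsvs_for_changed_rows_only=False,
)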

View file

@@ -3936,7 +3936,7 @@ class ViewSubscriptionHandler:
for _, view_group in itertools.groupby(views, key=lambda f: f.table):
view_ids = [v.id for v in view_group]
if view_ids:
cls._notify_table_views_updates(view_ids)
cls.notify_table_views(view_ids)
@classmethod
def notify_table_views_updates(
@@ -3955,10 +3955,10 @@
view__in=views
).values_list("view_id", flat=True)
if view_ids_with_subscribers:
cls._notify_table_views_updates(view_ids_with_subscribers, model)
cls.notify_table_views(view_ids_with_subscribers, model)
@classmethod
def _notify_table_views_updates(
def notify_table_views(
cls, view_ids: list[int], model: GeneratedTableModel | None = None
):
"""