From d3fba862abc17adec334ab72f8734e6927530822 Mon Sep 17 00:00:00 2001
From: Bram Wiepjes <bramw@protonmail.com>
Date: Mon, 31 Mar 2025 09:27:23 +0000
Subject: [PATCH] Periodic field update tasks Improvements

---
 .../database/fields/dependencies/handler.py   |  58 ++++-----
 .../contrib/database/fields/field_types.py    |  23 ++--
 .../contrib/database/fields/registries.py     |   6 +
 .../baserow/contrib/database/fields/tasks.py  | 110 +++++++++++-------
 .../commands/run_periodic_fields_updates.py   |   2 -
 .../contrib/database/search/handler.py        |  32 +++++
 .../baserow/contrib/database/search/tasks.py  |  59 +++++++++-
 .../baserow/contrib/database/views/handler.py |   6 +-
 8 files changed, 210 insertions(+), 86 deletions(-)

diff --git a/backend/src/baserow/contrib/database/fields/dependencies/handler.py b/backend/src/baserow/contrib/database/fields/dependencies/handler.py
index 7cc3437ca..5edaf61cd 100644
--- a/backend/src/baserow/contrib/database/fields/dependencies/handler.py
+++ b/backend/src/baserow/contrib/database/fields/dependencies/handler.py
@@ -18,6 +18,7 @@ from baserow.contrib.database.fields.dependencies.exceptions import (
 from baserow.contrib.database.fields.field_cache import FieldCache
 from baserow.contrib.database.fields.models import Field, LinkRowField
 from baserow.contrib.database.fields.registries import FieldType, field_type_registry
+from baserow.contrib.database.table.models import Table
 from baserow.core.db import specific_iterator
 from baserow.core.handler import CoreHandler
 from baserow.core.models import Workspace
@@ -112,7 +113,8 @@ class FieldDependencyHandler:
         }
         relationship_table = FieldDependency._meta.db_table
         linkrowfield_table = LinkRowField._meta.db_table
-        field_table = Field._meta.db_table
+        fields_db_table_name = Field._meta.db_table
+        tables_db_table_name = Table._meta.db_table
 
         if associated_relations_changed:
             associated_relations_changed_query = f"""
@@ -128,34 +130,33 @@ class FieldDependencyHandler:
             associated_relations_changed_query = ""
 
         if database_id_prefilter:
-            # Override the `relationship_table` with a CTE that only selects the
-            # dependencies within the provided database. This will significantly speed
-            # up performance because it doesn't need to filter through all the
-            # dependencies. This improvement is noticeable on large Baserow instances
-            # with many users.
+            # Filters tbe dependencies to only include fields in the provided database.
+            # This will significantly speed up performance because it doesn't need to
+            # filter through all the dependencies. This improvement is noticeable on
+            # large Baserow instances with many users.
             database_prefilter_query = f"""
-            WITH {relationship_table} AS (
-                SELECT database_fielddependency.id,
-                       database_fielddependency.dependant_id,
-                       database_fielddependency.dependency_id,
-                       database_fielddependency.via_id
-                FROM   database_fielddependency
-                           INNER JOIN database_field
-                                      ON ( database_fielddependency.dependant_id =
-                                           database_field.id )
-                           INNER JOIN database_table
-                                      ON ( database_field.table_id = database_table.id )
-                WHERE  database_table.database_id = %(database_id)s
-            )
-            """  # nosec b608
+                INNER JOIN {fields_db_table_name}
+                    ON ( {relationship_table}.dependant_id = {fields_db_table_name}.id )
+                INNER JOIN {tables_db_table_name}
+                    ON ( {fields_db_table_name}.table_id = {tables_db_table_name}.id )
+                WHERE  {tables_db_table_name}.database_id = %(database_id)s
+            """
         else:
             database_prefilter_query = ""
 
+        relationship_table_cte = f"""
+            SELECT  {relationship_table}.id,
+                    {relationship_table}.dependant_id,
+                    {relationship_table}.dependency_id,
+                    {relationship_table}.via_id
+            FROM {relationship_table} {database_prefilter_query}
+        """  # nosec b608
+
         # Raw query that traverses through the dependencies, and will find the
         # dependants of the provided fields ids recursively.
         raw_query = f"""
             WITH RECURSIVE traverse(id, dependency_ids, via_ids, depth) AS (
-                {database_prefilter_query}
+              WITH relationship_table_cte AS ( {relationship_table_cte} )
                 SELECT
                     first.dependant_id,
                     first.dependency_id::text,
@@ -176,14 +177,14 @@ class FieldDependencyHandler:
                         ELSE ''
                     END as via_id,
                     1
-                FROM {relationship_table} AS first
-                LEFT OUTER JOIN {relationship_table} AS second
+                FROM relationship_table_cte AS first
+                LEFT OUTER JOIN relationship_table_cte AS second
                     ON first.dependant_id = second.dependency_id
                 LEFT OUTER JOIN {linkrowfield_table} as linkrowfield
                     ON first.via_id = linkrowfield.field_ptr_id
-                LEFT OUTER JOIN {field_table} as dependant
+                LEFT OUTER JOIN {fields_db_table_name} as dependant
                     ON first.dependant_id = dependant.id
-                LEFT OUTER JOIN {field_table} as dependency
+                LEFT OUTER JOIN {fields_db_table_name} as dependency
                     ON first.dependency_id = dependency.id
                 WHERE
                     first.dependency_id = ANY(%(pks)s)
@@ -198,8 +199,8 @@ class FieldDependencyHandler:
                     coalesce(concat_ws('|', via_ids::text, via_id::text), ''),
                     traverse.depth + 1
                 FROM traverse
-                INNER JOIN {relationship_table}
-                    ON {relationship_table}.dependency_id = traverse.id
+                INNER JOIN relationship_table_cte
+                    ON relationship_table_cte.dependency_id = traverse.id
                 WHERE 1 = 1
                 -- LIMITING_FK_EDGES_CLAUSE_2
                 -- DISALLOWED_ANCESTORS_NODES_CLAUSE_2
@@ -214,7 +215,7 @@ class FieldDependencyHandler:
                 field.table_id,
                 MAX(depth) as depth
             FROM traverse
-            LEFT OUTER JOIN {field_table} as field
+            LEFT OUTER JOIN {fields_db_table_name} as field
                 ON traverse.id = field.id
             WHERE depth <= %(max_depth)s
             GROUP BY traverse.id, traverse.via_ids, field.content_type_id, field.name, field.table_id
@@ -222,7 +223,6 @@ class FieldDependencyHandler:
         """  # nosec b608
 
         queryset = FieldDependency.objects.raw(raw_query, query_parameters)
-
         link_row_field_content_type = ContentType.objects.get_for_model(LinkRowField)
         fields_to_fetch = set()
         fields_in_cache = {}
diff --git a/backend/src/baserow/contrib/database/fields/field_types.py b/backend/src/baserow/contrib/database/fields/field_types.py
index dfff82050..33936afe9 100755
--- a/backend/src/baserow/contrib/database/fields/field_types.py
+++ b/backend/src/baserow/contrib/database/fields/field_types.py
@@ -5085,6 +5085,8 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
         field_cache: "Optional[FieldCache]" = None,
         via_path_to_starting_table: Optional[List[LinkRowField]] = None,
         already_updated_fields: Optional[List[Field]] = None,
+        skip_search_updates: bool = False,
+        database_id: Optional[int] = None,
     ):
         from baserow.contrib.database.fields.dependencies.update_collector import (
             FieldUpdateCollector,
@@ -5113,15 +5115,18 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
 
         for update_collector in update_collectors.values():
             updated_fields |= set(
-                update_collector.apply_updates_and_get_updated_fields(field_cache)
+                update_collector.apply_updates_and_get_updated_fields(
+                    field_cache, skip_search_updates=skip_search_updates
+                )
             )
 
-        all_dependent_fields_grouped_by_depth = FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
-            fields,
-            field_cache,
-            associated_relations_changed=False,
-            # We can't provide the `database_id_prefilter` here because the fields
-            # can belong in different databases.
+        all_dependent_fields_grouped_by_depth = (
+            FieldDependencyHandler.group_all_dependent_fields_by_level_from_fields(
+                fields,
+                field_cache,
+                associated_relations_changed=False,
+                database_id_prefilter=database_id,
+            )
         )
         for dependant_fields_group in all_dependent_fields_grouped_by_depth:
             for table_id, dependant_field in dependant_fields_group:
@@ -5136,7 +5141,9 @@ class FormulaFieldType(FormulaFieldTypeArrayFilterSupport, ReadOnlyFieldType):
                     via_path_to_starting_table,
                 )
             updated_fields |= set(
-                update_collector.apply_updates_and_get_updated_fields(field_cache)
+                update_collector.apply_updates_and_get_updated_fields(
+                    field_cache, skip_search_updates=skip_search_updates
+                )
             )
 
         update_collector.send_force_refresh_signals_for_all_updated_tables()
diff --git a/backend/src/baserow/contrib/database/fields/registries.py b/backend/src/baserow/contrib/database/fields/registries.py
index b1e2cb7fd..ba0e411ae 100644
--- a/backend/src/baserow/contrib/database/fields/registries.py
+++ b/backend/src/baserow/contrib/database/fields/registries.py
@@ -1311,6 +1311,8 @@ class FieldType(
         field_cache: "Optional[FieldCache]" = None,
         via_path_to_starting_table: Optional[List[LinkRowField]] = None,
         already_updated_fields: Optional[List[Field]] = None,
+        skip_search_updates: bool = False,
+        database_id: Optional[int] = None,
     ):
         """
         This method is called periodically for all the fields of the same type
@@ -1327,6 +1329,10 @@ class FieldType(
         :param already_updated_fields: A list of fields that have already been updated
             before. That can happen because it was a dependency of another field for
             example.
+        :param skip_search_updates: If True, search index updates should be skipped.
+        :param database_id: The id of the database that the fields belong to. If not
+            provided, it means fields might belong to different databases and so we
+            should not assume they belong to the same database.
         """
 
         return already_updated_fields
diff --git a/backend/src/baserow/contrib/database/fields/tasks.py b/backend/src/baserow/contrib/database/fields/tasks.py
index c24150d31..cbbbc8104 100644
--- a/backend/src/baserow/contrib/database/fields/tasks.py
+++ b/backend/src/baserow/contrib/database/fields/tasks.py
@@ -1,11 +1,11 @@
+import itertools
 import traceback
-from collections import defaultdict
 from datetime import datetime, timedelta, timezone
-from typing import Optional
+from typing import Optional, Type
 
 from django.conf import settings
 from django.db import transaction
-from django.db.models import Q, QuerySet
+from django.db.models import OuterRef, Q, QuerySet, Subquery
 
 from loguru import logger
 from opentelemetry import trace
@@ -14,9 +14,11 @@ from baserow.config.celery import app
 from baserow.contrib.database.fields.periodic_field_update_handler import (
     PeriodicFieldUpdateHandler,
 )
-from baserow.contrib.database.fields.registries import field_type_registry
+from baserow.contrib.database.fields.registries import FieldType, field_type_registry
 from baserow.contrib.database.search.handler import SearchHandler
 from baserow.contrib.database.table.models import RichTextFieldMention
+from baserow.contrib.database.views.handler import ViewSubscriptionHandler
+from baserow.contrib.database.views.models import View, ViewSubscription
 from baserow.core.models import Workspace
 from baserow.core.telemetry.utils import add_baserow_trace_attrs, baserow_trace
 
@@ -83,7 +85,7 @@ def run_periodic_fields_updates(
 
 @baserow_trace(tracer)
 def _run_periodic_field_type_update_per_workspace(
-    field_type_instance, workspace: Workspace, update_now=True
+    field_type_instance: Type[FieldType], workspace: Workspace, update_now: bool = True
 ):
     qs = field_type_instance.get_fields_needing_periodic_update()
     if qs is None:
@@ -93,45 +95,77 @@ def _run_periodic_field_type_update_per_workspace(
         workspace.refresh_now()
     add_baserow_trace_attrs(update_now=update_now, workspace_id=workspace.id)
 
-    all_updated_fields = []
-
     fields = (
         qs.filter(
             table__database__workspace_id=workspace.id,
-            table__trashed=False,
             table__database__trashed=False,
+            table__trashed=False,
         )
         .select_related("table")
-        .prefetch_related("table__view_set")
+        .order_by("table__database_id")
     )
-    # noinspection PyBroadException
-    try:
-        all_updated_fields = _run_periodic_field_update(
-            fields, field_type_instance, all_updated_fields
-        )
-    except Exception:
-        tb = traceback.format_exc()
-        field_ids = ", ".join(str(field.id) for field in fields)
-        logger.error(
-            "Failed to periodically update {field_ids} because of: \n{tb}",
-            field_ids=field_ids,
-            tb=tb,
+
+    # Grouping by database will allow us to pass the `database_id` to the update
+    # function so recreating the dependency tree will be faster.
+    for database_id, field_group in itertools.groupby(
+        fields, key=lambda f: f.table.database_id
+    ):
+        fields_in_db = list(field_group)
+        database_updated_fields = []
+        try:
+            with transaction.atomic():
+                database_updated_fields = field_type_instance.run_periodic_update(
+                    fields_in_db,
+                    already_updated_fields=database_updated_fields,
+                    skip_search_updates=True,
+                    database_id=database_id,
+                )
+        except Exception:
+            tb = traceback.format_exc()
+            field_ids = ", ".join(str(field.id) for field in fields_in_db)
+            logger.error(
+                "Failed to periodically update {field_ids} because of: \n{tb}",
+                field_ids=field_ids,
+                tb=tb,
+            )
+        else:
+            # Update tsv columns and notify views of the changes.
+            SearchHandler().all_fields_values_changed_or_created(
+                database_updated_fields
+            )
+
+            updated_table_ids = list(
+                {field.table_id for field in database_updated_fields}
+            )
+            notify_table_views_updates.delay(updated_table_ids)
+
+
+@app.task(bind=True)
+def notify_table_views_updates(self, table_ids):
+    """
+    Notifies the views of the provided tables that their data has been updated. For
+    performance reasons, we fetch all the views with subscriptions in one go and group
+    them by table id so we can notify only the views that need to be notified.
+
+    :param table_ids: The ids of the tables that have been updated.
+    """
+
+    subquery = ViewSubscription.objects.filter(view_id=OuterRef("id")).values("view_id")
+    views_need_notify = (
+        View.objects.filter(
+            table_id__in=table_ids,
+            id=Subquery(subquery),
         )
+        .select_related("table")
+        .order_by("table_id")
+    )
 
-    from baserow.contrib.database.views.handler import ViewSubscriptionHandler
-
-    # After a successful periodic update of all fields, we would need to update the
-    # search index for all of them in one function per table to avoid ending up in a
-    # deadlock because rows are updated simultaneously.
-    fields_per_table = defaultdict(list)
-    for field in all_updated_fields:
-        fields_per_table[field.table_id].append(field)
-    for _, fields in fields_per_table.items():
-        SearchHandler().entire_field_values_changed_or_created(fields[0].table, fields)
-
+    for _, views_group in itertools.groupby(
+        views_need_notify, key=lambda v: v.table_id
+    ):
         with transaction.atomic():
-            ViewSubscriptionHandler().notify_table_views_updates(
-                fields[0].table.view_set.all()
+            ViewSubscriptionHandler.notify_table_views(
+                [view.id for view in views_group]
             )
 
 
@@ -145,14 +179,6 @@ def delete_mentions_marked_for_deletion(self):
     ).delete()
 
 
-@baserow_trace(tracer)
-def _run_periodic_field_update(fields, field_type_instance, all_updated_fields):
-    with transaction.atomic():
-        return field_type_instance.run_periodic_update(
-            fields, already_updated_fields=all_updated_fields
-        )
-
-
 @app.on_after_finalize.connect
 def setup_periodic_tasks(sender, **kwargs):
     sender.add_periodic_task(
diff --git a/backend/src/baserow/contrib/database/management/commands/run_periodic_fields_updates.py b/backend/src/baserow/contrib/database/management/commands/run_periodic_fields_updates.py
index d5a1059fd..1cc76c24c 100644
--- a/backend/src/baserow/contrib/database/management/commands/run_periodic_fields_updates.py
+++ b/backend/src/baserow/contrib/database/management/commands/run_periodic_fields_updates.py
@@ -1,5 +1,4 @@
 from django.core.management.base import BaseCommand
-from django.db import transaction
 
 from baserow.contrib.database.fields.tasks import run_periodic_fields_updates
 
@@ -21,7 +20,6 @@ class Command(BaseCommand):
             help="If set to true all the workspaces will be updated with the current time otherwise the previous `now` value will be used.",
         )
 
-    @transaction.atomic
     def handle(self, *args, **options):
         run_periodic_fields_updates(
             options["workspace_id"], not options["dont_update_now"]
diff --git a/backend/src/baserow/contrib/database/search/handler.py b/backend/src/baserow/contrib/database/search/handler.py
index 88f4cda6d..97a8e8a5c 100644
--- a/backend/src/baserow/contrib/database/search/handler.py
+++ b/backend/src/baserow/contrib/database/search/handler.py
@@ -755,6 +755,38 @@ class SearchHandler(
             updated_fields=updated_fields,
         )
 
+    @classmethod
+    def all_fields_values_changed_or_created(cls, updated_fields: List["Field"]):
+        """
+        Called when field values for a table have been changed or created for an entire
+        field column at once. This is more efficient than calling
+        `entire_field_values_changed_or_created` for each table individually when
+        multiple tables have had field values changed. Please, make sure to
+        select_related the table for the "updated_fields" to avoid N+1 queries.
+
+        :param updated_fields: If only some fields have had values changed then the
+            search vector update can be optimized by providing those here.
+        """
+
+        from baserow.contrib.database.search.tasks import (
+            async_update_multiple_fields_tsvector_columns,
+        )
+        from baserow.contrib.database.tasks import (
+            enqueue_task_on_commit_swallowing_any_exceptions,
+        )
+
+        searchable_updated_fields_ids = [
+            field.id for field in updated_fields if field.table.tsvectors_are_supported
+        ]
+
+        if searchable_updated_fields_ids:
+            enqueue_task_on_commit_swallowing_any_exceptions(
+                lambda: async_update_multiple_fields_tsvector_columns.delay(
+                    field_ids=searchable_updated_fields_ids,
+                    update_tsvs_for_changed_rows_only=False,
+                )
+            )
+
     @classmethod
     def _trigger_async_tsvector_task_if_needed(
         cls,
diff --git a/backend/src/baserow/contrib/database/search/tasks.py b/backend/src/baserow/contrib/database/search/tasks.py
index 187aa803c..fb2b980cf 100644
--- a/backend/src/baserow/contrib/database/search/tasks.py
+++ b/backend/src/baserow/contrib/database/search/tasks.py
@@ -1,3 +1,5 @@
+import itertools
+import traceback
 from typing import List, Optional
 
 from django.conf import settings
@@ -5,9 +7,11 @@ from django.conf import settings
 from loguru import logger
 
 from baserow.config.celery import app
+from baserow.contrib.database.fields.models import Field
 from baserow.contrib.database.search.exceptions import (
     PostgresFullTextSearchDisabledException,
 )
+from baserow.contrib.database.table.models import Table
 
 
 @app.task(
@@ -31,12 +35,63 @@ def async_update_tsvector_columns(
     """
 
     from baserow.contrib.database.search.handler import SearchHandler
-    from baserow.contrib.database.table.handler import TableHandler
 
-    table = TableHandler().get_table(table_id)
+    try:
+        table = Table.objects_and_trash.get(id=table_id)
+    except Table.DoesNotExist:
+        logger.warning(
+            f"Could not find table with id {table_id} for updating tsvector columns."
+        )
+        return
     try:
         SearchHandler.update_tsvector_columns_locked(
             table, update_tsvs_for_changed_rows_only, field_ids_to_restrict_update_to
         )
     except PostgresFullTextSearchDisabledException:
         logger.debug(f"Postgres full-text search is disabled.")
+
+
+@app.task(
+    queue="export",
+    time_limit=settings.CELERY_SEARCH_UPDATE_HARD_TIME_LIMIT,
+)
+def async_update_multiple_fields_tsvector_columns(
+    field_ids: List[int],
+    update_tsvs_for_changed_rows_only: bool,
+):
+    """
+    Responsible for asynchronously updating the `tsvector` columns for all the fields
+    provided.
+
+    :param field_ids: The fields we'd like to update the tsvectors for.
+    :param update_tsvs_for_changed_rows_only: By default we will only update rows on the
+        table which have changed since the last search update. If set to `False`, we
+        will index all cells that match the other parameters.
+    """
+
+    from baserow.contrib.database.search.handler import SearchHandler
+
+    fields = (
+        Field.objects_and_trash.filter(id__in=field_ids)
+        .select_related("table")
+        .order_by("table_id")
+    )
+    for _, field_group in itertools.groupby(fields, lambda f: f.table_id):
+        table_fields = list(field_group)
+        table = table_fields[0].table
+        try:
+            SearchHandler.update_tsvector_columns_locked(
+                table, update_tsvs_for_changed_rows_only, [f.id for f in table_fields]
+            )
+        except PostgresFullTextSearchDisabledException:
+            logger.debug(f"Postgres full-text search is disabled.")
+            break
+        except Exception:
+            tb = traceback.format_exc()
+            field_ids = ", ".join(str(field.id) for field in field_group)
+            logger.error(
+                "Failed to update tsvector columns for fields {field_ids} in table {table_id} because of: \n{tb}.",
+                field_ids=field_ids,
+                table_id=table.id,
+                tb=tb,
+            )
diff --git a/backend/src/baserow/contrib/database/views/handler.py b/backend/src/baserow/contrib/database/views/handler.py
index 14567644e..447d809cb 100644
--- a/backend/src/baserow/contrib/database/views/handler.py
+++ b/backend/src/baserow/contrib/database/views/handler.py
@@ -3936,7 +3936,7 @@ class ViewSubscriptionHandler:
         for _, view_group in itertools.groupby(views, key=lambda f: f.table):
             view_ids = [v.id for v in view_group]
             if view_ids:
-                cls._notify_table_views_updates(view_ids)
+                cls.notify_table_views(view_ids)
 
     @classmethod
     def notify_table_views_updates(
@@ -3955,10 +3955,10 @@ class ViewSubscriptionHandler:
             view__in=views
         ).values_list("view_id", flat=True)
         if view_ids_with_subscribers:
-            cls._notify_table_views_updates(view_ids_with_subscribers, model)
+            cls.notify_table_views(view_ids_with_subscribers, model)
 
     @classmethod
-    def _notify_table_views_updates(
+    def notify_table_views(
         cls, view_ids: list[int], model: GeneratedTableModel | None = None
     ):
         """