From 89fa7ea651c7905dc8fca8a7b7c3c674d6452527 Mon Sep 17 00:00:00 2001 From: Davide Silvestri <davide@baserow.io> Date: Wed, 12 Mar 2025 17:08:38 +0100 Subject: [PATCH] Add payload pagination for webhooks --- backend/src/baserow/config/settings/base.py | 3 + .../builder/locale/en/LC_MESSAGES/django.po | 12 +- backend/src/baserow/contrib/database/apps.py | 2 + .../database/locale/en/LC_MESSAGES/django.po | 20 +- ...0181_tablewebhookcall_batch_id_and_more.py | 24 +++ .../contrib/database/webhooks/exceptions.py | 4 + .../contrib/database/webhooks/models.py | 21 ++ .../database/webhooks/notification_types.py | 130 ++++++++++-- .../contrib/database/webhooks/registries.py | 53 ++++- .../contrib/database/webhooks/tasks.py | 195 +++++++++++------- .../core/locale/en/LC_MESSAGES/django.po | 10 +- .../src/baserow/test_utils/pytest_conftest.py | 9 + .../test_webhook_notification_types.py | 59 +++++- .../database/webhooks/test_webhook_tasks.py | 195 +++++++++++++++++- docker-compose.yml | 1 + docs/installation/configuration.md | 1 + .../locale/en/LC_MESSAGES/django.po | 6 +- .../WebhookPayloadTooLargeNotification.vue | 35 ++++ web-frontend/modules/database/locales/en.json | 3 + .../modules/database/notificationTypes.js | 25 +++ web-frontend/modules/database/plugin.js | 5 + 21 files changed, 677 insertions(+), 136 deletions(-) create mode 100644 backend/src/baserow/contrib/database/migrations/0181_tablewebhookcall_batch_id_and_more.py create mode 100644 web-frontend/modules/database/components/notifications/WebhookPayloadTooLargeNotification.vue diff --git a/backend/src/baserow/config/settings/base.py b/backend/src/baserow/config/settings/base.py index ca7c507aa..4a2d25bbb 100644 --- a/backend/src/baserow/config/settings/base.py +++ b/backend/src/baserow/config/settings/base.py @@ -980,6 +980,7 @@ BASEROW_WEBHOOKS_URL_CHECK_TIMEOUT_SECS = int( BASEROW_MAX_WEBHOOK_CALLS_IN_QUEUE_PER_WEBHOOK = ( int(os.getenv("BASEROW_MAX_WEBHOOK_CALLS_IN_QUEUE_PER_WEBHOOK", "0")) or None ) +BASEROW_WEBHOOKS_BATCH_LIMIT = int(os.getenv("BASEROW_WEBHOOKS_BATCH_LIMIT", 5)) # ======== WARNING ======== # Please read and understand everything at: @@ -1265,7 +1266,9 @@ BASEROW_MAX_HEALTHY_CELERY_QUEUE_SIZE = int( BASEROW_USE_LOCAL_CACHE = str_to_bool(os.getenv("BASEROW_USE_LOCAL_CACHE", "true")) + # -- CACHALOT SETTINGS -- + CACHALOT_TIMEOUT = int(os.getenv("BASEROW_CACHALOT_TIMEOUT", 60 * 60 * 24 * 7)) BASEROW_CACHALOT_ONLY_CACHABLE_TABLES = os.getenv( "BASEROW_CACHALOT_ONLY_CACHABLE_TABLES", None diff --git a/backend/src/baserow/contrib/builder/locale/en/LC_MESSAGES/django.po b/backend/src/baserow/contrib/builder/locale/en/LC_MESSAGES/django.po index d5b6ada07..3b1d5f76a 100644 --- a/backend/src/baserow/contrib/builder/locale/en/LC_MESSAGES/django.po +++ b/backend/src/baserow/contrib/builder/locale/en/LC_MESSAGES/django.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2025-02-11 17:20+0000\n" +"POT-Creation-Date: 2025-03-05 11:01+0000\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL@li.org>\n" @@ -46,18 +46,18 @@ msgstr "" msgid "Last name" msgstr "" -#: src/baserow/contrib/builder/data_providers/data_provider_types.py:555 +#: src/baserow/contrib/builder/data_providers/data_provider_types.py:563 #, python-format msgid "%(user_source_name)s member" msgstr "" -#: src/baserow/contrib/builder/data_sources/service.py:154 +#: src/baserow/contrib/builder/data_sources/service.py:158 msgid "Data source" msgstr "" -#: src/baserow/contrib/builder/elements/mixins.py:578 -#: src/baserow/contrib/builder/elements/mixins.py:583 -#: src/baserow/contrib/builder/elements/mixins.py:588 +#: src/baserow/contrib/builder/elements/mixins.py:586 +#: src/baserow/contrib/builder/elements/mixins.py:591 +#: src/baserow/contrib/builder/elements/mixins.py:596 #, python-format msgid "Column %(count)s" msgstr "" diff --git a/backend/src/baserow/contrib/database/apps.py b/backend/src/baserow/contrib/database/apps.py index 2ed619809..5ddeb7995 100755 --- a/backend/src/baserow/contrib/database/apps.py +++ b/backend/src/baserow/contrib/database/apps.py @@ -957,6 +957,7 @@ class DatabaseConfig(AppConfig): ) from baserow.contrib.database.webhooks.notification_types import ( WebhookDeactivatedNotificationType, + WebhookPayloadTooLargeNotificationType, ) from baserow.core.notifications.registries import notification_type_registry @@ -966,6 +967,7 @@ class DatabaseConfig(AppConfig): ) notification_type_registry.register(FormSubmittedNotificationType()) notification_type_registry.register(WebhookDeactivatedNotificationType()) + notification_type_registry.register(WebhookPayloadTooLargeNotificationType()) # The signals must always be imported last because they use the registries # which need to be filled first. diff --git a/backend/src/baserow/contrib/database/locale/en/LC_MESSAGES/django.po b/backend/src/baserow/contrib/database/locale/en/LC_MESSAGES/django.po index 75eb3173a..70228de7c 100644 --- a/backend/src/baserow/contrib/database/locale/en/LC_MESSAGES/django.po +++ b/backend/src/baserow/contrib/database/locale/en/LC_MESSAGES/django.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2025-02-11 17:20+0000\n" +"POT-Creation-Date: 2025-03-05 11:01+0000\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL@li.org>\n" @@ -149,7 +149,7 @@ msgid "" msgstr "" #: src/baserow/contrib/database/fields/models.py:453 -#: src/baserow/contrib/database/fields/models.py:632 +#: src/baserow/contrib/database/fields/models.py:641 msgid "The format of the duration." msgstr "" @@ -646,14 +646,26 @@ msgid "" "to %(webhook_url)s\" updated" msgstr "" -#: src/baserow/contrib/database/webhooks/notification_types.py:70 +#: src/baserow/contrib/database/webhooks/notification_types.py:90 #, python-format msgid "%(name)s webhook has been deactivated." msgstr "" -#: src/baserow/contrib/database/webhooks/notification_types.py:77 +#: src/baserow/contrib/database/webhooks/notification_types.py:97 #, python-format msgid "" "The webhook failed more than %(max_failures)s consecutive times and was " "therefore deactivated." msgstr "" + +#: src/baserow/contrib/database/webhooks/notification_types.py:147 +#, python-format +msgid "%(name)s webhook payload too large." +msgstr "" + +#: src/baserow/contrib/database/webhooks/notification_types.py:154 +#, python-format +msgid "" +"The webhook couldn't send all the data because it reaches the maximum number " +"of batches of %(batch_limit)s." +msgstr "" diff --git a/backend/src/baserow/contrib/database/migrations/0181_tablewebhookcall_batch_id_and_more.py b/backend/src/baserow/contrib/database/migrations/0181_tablewebhookcall_batch_id_and_more.py new file mode 100644 index 000000000..bcbad367e --- /dev/null +++ b/backend/src/baserow/contrib/database/migrations/0181_tablewebhookcall_batch_id_and_more.py @@ -0,0 +1,24 @@ +# Generated by Django 5.0.9 on 2025-03-04 18:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("database", "0180_view_allow_public_export"), + ] + + operations = [ + migrations.AddField( + model_name="tablewebhookcall", + name="batch_id", + field=models.PositiveIntegerField( + help_text="The batch ID for this call. Null if not part of a batch. Used for batching multiple calls of the same event_id due to large data.", + null=True, + ), + ), + migrations.AlterUniqueTogether( + name="tablewebhookcall", + unique_together={("event_id", "batch_id", "webhook", "event_type")}, + ), + ] diff --git a/backend/src/baserow/contrib/database/webhooks/exceptions.py b/backend/src/baserow/contrib/database/webhooks/exceptions.py index 19c7db54b..e4eae01a0 100644 --- a/backend/src/baserow/contrib/database/webhooks/exceptions.py +++ b/backend/src/baserow/contrib/database/webhooks/exceptions.py @@ -20,3 +20,7 @@ class TableWebhookEventConfigFieldNotInTable(Exception): class SkipWebhookCall(Exception): """Raised when the webhook call must be skipped""" + + +class WebhookPayloadTooLarge(Exception): + """Raised when the webhook payload is too large and exceeds the batches limit.""" diff --git a/backend/src/baserow/contrib/database/webhooks/models.py b/backend/src/baserow/contrib/database/webhooks/models.py index f2d4ecfce..6d7887913 100644 --- a/backend/src/baserow/contrib/database/webhooks/models.py +++ b/backend/src/baserow/contrib/database/webhooks/models.py @@ -1,5 +1,6 @@ import uuid +from django.conf import settings from django.core.validators import MaxLengthValidator from django.db import models @@ -58,6 +59,18 @@ class TableWebhook(CreatedAndUpdatedOnMixin, models.Model): def header_dict(self): return {header.name: header.value for header in self.headers.all()} + @property + def batch_limit(self) -> int: + """ + This value will be used to limit the amount batches a single webhook can make to + paginate the payload. If the payload is too large to be sent in one go, the + event_type can split it into multiple batches. If the number of batches exceeds + this limit, a notification will be sent to workspace admins informing them that + the webhook couldn't send all the data. + """ + + return settings.BASEROW_WEBHOOKS_BATCH_LIMIT + class Meta: ordering = ("id",) @@ -90,6 +103,13 @@ class TableWebhookCall(models.Model): editable=False, help_text="Event ID where the call originated from.", ) + batch_id = models.PositiveIntegerField( + null=True, + help_text=( + "The batch ID for this call. Null if not part of a batch. " + "Used for batching multiple calls of the same event_id due to large data." + ), + ) webhook = models.ForeignKey( TableWebhook, related_name="calls", on_delete=models.CASCADE ) @@ -111,3 +131,4 @@ class TableWebhookCall(models.Model): class Meta: ordering = ("-called_time",) + unique_together = ("event_id", "batch_id", "webhook", "event_type") diff --git a/backend/src/baserow/contrib/database/webhooks/notification_types.py b/backend/src/baserow/contrib/database/webhooks/notification_types.py index e11c15d52..df0f5688b 100644 --- a/backend/src/baserow/contrib/database/webhooks/notification_types.py +++ b/backend/src/baserow/contrib/database/webhooks/notification_types.py @@ -1,10 +1,14 @@ from dataclasses import asdict, dataclass -from typing import List, Optional +from typing import List from django.conf import settings from django.utils.translation import gettext as _ -from baserow.core.models import WORKSPACE_USER_PERMISSION_ADMIN, WorkspaceUser +from baserow.core.models import ( + WORKSPACE_USER_PERMISSION_ADMIN, + Workspace, + WorkspaceUser, +) from baserow.core.notifications.handler import NotificationHandler from baserow.core.notifications.models import NotificationRecipient from baserow.core.notifications.registries import ( @@ -32,6 +36,36 @@ class DeactivatedWebhookData: ) +def notify_admins_in_workspace( + workspace: Workspace, notification_type: str, data: dict +) -> List[NotificationRecipient]: + """ + Notifies all admins in the workspace about an important event, such as a webhook + deactivation or a payload exceeding size limits. + + :param workspace: The workspace whose admins will be notified. + :param notification_type: The type of notification to send. + :param data: The data to include in the notification. + :return: A list of created notification recipients. + """ + + admins_workspace_users = WorkspaceUser.objects.filter( + workspace=workspace, + permissions=WORKSPACE_USER_PERMISSION_ADMIN, + user__profile__to_be_deleted=False, + user__is_active=True, + ).select_related("user") + admins_in_workspace = [admin.user for admin in admins_workspace_users] + + return NotificationHandler.create_direct_notification_for_users( + notification_type=notification_type, + recipients=admins_in_workspace, + data=data, + sender=None, + workspace=workspace, + ) + + class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, NotificationType): type = "webhook_deactivated" has_web_frontend_route = True @@ -39,30 +73,18 @@ class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, Notificatio @classmethod def notify_admins_in_workspace( cls, webhook: TableWebhook - ) -> Optional[List[NotificationRecipient]]: + ) -> List[NotificationRecipient]: """ - Creates a notification for each user that is subscribed to receive comments on - the row on which the comment was created. + Creates a notification of this type for each admin in the workspace that the + webhook belongs to. - :param webhook: The comment that was created. - :return: + :param webhook: The webhook that was deactivated. + :return: A list of notification recipients that have been created. """ workspace = webhook.table.database.workspace - admins_workspace_users = WorkspaceUser.objects.filter( - workspace=workspace, - permissions=WORKSPACE_USER_PERMISSION_ADMIN, - user__profile__to_be_deleted=False, - user__is_active=True, - ).select_related("user") - admins_in_workspace = [admin.user for admin in admins_workspace_users] - - return NotificationHandler.create_direct_notification_for_users( - notification_type=WebhookDeactivatedNotificationType.type, - recipients=admins_in_workspace, - data=asdict(DeactivatedWebhookData.from_webhook(webhook)), - sender=None, - workspace=webhook.table.database.workspace, + return notify_admins_in_workspace( + workspace, cls.type, asdict(DeactivatedWebhookData.from_webhook(webhook)) ) @classmethod @@ -79,3 +101,69 @@ class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, Notificatio ) % { "max_failures": settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES, } + + +@dataclass +class WebhookPayloadTooLargeData: + webhook_id: int + table_id: int + database_id: int + webhook_name: str + event_id: str + batch_limit: int + + @classmethod + def from_webhook(cls, webhook: TableWebhook, event_id: str): + return cls( + webhook_id=webhook.id, + table_id=webhook.table_id, + database_id=webhook.table.database_id, + webhook_name=webhook.name, + event_id=event_id, + batch_limit=webhook.batch_limit, + ) + + +class WebhookPayloadTooLargeNotificationType( + EmailNotificationTypeMixin, NotificationType +): + type = "webhook_payload_too_large" + has_web_frontend_route = True + + @classmethod + def notify_admins_in_workspace( + cls, webhook: TableWebhook, event_id: str + ) -> List[NotificationRecipient]: + """ + Creates a notification of this type for each admin in the workspace that the + webhook belongs to. + + :param webhook: The webhook trying to send a payload that is too large. + :param event_id: The event id that triggered the notification. + :return: A list of notification recipients that have been created. + """ + + workspace = webhook.table.database.workspace + return notify_admins_in_workspace( + workspace, + cls.type, + asdict(WebhookPayloadTooLargeData.from_webhook(webhook, event_id)), + ) + + @classmethod + def get_notification_title_for_email(cls, notification, context): + return _("%(name)s webhook payload too large.") % { + "name": notification.data["webhook_name"], + } + + @classmethod + def get_notification_description_for_email(cls, notification, context): + return _( + "The payload for the %(name)s webhook with event ID %(event_id)s " + "was too large. The content has been split into multiple batches, but " + "data above the batch limit of %(batch_limit)s was discarded." + ) % { + "name": notification.data["webhook_name"], + "event_id": notification.data["event_id"], + "batch_limit": notification.data["batch_limit"], + } diff --git a/backend/src/baserow/contrib/database/webhooks/registries.py b/backend/src/baserow/contrib/database/webhooks/registries.py index 7e8b88d9b..2cecb905f 100644 --- a/backend/src/baserow/contrib/database/webhooks/registries.py +++ b/backend/src/baserow/contrib/database/webhooks/registries.py @@ -7,7 +7,7 @@ from django.dispatch.dispatcher import Signal from baserow.contrib.database.table.models import Table from baserow.core.registry import Instance, ModelRegistryMixin, Registry -from .exceptions import SkipWebhookCall +from .exceptions import SkipWebhookCall, WebhookPayloadTooLarge from .tasks import call_webhook @@ -15,8 +15,8 @@ class WebhookEventType(Instance): """ This class represents a custom webhook event type that can be added to the webhook event type registry. Each registered event type needs to set a django signal on - which it will listen on. Upon initialization the webhook event type will connect - to the django signal. + which it will listen on. Upon initialization the webhook event type will connect to + the django signal. The 'listener' function will be called for every received signal. The listener will generate a unique ID for every received signal, find all webhooks that need to be @@ -107,6 +107,51 @@ class WebhookEventType(Instance): transaction.on_commit(lambda: self.listener_after_commit(**kwargs)) + def _paginate_payload(self, payload) -> tuple[dict, dict | None]: + """ + This method is called in the celery task and can be overwritten to paginate the + payload, if it's too large to send all the data at once. The default + implementation returns the payload and None as the next cursor, but if the + payload is too large to be sent in one go, this method can be used to return a + part of the payload and the remaining part as the next cursor. Proper `batch_id` + values will be added to the payload by the caller to keep track of the current + batch. + + :param payload: The payload that must be paginated. + :return: A tuple containing the payload to be sent and the remaining payload for + the next batch if any or None. + """ + + return payload, None + + def paginate_payload(self, webhook, event_id, payload) -> tuple[dict, dict | None]: + """ + This method calls the `_paginate_payload` method and adds a `batch_id` to the + payload if the remaining payload is not None. The `batch_id` is used to keep + track of the current batch of the payload. + + :param webhook: The webhook object related to the call. + :param event_id: The unique uuid event id of the event that triggered the call. + :param payload: The payload that must be paginated. + :return: A tuple containing the payload to be sent and the remaining payload for + the next batch if any or None. + """ + + batch_id = int(payload.get("batch_id", None) or 1) + if webhook.batch_limit > 0 and batch_id > webhook.batch_limit: + raise WebhookPayloadTooLarge( + f"Payload for event '{self.type}' (event_id: '{event_id}') exceeds " + f"the batch limit of ({webhook.batch_limit} batches)." + ) + + prepared_payload, remaining_payload = self._paginate_payload(payload) + + if remaining_payload is not None: + prepared_payload["batch_id"] = batch_id + remaining_payload["batch_id"] = batch_id + 1 + + return prepared_payload, remaining_payload + def listener_after_commit(self, **kwargs): """ Called after the signal is triggered and the transaction commits. By default it @@ -145,7 +190,7 @@ class WebhookEventType(Instance): pass -class WebhookEventTypeRegistry(ModelRegistryMixin, Registry): +class WebhookEventTypeRegistry(ModelRegistryMixin, Registry[WebhookEventType]): name = "webhook_event" diff --git a/backend/src/baserow/contrib/database/webhooks/tasks.py b/backend/src/baserow/contrib/database/webhooks/tasks.py index e4bec1d88..b53cbb85c 100644 --- a/backend/src/baserow/contrib/database/webhooks/tasks.py +++ b/backend/src/baserow/contrib/database/webhooks/tasks.py @@ -1,3 +1,4 @@ +from copy import deepcopy from datetime import datetime, timezone from django.conf import settings @@ -8,6 +9,10 @@ from django.db.utils import OperationalError from loguru import logger from baserow.config.celery import app +from baserow.contrib.database.webhooks.exceptions import WebhookPayloadTooLarge +from baserow.contrib.database.webhooks.notification_types import ( + WebhookPayloadTooLargeNotificationType, +) from baserow.core.redis import RedisQueue @@ -83,30 +88,18 @@ def call_webhook( can still measure this. """ - from advocate import UnacceptableAddressException - from requests import RequestException - - from .handler import WebhookHandler - from .models import TableWebhook, TableWebhookCall - from .notification_types import WebhookDeactivatedNotificationType + from .models import TableWebhook + from .registries import webhook_event_type_registry if self.request.retries > retries: retries = self.request.retries try: with transaction.atomic(): - handler = WebhookHandler() - try: webhook = TableWebhook.objects.select_for_update( - of=("self",), - nowait=True, - ).get( - id=webhook_id, - # If a webhook is not active anymore, then it should not be - # executed. - active=True, - ) + of=("self",), nowait=True + ).get(id=webhook_id, active=True) except TableWebhook.DoesNotExist: # If the webhook has been deleted or disabled while executing, we don't # want to continue making calls the URL because we can't update the @@ -128,73 +121,36 @@ def call_webhook( else: raise e - request = None - response = None - success = False - error = "" - + # Paginate the payload if necessary and enqueue the remaining data. + webhook_event_type = webhook_event_type_registry.get(event_type) try: - request, response = handler.make_request(method, url, headers, payload) - success = response.ok - except RequestException as exception: - request = exception.request - response = exception.response - error = str(exception) - except UnacceptableAddressException as exception: - error = f"UnacceptableAddressException: {exception}" - - TableWebhookCall.objects.update_or_create( - event_id=event_id, - event_type=event_type, - webhook=webhook, - defaults={ - "called_time": datetime.now(tz=timezone.utc), - "called_url": url, - "request": handler.format_request(request) - if request is not None - else None, - "response": handler.format_response(response) - if response is not None - else None, - "response_status": response.status_code - if response is not None - else None, - "error": error, - }, - ) - handler.clean_webhook_calls(webhook) - - if success and webhook.failed_triggers != 0: - # If the call was successful and failed triggers had been increased in - # the past, we can safely reset it to 0 again to prevent deactivation of - # the webhook. - webhook.failed_triggers = 0 - webhook.save() - elif not success and ( - webhook.failed_triggers - < settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES - ): - # If the task has reached the maximum amount of failed calls, we're - # going to give up and increase the total failed triggers of the webhook - # if we're still operating within the limits of the max consecutive - # trigger failures. - webhook.failed_triggers += 1 - webhook.save() - elif not success: - # If webhook has reached the maximum amount of failed triggers, - # we're going to deactivate it because we can reasonably assume that the - # target doesn't listen anymore. At this point we've tried 8 * 10 times. - # The user can manually activate it again when it's fixed. - webhook.active = False - webhook.save() - - # Send a notification to the workspace admins that the webhook was - # deactivated. + payload, remaining = webhook_event_type.paginate_payload( + webhook, event_id, deepcopy(payload) + ) + except WebhookPayloadTooLarge: + success = True # We don't want to retry this call, because it will fail again. transaction.on_commit( - lambda: WebhookDeactivatedNotificationType.notify_admins_in_workspace( - webhook + lambda: WebhookPayloadTooLargeNotificationType.notify_admins_in_workspace( + webhook, event_id ) ) + else: + success = make_request_and_save_result( + webhook, event_id, event_type, method, url, headers, payload + ) + # enqueue the next call if there is still remaining payload + if success and remaining is not None: + args = ( + webhook_id, + event_id, + event_type, + method, + url, + headers, + remaining, + ) + kwargs = {"retries": 0} + enqueue_webhook_task(webhook_id, event_id, args, kwargs) # After the transaction successfully commits we can delay the next call # in the queue, so that only one call is triggered concurrently. @@ -216,3 +172,84 @@ def call_webhook( kwargs = self.request.kwargs or {} kwargs["retries"] = retries + 1 self.retry(countdown=2**retries, kwargs=kwargs) + + +def make_request_and_save_result( + webhook, event_id, event_type, method, url, headers, payload +): + from advocate import UnacceptableAddressException + from requests import RequestException + + from .handler import WebhookHandler + from .models import TableWebhookCall + from .notification_types import WebhookDeactivatedNotificationType + + handler = WebhookHandler() + + request = None + response = None + success = False + error = "" + + try: + request, response = handler.make_request(method, url, headers, payload) + success = response.ok + except RequestException as exception: + request = exception.request + response = exception.response + error = str(exception) + except UnacceptableAddressException as exception: + error = f"UnacceptableAddressException: {exception}" + + TableWebhookCall.objects.update_or_create( + event_id=event_id, + batch_id=payload.get("batch_id", None), + event_type=event_type, + webhook=webhook, + defaults={ + "called_time": datetime.now(tz=timezone.utc), + "called_url": url, + "request": handler.format_request(request) if request is not None else None, + "response": handler.format_response(response) + if response is not None + else None, + "response_status": response.status_code if response is not None else None, + "error": error, + }, + ) + handler.clean_webhook_calls(webhook) + + if success: + if webhook.failed_triggers != 0: + # If the call was successful and failed triggers had been increased + # in the past, we can safely reset it to 0 again to prevent + # deactivation of the webhook. + webhook.failed_triggers = 0 + webhook.save() + elif ( + webhook.failed_triggers + < settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES + ): + # If the task has reached the maximum amount of failed calls, we're + # going to give up and increase the total failed triggers of the webhook + # if we're still operating within the limits of the max consecutive + # trigger failures. + webhook.failed_triggers += 1 + webhook.save() + else: + # If webhook has reached the maximum amount of failed triggers, we're + # going to deactivate it because we can reasonably assume that the + # target doesn't listen anymore. At this point we've tried 8 * 10 times. + # The user can manually activate it again when it's fixed. + webhook.active = False + webhook.save() + + # Send a notification to the workspace admins that the webhook was + # deactivated. + transaction.on_commit( + lambda: WebhookDeactivatedNotificationType.notify_admins_in_workspace( + webhook + ) + ) + + return success diff --git a/backend/src/baserow/core/locale/en/LC_MESSAGES/django.po b/backend/src/baserow/core/locale/en/LC_MESSAGES/django.po index 7a41c9d85..dc8ef21d6 100644 --- a/backend/src/baserow/core/locale/en/LC_MESSAGES/django.po +++ b/backend/src/baserow/core/locale/en/LC_MESSAGES/django.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2025-02-11 17:20+0000\n" +"POT-Creation-Date: 2025-03-05 11:01+0000\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL@li.org>\n" @@ -242,7 +242,7 @@ msgstr "" msgid "Decimal number" msgstr "" -#: src/baserow/core/handler.py:2122 src/baserow/core/user/handler.py:267 +#: src/baserow/core/handler.py:2130 src/baserow/core/user/handler.py:267 #, python-format msgid "%(name)s's workspace" msgstr "" @@ -305,11 +305,7 @@ msgid_plural "Plus %(counter)s more notifications." msgstr[0] "" msgstr[1] "" -#: src/baserow/core/templates/baserow/core/notifications_summary.html:239 -msgid "View in Baserow" -msgstr "" - -#: src/baserow/core/templates/baserow/core/notifications_summary.html:253 +#: src/baserow/core/templates/baserow/core/notifications_summary.html:235 #: src/baserow/core/templates/baserow/core/user/account_deleted.html:186 #: src/baserow/core/templates/baserow/core/user/account_deletion_cancelled.html:186 #: src/baserow/core/templates/baserow/core/user/account_deletion_scheduled.html:191 diff --git a/backend/src/baserow/test_utils/pytest_conftest.py b/backend/src/baserow/test_utils/pytest_conftest.py index c69b7cf3e..e92f52fc8 100755 --- a/backend/src/baserow/test_utils/pytest_conftest.py +++ b/backend/src/baserow/test_utils/pytest_conftest.py @@ -30,6 +30,7 @@ from sqlparse import format from baserow.contrib.database.application_types import DatabaseApplicationType from baserow.contrib.database.fields.fields import SerialField from baserow.contrib.database.fields.models import LinkRowField +from baserow.contrib.database.webhooks.registries import webhook_event_type_registry from baserow.core.cache import local_cache from baserow.core.context import clear_current_workspace_id from baserow.core.exceptions import PermissionDenied @@ -233,6 +234,14 @@ def mutable_job_type_registry(): job_type_registry.get_for_class.cache_clear() +@pytest.fixture() +def mutable_webhook_event_type_registry(): + with patch.object(webhook_event_type_registry, "registry", {}): + webhook_event_type_registry.get_for_class.cache_clear() + yield webhook_event_type_registry + webhook_event_type_registry.get_for_class.cache_clear() + + @pytest.fixture() def stub_user_source_registry(data_fixture, mutable_user_source_registry, fake): from baserow.core.user_sources.registries import UserSourceType diff --git a/backend/tests/baserow/contrib/database/webhooks/test_webhook_notification_types.py b/backend/tests/baserow/contrib/database/webhooks/test_webhook_notification_types.py index 55b545e90..496096f06 100644 --- a/backend/tests/baserow/contrib/database/webhooks/test_webhook_notification_types.py +++ b/backend/tests/baserow/contrib/database/webhooks/test_webhook_notification_types.py @@ -1,20 +1,21 @@ +from django.test import override_settings + import pytest from baserow.contrib.database.webhooks.notification_types import ( WebhookDeactivatedNotificationType, + WebhookPayloadTooLargeNotificationType, ) -@pytest.mark.django_db(transaction=True) -def test_webhook_deactivated_notification_can_be_render_as_email( - api_client, data_fixture -): +@pytest.mark.django_db +def test_webhook_deactivated_notification_can_be_render_as_email(data_fixture): user = data_fixture.create_user() workspace = data_fixture.create_workspace(user=user) database = data_fixture.create_database_application(workspace=workspace) table = data_fixture.create_database_table(database=database) webhook = data_fixture.create_table_webhook( - table=table, active=True, failed_triggers=1 + table=table, active=True, failed_triggers=1, name="test" ) notification_recipients = ( @@ -22,11 +23,12 @@ def test_webhook_deactivated_notification_can_be_render_as_email( ) notification = notification_recipients[0].notification - assert WebhookDeactivatedNotificationType.get_notification_title_for_email( - notification, {} - ) == "%(name)s webhook has been deactivated." % { - "name": notification.data["webhook_name"], - } + assert ( + WebhookDeactivatedNotificationType.get_notification_title_for_email( + notification, {} + ) + == "test webhook has been deactivated." + ) assert ( WebhookDeactivatedNotificationType.get_notification_description_for_email( @@ -35,3 +37,40 @@ def test_webhook_deactivated_notification_can_be_render_as_email( == "The webhook failed more than 8 consecutive times and " "was therefore deactivated." ) + + +@pytest.mark.django_db +@override_settings(BASEROW_WEBHOOKS_BATCH_LIMIT=1) +def test_webhook_payload_too_large_can_be_render_as_email(data_fixture): + user = data_fixture.create_user() + workspace = data_fixture.create_workspace(user=user) + database = data_fixture.create_database_application(workspace=workspace) + table = data_fixture.create_database_table(database=database) + webhook = data_fixture.create_table_webhook( + table=table, active=True, failed_triggers=1, name="test" + ) + + notification_recipients = ( + WebhookPayloadTooLargeNotificationType.notify_admins_in_workspace( + webhook, "123" + ) + ) + notification = notification_recipients[0].notification + + email_title = ( + WebhookPayloadTooLargeNotificationType.get_notification_title_for_email( + notification, {} + ) + ) + assert email_title == "test webhook payload too large." + + email_descr = ( + WebhookPayloadTooLargeNotificationType.get_notification_description_for_email( + notification, {} + ) + ) + assert email_descr == ( + "The payload for the test webhook with event ID 123 " + "was too large. The content has been split into multiple batches, but " + "data above the batch limit of 1 was discarded." + ) diff --git a/backend/tests/baserow/contrib/database/webhooks/test_webhook_tasks.py b/backend/tests/baserow/contrib/database/webhooks/test_webhook_tasks.py index 08ccb63b5..7ea3f115a 100644 --- a/backend/tests/baserow/contrib/database/webhooks/test_webhook_tasks.py +++ b/backend/tests/baserow/contrib/database/webhooks/test_webhook_tasks.py @@ -12,8 +12,14 @@ from celery.exceptions import Retry from baserow.contrib.database.webhooks.models import TableWebhook, TableWebhookCall from baserow.contrib.database.webhooks.notification_types import ( WebhookDeactivatedNotificationType, + WebhookPayloadTooLargeNotificationType, +) +from baserow.contrib.database.webhooks.registries import WebhookEventType +from baserow.contrib.database.webhooks.tasks import ( + call_webhook, + enqueue_webhook_task, + schedule_next_task_in_queue, ) -from baserow.contrib.database.webhooks.tasks import call_webhook from baserow.core.models import WorkspaceUser from baserow.core.notifications.models import Notification from baserow.core.redis import RedisQueue @@ -29,7 +35,7 @@ class MemoryQueue(RedisQueue): def get_and_pop_next(self): try: - self.queues[self.queue_key].pop(0) + return self.queues[self.queue_key].pop(0) except IndexError: return None @@ -430,3 +436,188 @@ def test_call_webhook_failed_reached_notification_send( "webhook_id": webhook.id, "webhook_name": webhook.name, } + + # the webhook should be deactivated after the notification is sent. + webhook.refresh_from_db() + assert webhook.active is False + + +class PaginatedWebhookEventType(WebhookEventType): + type = "test.paginated" + + def __init__(self): + self.i = 1 + + def _paginate_payload(self, payload) -> tuple[dict, dict | None]: + payload["data"] = f"part {self.i}" + self.i += 1 + return payload, {"data": f"part {self.i}"} + + +@pytest.mark.django_db(transaction=True) +@responses.activate +@patch("baserow.contrib.database.webhooks.tasks.RedisQueue", MemoryQueue) +@patch("baserow.contrib.database.webhooks.tasks.cache", MagicMock()) +def test_webhook_with_paginated_payload( + mutable_webhook_event_type_registry, data_fixture +): + mutable_webhook_event_type_registry.register(PaginatedWebhookEventType()) + + webhook = data_fixture.create_table_webhook() + responses.add(responses.POST, "http://localhost/", json={}, status=200) + event_id = "00000000-0000-0000-0000-000000000002" + expected_payload = ( + webhook.id, + event_id, + "test.paginated", + "POST", + "http://localhost/", + {}, + {"batch_id": 2, "data": "part 2"}, + ) + + # The first page of the payload is sent and contains the batch_id 1. + with patch( + "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task" + ) as mock_enqueue, patch( + "baserow.contrib.database.webhooks.tasks.schedule_next_task_in_queue" + ) as mock_schedule: + call_webhook( + webhook_id=webhook.id, + event_id=event_id, + event_type="test.paginated", + method="POST", + url="http://localhost/", + headers={}, + payload={}, + ) + + assert mock_enqueue.call_args[0][2] == expected_payload + mock_schedule.assert_called_with(webhook.id) + + assert TableWebhookCall.objects.all().count() == 1 + assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 1 + + # we mocked this function to ensure the enqueued payload is correct, now if we call + # the function again, we should see the next batch_id being sent. + enqueue_webhook_task(webhook.id, event_id, expected_payload, {}) + + with patch( + "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task" + ) as mock_enqueue: + schedule_next_task_in_queue(webhook.id) + assert mock_enqueue.call_args[0][2] == ( + webhook.id, + event_id, + "test.paginated", + "POST", + "http://localhost/", + {}, + {"batch_id": 3, "data": "part 3"}, + ) + assert TableWebhookCall.objects.all().count() == 2 + # Same event_id, but different batch_id. + assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 2 + + +@pytest.mark.django_db(transaction=True) +@responses.activate +@override_settings(BASEROW_WEBHOOKS_BATCH_LIMIT=1) +@patch("baserow.contrib.database.webhooks.tasks.RedisQueue", MemoryQueue) +@patch("baserow.contrib.database.webhooks.tasks.cache", MagicMock()) +@patch("baserow.ws.tasks.broadcast_to_users.apply") +def test_call_webhook_payload_too_large_send_notification( + mocked_broadcast_to_users, mutable_webhook_event_type_registry, data_fixture +): + user_1 = data_fixture.create_user() + user_2 = data_fixture.create_user() + admin_1 = data_fixture.create_user() + admin_2 = data_fixture.create_user() + workspace = data_fixture.create_workspace() + + WorkspaceUser.objects.create( + user=user_1, workspace=workspace, order=1, permissions="MEMBER" + ) + WorkspaceUser.objects.create( + user=user_2, workspace=workspace, order=2, permissions="MEMBER" + ) + WorkspaceUser.objects.create( + user=admin_1, workspace=workspace, order=3, permissions="ADMIN" + ) + WorkspaceUser.objects.create( + user=admin_2, workspace=workspace, order=4, permissions="ADMIN" + ) + + database = data_fixture.create_database_application(workspace=workspace) + table = data_fixture.create_database_table(database=database) + + mutable_webhook_event_type_registry.register(PaginatedWebhookEventType()) + webhook = data_fixture.create_table_webhook(table=table, active=True) + responses.add(responses.POST, "http://localhost/", json={}, status=200) + event_id = "00000000-0000-0000-0000-000000000002" + expected_payload = ( + webhook.id, + event_id, + "test.paginated", + "POST", + "http://localhost/", + {}, + {"batch_id": 2, "data": "part 2"}, + ) + + # The first page of the payload is sent and contains the batch_id 1. + with patch( + "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task" + ) as mock_enqueue, patch( + "baserow.contrib.database.webhooks.tasks.schedule_next_task_in_queue" + ) as mock_schedule: + call_webhook( + webhook_id=webhook.id, + event_id=event_id, + event_type="test.paginated", + method="POST", + url="http://localhost/", + headers={}, + payload={}, + ) + + assert mock_enqueue.call_args[0][2] == expected_payload + mock_schedule.assert_called_with(webhook.id) + + assert TableWebhookCall.objects.all().count() == 1 + assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 1 + + # The second part of the payload exceeds the batch limit of 1. Therefore, it should + # not send the data but should trigger a notification. + enqueue_webhook_task(webhook.id, event_id, expected_payload, {}) + schedule_next_task_in_queue(webhook.id) + + # No new call should be made. + assert TableWebhookCall.objects.all().count() == 1 + + all_notifications = list(Notification.objects.all()) + assert len(all_notifications) == 1 + recipient_ids = [r.id for r in all_notifications[0].recipients.all()] + assert recipient_ids == [admin_1.id, admin_2.id] + assert all_notifications[0].type == WebhookPayloadTooLargeNotificationType.type + assert all_notifications[0].broadcast is False + assert all_notifications[0].workspace_id == workspace.id + assert all_notifications[0].sender is None + assert all_notifications[0].data == { + "database_id": database.id, + "table_id": table.id, + "webhook_id": webhook.id, + "webhook_name": webhook.name, + "event_id": event_id, + "batch_limit": 1, + } + + # The webhook should still be active, but the queue should be empty. + webhook.refresh_from_db() + assert webhook.active is True + + with patch("baserow.contrib.database.webhooks.tasks.call_webhook.delay") as mock: + schedule_next_task_in_queue(webhook.id) + mock.assert_not_called() # nothing else has been scheduled. + + assert TableWebhookCall.objects.all().count() == 1 diff --git a/docker-compose.yml b/docker-compose.yml index a4b895d96..72f5f5b3a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -200,6 +200,7 @@ x-backend-variables: &backend-variables BASEROW_ENTERPRISE_PERIODIC_DATA_SYNC_CHECK_INTERVAL_MINUTES: BASEROW_ENTERPRISE_MAX_PERIODIC_DATA_SYNC_CONSECUTIVE_ERRORS: BASEROW_USE_LOCAL_CACHE: + BASEROW_WEBHOOKS_BATCH_LIMIT: services: # A caddy reverse proxy sitting in-front of all the services. Responsible for routing diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md index ca911fec8..fdb269d73 100644 --- a/docs/installation/configuration.md +++ b/docs/installation/configuration.md @@ -120,6 +120,7 @@ The installation methods referred to in the variable descriptions are: | BASEROW\_WEBHOOKS\_MAX\_CALL\_LOG\_ENTRIES | The maximum number of call log entries stored per webhook. | 10 | | BASEROW\_WEBHOOKS\_REQUEST\_TIMEOUT\_SECONDS | How long to wait on making the webhook request before timing out. | 5 | | BASEROW\_MAX\_WEBHOOK\_CALLS\_IN\_QUEUE\_PER\_WEBHOOK | Maximum number of calls that can be in the webhook's queue. Can be useful to limit when massive numbers of webhooks are triggered due an automation loop. If not set or set to `0`, then there is no limit. | 0 | +| BASEROW\_WEBHOOKS\_BATCH\_LIMIT | This limit applies to all webhook_event_types that split large payloads into multiple batches. Each batch is a separate request with the same event_id and an incremental batch_id. This parameter sets the maximum number of batches per event. Set to 0 for unlimited batches. | 5 | ### Generative AI configuration diff --git a/enterprise/backend/src/baserow_enterprise/locale/en/LC_MESSAGES/django.po b/enterprise/backend/src/baserow_enterprise/locale/en/LC_MESSAGES/django.po index 33f30af25..b2c7c2bc8 100644 --- a/enterprise/backend/src/baserow_enterprise/locale/en/LC_MESSAGES/django.po +++ b/enterprise/backend/src/baserow_enterprise/locale/en/LC_MESSAGES/django.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2025-02-13 13:04+0000\n" +"POT-Creation-Date: 2025-03-05 11:01+0000\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL@li.org>\n" @@ -71,12 +71,12 @@ msgstr "" msgid "Data sync table \"%(table_name)s\" (%(table_id)s) updated" msgstr "" -#: src/baserow_enterprise/data_sync/notification_types.py:63 +#: src/baserow_enterprise/data_sync/notification_types.py:66 #, python-format msgid "%(name)s periodic data sync has been deactivated." msgstr "" -#: src/baserow_enterprise/data_sync/notification_types.py:70 +#: src/baserow_enterprise/data_sync/notification_types.py:73 #, python-format msgid "" "The periodic data sync failed more than %(max_failures)s consecutive " diff --git a/web-frontend/modules/database/components/notifications/WebhookPayloadTooLargeNotification.vue b/web-frontend/modules/database/components/notifications/WebhookPayloadTooLargeNotification.vue new file mode 100644 index 000000000..5c2b5ce94 --- /dev/null +++ b/web-frontend/modules/database/components/notifications/WebhookPayloadTooLargeNotification.vue @@ -0,0 +1,35 @@ +<template> + <nuxt-link + class="notification-panel__notification-link" + :to="route" + @click.native="markAsReadAndHandleClick" + > + <div class="notification-panel__notification-content-title"> + <i18n path="webhookPayloadTooLargeNotification.body" tag="span"> + <template #name> + <strong>{{ notification.data.webhook_name }}</strong> + </template> + <template #event_id> + <strong>{{ notification.data.event_id }}</strong> + </template> + <template #batch_limit> + <strong>{{ notification.data.batch_limit }}</strong> + </template> + </i18n> + </div> + </nuxt-link> +</template> + +<script> +import notificationContent from '@baserow/modules/core/mixins/notificationContent' + +export default { + name: 'WebhookPayloadTooLargeNotification', + mixins: [notificationContent], + methods: { + handleClick() { + this.$emit('close-panel') + }, + }, +} +</script> diff --git a/web-frontend/modules/database/locales/en.json b/web-frontend/modules/database/locales/en.json index a18fc1bd9..c9738e4ec 100644 --- a/web-frontend/modules/database/locales/en.json +++ b/web-frontend/modules/database/locales/en.json @@ -1058,5 +1058,8 @@ }, "webhookDeactivatedNotification": { "body": "{name} webhook has been deactivated because it failed too many times consecutively." + }, + "webhookPayloadTooLargeNotification": { + "body": "The payload for the {name} webhook with event ID {event_id} was too large. The content has been split into multiple batches, but data above the batch limit of {batch_limit} was discarded." } } diff --git a/web-frontend/modules/database/notificationTypes.js b/web-frontend/modules/database/notificationTypes.js index 787153f68..ca2bf162f 100644 --- a/web-frontend/modules/database/notificationTypes.js +++ b/web-frontend/modules/database/notificationTypes.js @@ -4,6 +4,7 @@ import CollaboratorAddedToRowNotification from '@baserow/modules/database/compon import UserMentionInRichTextFieldNotification from '@baserow/modules/database/components/notifications/UserMentionInRichTextFieldNotification' import FormSubmittedNotification from '@baserow/modules/database/components/notifications/FormSubmittedNotification' import WebhookDeactivatedNotification from '@baserow/modules/database/components/notifications/WebhookDeactivatedNotification' +import WebhookPayloadTooLargeNotification from '@baserow/modules/database/components/notifications/WebhookPayloadTooLargeNotification' export class CollaboratorAddedToRowNotificationType extends NotificationType { static getType() { @@ -103,3 +104,27 @@ export class WebhookDeactivatedNotificationType extends NotificationType { } } } + +export class WebhookPayloadTooLargedNotificationType extends NotificationType { + static getType() { + return 'webhook_payload_too_large' + } + + getIconComponent() { + return null + } + + getContentComponent() { + return WebhookPayloadTooLargeNotification + } + + getRoute(notificationData) { + return { + name: 'database-table-open-webhooks', + params: { + databaseId: notificationData.database_id, + tableId: notificationData.table_id, + }, + } + } +} diff --git a/web-frontend/modules/database/plugin.js b/web-frontend/modules/database/plugin.js index 85709d98c..431fec47a 100644 --- a/web-frontend/modules/database/plugin.js +++ b/web-frontend/modules/database/plugin.js @@ -320,6 +320,7 @@ import { FormSubmittedNotificationType, UserMentionInRichTextFieldNotificationType, WebhookDeactivatedNotificationType, + WebhookPayloadTooLargedNotificationType, } from '@baserow/modules/database/notificationTypes' import { HistoryRowModalSidebarType } from '@baserow/modules/database/rowModalSidebarTypes' import { FieldsDataProviderType } from '@baserow/modules/database/dataProviderTypes' @@ -1024,6 +1025,10 @@ export default (context) => { 'notification', new WebhookDeactivatedNotificationType(context) ) + app.$registry.register( + 'notification', + new WebhookPayloadTooLargedNotificationType(context) + ) app.$registry.register( 'rowModalSidebar',