1
0
Fork 0
mirror of https://gitlab.com/bramw/baserow.git synced 2025-04-04 21:25:24 +00:00

Add payload pagination for webhooks

This commit is contained in:
Davide Silvestri 2025-03-12 17:08:38 +01:00
parent 0877bf1c05
commit 89fa7ea651
21 changed files with 677 additions and 136 deletions
backend
src/baserow
config/settings
contrib
core/locale/en/LC_MESSAGES
test_utils
tests/baserow/contrib/database/webhooks
docker-compose.yml
docs/installation
enterprise/backend/src/baserow_enterprise/locale/en/LC_MESSAGES
web-frontend/modules/database

View file

@ -980,6 +980,7 @@ BASEROW_WEBHOOKS_URL_CHECK_TIMEOUT_SECS = int(
BASEROW_MAX_WEBHOOK_CALLS_IN_QUEUE_PER_WEBHOOK = (
int(os.getenv("BASEROW_MAX_WEBHOOK_CALLS_IN_QUEUE_PER_WEBHOOK", "0")) or None
)
BASEROW_WEBHOOKS_BATCH_LIMIT = int(os.getenv("BASEROW_WEBHOOKS_BATCH_LIMIT", 5))
# ======== WARNING ========
# Please read and understand everything at:
@ -1265,7 +1266,9 @@ BASEROW_MAX_HEALTHY_CELERY_QUEUE_SIZE = int(
BASEROW_USE_LOCAL_CACHE = str_to_bool(os.getenv("BASEROW_USE_LOCAL_CACHE", "true"))
# -- CACHALOT SETTINGS --
CACHALOT_TIMEOUT = int(os.getenv("BASEROW_CACHALOT_TIMEOUT", 60 * 60 * 24 * 7))
BASEROW_CACHALOT_ONLY_CACHABLE_TABLES = os.getenv(
"BASEROW_CACHALOT_ONLY_CACHABLE_TABLES", None

View file

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-02-11 17:20+0000\n"
"POT-Creation-Date: 2025-03-05 11:01+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -46,18 +46,18 @@ msgstr ""
msgid "Last name"
msgstr ""
#: src/baserow/contrib/builder/data_providers/data_provider_types.py:555
#: src/baserow/contrib/builder/data_providers/data_provider_types.py:563
#, python-format
msgid "%(user_source_name)s member"
msgstr ""
#: src/baserow/contrib/builder/data_sources/service.py:154
#: src/baserow/contrib/builder/data_sources/service.py:158
msgid "Data source"
msgstr ""
#: src/baserow/contrib/builder/elements/mixins.py:578
#: src/baserow/contrib/builder/elements/mixins.py:583
#: src/baserow/contrib/builder/elements/mixins.py:588
#: src/baserow/contrib/builder/elements/mixins.py:586
#: src/baserow/contrib/builder/elements/mixins.py:591
#: src/baserow/contrib/builder/elements/mixins.py:596
#, python-format
msgid "Column %(count)s"
msgstr ""

View file

@ -957,6 +957,7 @@ class DatabaseConfig(AppConfig):
)
from baserow.contrib.database.webhooks.notification_types import (
WebhookDeactivatedNotificationType,
WebhookPayloadTooLargeNotificationType,
)
from baserow.core.notifications.registries import notification_type_registry
@ -966,6 +967,7 @@ class DatabaseConfig(AppConfig):
)
notification_type_registry.register(FormSubmittedNotificationType())
notification_type_registry.register(WebhookDeactivatedNotificationType())
notification_type_registry.register(WebhookPayloadTooLargeNotificationType())
# The signals must always be imported last because they use the registries
# which need to be filled first.

View file

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-02-11 17:20+0000\n"
"POT-Creation-Date: 2025-03-05 11:01+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -149,7 +149,7 @@ msgid ""
msgstr ""
#: src/baserow/contrib/database/fields/models.py:453
#: src/baserow/contrib/database/fields/models.py:632
#: src/baserow/contrib/database/fields/models.py:641
msgid "The format of the duration."
msgstr ""
@ -646,14 +646,26 @@ msgid ""
"to %(webhook_url)s\" updated"
msgstr ""
#: src/baserow/contrib/database/webhooks/notification_types.py:70
#: src/baserow/contrib/database/webhooks/notification_types.py:90
#, python-format
msgid "%(name)s webhook has been deactivated."
msgstr ""
#: src/baserow/contrib/database/webhooks/notification_types.py:77
#: src/baserow/contrib/database/webhooks/notification_types.py:97
#, python-format
msgid ""
"The webhook failed more than %(max_failures)s consecutive times and was "
"therefore deactivated."
msgstr ""
#: src/baserow/contrib/database/webhooks/notification_types.py:147
#, python-format
msgid "%(name)s webhook payload too large."
msgstr ""
#: src/baserow/contrib/database/webhooks/notification_types.py:154
#, python-format
msgid ""
"The webhook couldn't send all the data because it reaches the maximum number "
"of batches of %(batch_limit)s."
msgstr ""

View file

@ -0,0 +1,24 @@
# Generated by Django 5.0.9 on 2025-03-04 18:18
from django.db import migrations, models
# Adds batch support for webhook calls so payloads too large to deliver in a
# single request can be split across multiple calls sharing one event_id.
class Migration(migrations.Migration):
    dependencies = [
        ("database", "0180_view_allow_public_export"),
    ]
    operations = [
        # A null batch_id means the call was not part of a paginated payload.
        migrations.AddField(
            model_name="tablewebhookcall",
            name="batch_id",
            field=models.PositiveIntegerField(
                help_text="The batch ID for this call. Null if not part of a batch. Used for batching multiple calls of the same event_id due to large data.",
                null=True,
            ),
        ),
        # batch_id joins the uniqueness constraint so each batch of the same
        # event is recorded as a distinct call row.
        migrations.AlterUniqueTogether(
            name="tablewebhookcall",
            unique_together={("event_id", "batch_id", "webhook", "event_type")},
        ),
    ]

View file

@ -20,3 +20,7 @@ class TableWebhookEventConfigFieldNotInTable(Exception):
class SkipWebhookCall(Exception):
"""Raised when the webhook call must be skipped"""
class WebhookPayloadTooLarge(Exception):
    """
    Signals that a webhook payload could not be fully delivered because doing
    so would exceed the configured maximum number of batches.
    """

View file

@ -1,5 +1,6 @@
import uuid
from django.conf import settings
from django.core.validators import MaxLengthValidator
from django.db import models
@ -58,6 +59,18 @@ class TableWebhook(CreatedAndUpdatedOnMixin, models.Model):
def header_dict(self):
return {header.name: header.value for header in self.headers.all()}
@property
def batch_limit(self) -> int:
    """
    Maximum number of batches a single webhook may use to paginate its
    payload.

    If the payload is too large to be sent in one go, the event_type can
    split it into multiple batches. If the number of batches exceeds this
    limit, a notification will be sent to workspace admins informing them
    that the webhook couldn't send all the data.
    """
    return settings.BASEROW_WEBHOOKS_BATCH_LIMIT
class Meta:
ordering = ("id",)
@ -90,6 +103,13 @@ class TableWebhookCall(models.Model):
editable=False,
help_text="Event ID where the call originated from.",
)
batch_id = models.PositiveIntegerField(
null=True,
help_text=(
"The batch ID for this call. Null if not part of a batch. "
"Used for batching multiple calls of the same event_id due to large data."
),
)
webhook = models.ForeignKey(
TableWebhook, related_name="calls", on_delete=models.CASCADE
)
@ -111,3 +131,4 @@ class TableWebhookCall(models.Model):
class Meta:
ordering = ("-called_time",)
unique_together = ("event_id", "batch_id", "webhook", "event_type")

View file

@ -1,10 +1,14 @@
from dataclasses import asdict, dataclass
from typing import List, Optional
from typing import List
from django.conf import settings
from django.utils.translation import gettext as _
from baserow.core.models import WORKSPACE_USER_PERMISSION_ADMIN, WorkspaceUser
from baserow.core.models import (
WORKSPACE_USER_PERMISSION_ADMIN,
Workspace,
WorkspaceUser,
)
from baserow.core.notifications.handler import NotificationHandler
from baserow.core.notifications.models import NotificationRecipient
from baserow.core.notifications.registries import (
@ -32,6 +36,36 @@ class DeactivatedWebhookData:
)
def notify_admins_in_workspace(
    workspace: Workspace, notification_type: str, data: dict
) -> List[NotificationRecipient]:
    """
    Sends a direct notification of the given type to every active admin of the
    provided workspace, e.g. for webhook deactivation or oversized payloads.

    :param workspace: The workspace whose admins will be notified.
    :param notification_type: The type of notification to send.
    :param data: The data to include in the notification.
    :return: A list of created notification recipients.
    """
    admin_users = [
        workspace_user.user
        for workspace_user in WorkspaceUser.objects.filter(
            workspace=workspace,
            permissions=WORKSPACE_USER_PERMISSION_ADMIN,
            user__profile__to_be_deleted=False,
            user__is_active=True,
        ).select_related("user")
    ]
    return NotificationHandler.create_direct_notification_for_users(
        notification_type=notification_type,
        recipients=admin_users,
        data=data,
        sender=None,
        workspace=workspace,
    )
class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, NotificationType):
type = "webhook_deactivated"
has_web_frontend_route = True
@ -39,30 +73,18 @@ class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, Notificatio
@classmethod
def notify_admins_in_workspace(
cls, webhook: TableWebhook
) -> Optional[List[NotificationRecipient]]:
) -> List[NotificationRecipient]:
"""
Creates a notification for each user that is subscribed to receive comments on
the row on which the comment was created.
Creates a notification of this type for each admin in the workspace that the
webhook belongs to.
:param webhook: The comment that was created.
:return:
:param webhook: The webhook that was deactivated.
:return: A list of notification recipients that have been created.
"""
workspace = webhook.table.database.workspace
admins_workspace_users = WorkspaceUser.objects.filter(
workspace=workspace,
permissions=WORKSPACE_USER_PERMISSION_ADMIN,
user__profile__to_be_deleted=False,
user__is_active=True,
).select_related("user")
admins_in_workspace = [admin.user for admin in admins_workspace_users]
return NotificationHandler.create_direct_notification_for_users(
notification_type=WebhookDeactivatedNotificationType.type,
recipients=admins_in_workspace,
data=asdict(DeactivatedWebhookData.from_webhook(webhook)),
sender=None,
workspace=webhook.table.database.workspace,
return notify_admins_in_workspace(
workspace, cls.type, asdict(DeactivatedWebhookData.from_webhook(webhook))
)
@classmethod
@ -79,3 +101,69 @@ class WebhookDeactivatedNotificationType(EmailNotificationTypeMixin, Notificatio
) % {
"max_failures": settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES,
}
@dataclass
class WebhookPayloadTooLargeData:
    """Serializable data attached to a "payload too large" notification."""

    webhook_id: int
    table_id: int
    database_id: int
    webhook_name: str
    event_id: str
    batch_limit: int

    @classmethod
    def from_webhook(cls, webhook: "TableWebhook", event_id: str):
        """Builds the notification data from a webhook and the event id."""
        return cls(
            webhook_id=webhook.id,
            table_id=webhook.table_id,
            database_id=webhook.table.database_id,
            webhook_name=webhook.name,
            event_id=event_id,
            batch_limit=webhook.batch_limit,
        )
class WebhookPayloadTooLargeNotificationType(
    EmailNotificationTypeMixin, NotificationType
):
    """
    Notifies workspace admins when a webhook payload exceeded its batch limit
    and part of the data was discarded.
    """

    type = "webhook_payload_too_large"
    has_web_frontend_route = True

    @classmethod
    def notify_admins_in_workspace(
        cls, webhook: TableWebhook, event_id: str
    ) -> List[NotificationRecipient]:
        """
        Creates a notification of this type for every admin of the workspace
        the webhook belongs to.

        :param webhook: The webhook trying to send a payload that is too large.
        :param event_id: The event id that triggered the notification.
        :return: A list of notification recipients that have been created.
        """
        data = asdict(WebhookPayloadTooLargeData.from_webhook(webhook, event_id))
        workspace = webhook.table.database.workspace
        return notify_admins_in_workspace(workspace, cls.type, data)

    @classmethod
    def get_notification_title_for_email(cls, notification, context):
        return _("%(name)s webhook payload too large.") % {
            "name": notification.data["webhook_name"],
        }

    @classmethod
    def get_notification_description_for_email(cls, notification, context):
        params = {
            "name": notification.data["webhook_name"],
            "event_id": notification.data["event_id"],
            "batch_limit": notification.data["batch_limit"],
        }
        return (
            _(
                "The payload for the %(name)s webhook with event ID %(event_id)s "
                "was too large. The content has been split into multiple batches, but "
                "data above the batch limit of %(batch_limit)s was discarded."
            )
            % params
        )

View file

@ -7,7 +7,7 @@ from django.dispatch.dispatcher import Signal
from baserow.contrib.database.table.models import Table
from baserow.core.registry import Instance, ModelRegistryMixin, Registry
from .exceptions import SkipWebhookCall
from .exceptions import SkipWebhookCall, WebhookPayloadTooLarge
from .tasks import call_webhook
@ -15,8 +15,8 @@ class WebhookEventType(Instance):
"""
This class represents a custom webhook event type that can be added to the webhook
event type registry. Each registered event type needs to set a django signal on
which it will listen on. Upon initialization the webhook event type will connect
to the django signal.
which it will listen on. Upon initialization the webhook event type will connect to
the django signal.
The 'listener' function will be called for every received signal. The listener will
generate a unique ID for every received signal, find all webhooks that need to be
@ -107,6 +107,51 @@ class WebhookEventType(Instance):
transaction.on_commit(lambda: self.listener_after_commit(**kwargs))
def _paginate_payload(self, payload) -> tuple[dict, dict | None]:
"""
This method is called in the celery task and can be overwritten to paginate the
payload, if it's too large to send all the data at once. The default
implementation returns the payload and None as the next cursor, but if the
payload is too large to be sent in one go, this method can be used to return a
part of the payload and the remaining part as the next cursor. Proper `batch_id`
values will be added to the payload by the caller to keep track of the current
batch.
:param payload: The payload that must be paginated.
:return: A tuple containing the payload to be sent and the remaining payload for
the next batch if any or None.
"""
return payload, None
def paginate_payload(self, webhook, event_id, payload) -> tuple[dict, dict | None]:
"""
This method calls the `_paginate_payload` method and adds a `batch_id` to the
payload if the remaining payload is not None. The `batch_id` is used to keep
track of the current batch of the payload.
:param webhook: The webhook object related to the call.
:param event_id: The unique uuid event id of the event that triggered the call.
:param payload: The payload that must be paginated.
:return: A tuple containing the payload to be sent and the remaining payload for
the next batch if any or None.
"""
batch_id = int(payload.get("batch_id", None) or 1)
if webhook.batch_limit > 0 and batch_id > webhook.batch_limit:
raise WebhookPayloadTooLarge(
f"Payload for event '{self.type}' (event_id: '{event_id}') exceeds "
f"the batch limit of ({webhook.batch_limit} batches)."
)
prepared_payload, remaining_payload = self._paginate_payload(payload)
if remaining_payload is not None:
prepared_payload["batch_id"] = batch_id
remaining_payload["batch_id"] = batch_id + 1
return prepared_payload, remaining_payload
def listener_after_commit(self, **kwargs):
"""
Called after the signal is triggered and the transaction commits. By default it
@ -145,7 +190,7 @@ class WebhookEventType(Instance):
pass
class WebhookEventTypeRegistry(ModelRegistryMixin, Registry):
# Registry holding every webhook event type; parametrized with
# WebhookEventType so registry lookups are typed accordingly.
class WebhookEventTypeRegistry(ModelRegistryMixin, Registry[WebhookEventType]):
    name = "webhook_event"

View file

@ -1,3 +1,4 @@
from copy import deepcopy
from datetime import datetime, timezone
from django.conf import settings
@ -8,6 +9,10 @@ from django.db.utils import OperationalError
from loguru import logger
from baserow.config.celery import app
from baserow.contrib.database.webhooks.exceptions import WebhookPayloadTooLarge
from baserow.contrib.database.webhooks.notification_types import (
WebhookPayloadTooLargeNotificationType,
)
from baserow.core.redis import RedisQueue
@ -83,30 +88,18 @@ def call_webhook(
can still measure this.
"""
from advocate import UnacceptableAddressException
from requests import RequestException
from .handler import WebhookHandler
from .models import TableWebhook, TableWebhookCall
from .notification_types import WebhookDeactivatedNotificationType
from .models import TableWebhook
from .registries import webhook_event_type_registry
if self.request.retries > retries:
retries = self.request.retries
try:
with transaction.atomic():
handler = WebhookHandler()
try:
webhook = TableWebhook.objects.select_for_update(
of=("self",),
nowait=True,
).get(
id=webhook_id,
# If a webhook is not active anymore, then it should not be
# executed.
active=True,
)
of=("self",), nowait=True
).get(id=webhook_id, active=True)
except TableWebhook.DoesNotExist:
# If the webhook has been deleted or disabled while executing, we don't
# want to continue making calls the URL because we can't update the
@ -128,73 +121,36 @@ def call_webhook(
else:
raise e
request = None
response = None
success = False
error = ""
# Paginate the payload if necessary and enqueue the remaining data.
webhook_event_type = webhook_event_type_registry.get(event_type)
try:
request, response = handler.make_request(method, url, headers, payload)
success = response.ok
except RequestException as exception:
request = exception.request
response = exception.response
error = str(exception)
except UnacceptableAddressException as exception:
error = f"UnacceptableAddressException: {exception}"
TableWebhookCall.objects.update_or_create(
event_id=event_id,
event_type=event_type,
webhook=webhook,
defaults={
"called_time": datetime.now(tz=timezone.utc),
"called_url": url,
"request": handler.format_request(request)
if request is not None
else None,
"response": handler.format_response(response)
if response is not None
else None,
"response_status": response.status_code
if response is not None
else None,
"error": error,
},
)
handler.clean_webhook_calls(webhook)
if success and webhook.failed_triggers != 0:
# If the call was successful and failed triggers had been increased in
# the past, we can safely reset it to 0 again to prevent deactivation of
# the webhook.
webhook.failed_triggers = 0
webhook.save()
elif not success and (
webhook.failed_triggers
< settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES
):
# If the task has reached the maximum amount of failed calls, we're
# going to give up and increase the total failed triggers of the webhook
# if we're still operating within the limits of the max consecutive
# trigger failures.
webhook.failed_triggers += 1
webhook.save()
elif not success:
# If webhook has reached the maximum amount of failed triggers,
# we're going to deactivate it because we can reasonably assume that the
# target doesn't listen anymore. At this point we've tried 8 * 10 times.
# The user can manually activate it again when it's fixed.
webhook.active = False
webhook.save()
# Send a notification to the workspace admins that the webhook was
# deactivated.
payload, remaining = webhook_event_type.paginate_payload(
webhook, event_id, deepcopy(payload)
)
except WebhookPayloadTooLarge:
success = True # We don't want to retry this call, because it will fail again.
transaction.on_commit(
lambda: WebhookDeactivatedNotificationType.notify_admins_in_workspace(
webhook
lambda: WebhookPayloadTooLargeNotificationType.notify_admins_in_workspace(
webhook, event_id
)
)
else:
success = make_request_and_save_result(
webhook, event_id, event_type, method, url, headers, payload
)
# enqueue the next call if there is still remaining payload
if success and remaining is not None:
args = (
webhook_id,
event_id,
event_type,
method,
url,
headers,
remaining,
)
kwargs = {"retries": 0}
enqueue_webhook_task(webhook_id, event_id, args, kwargs)
# After the transaction successfully commits we can delay the next call
# in the queue, so that only one call is triggered concurrently.
@ -216,3 +172,84 @@ def call_webhook(
kwargs = self.request.kwargs or {}
kwargs["retries"] = retries + 1
self.retry(countdown=2**retries, kwargs=kwargs)
def make_request_and_save_result(
    webhook, event_id, event_type, method, url, headers, payload
):
    """
    Performs the actual HTTP call for a webhook, records the attempt as a
    TableWebhookCall row and updates the webhook's consecutive-failure
    counter, deactivating the webhook (and notifying workspace admins) once
    the configured maximum is exceeded.

    :return: True if the remote endpoint responded with a successful status.
    """
    from advocate import UnacceptableAddressException
    from requests import RequestException

    from .handler import WebhookHandler
    from .models import TableWebhookCall
    from .notification_types import WebhookDeactivatedNotificationType

    handler = WebhookHandler()
    request = None
    response = None
    success = False
    error = ""

    try:
        request, response = handler.make_request(method, url, headers, payload)
        success = response.ok
    except RequestException as exception:
        # Keep whatever request/response objects exist so the failed attempt
        # can still be logged below.
        request = exception.request
        response = exception.response
        error = str(exception)
    except UnacceptableAddressException as exception:
        # Raised by advocate when the URL resolves to a disallowed address;
        # there is no request/response object to record in this case.
        error = f"UnacceptableAddressException: {exception}"

    # One row per (event_id, batch_id, webhook, event_type); a retry of the
    # same call updates the existing row instead of creating a duplicate.
    TableWebhookCall.objects.update_or_create(
        event_id=event_id,
        batch_id=payload.get("batch_id", None),
        event_type=event_type,
        webhook=webhook,
        defaults={
            "called_time": datetime.now(tz=timezone.utc),
            "called_url": url,
            "request": handler.format_request(request) if request is not None else None,
            "response": handler.format_response(response)
            if response is not None
            else None,
            "response_status": response.status_code if response is not None else None,
            "error": error,
        },
    )
    handler.clean_webhook_calls(webhook)

    if success:
        if webhook.failed_triggers != 0:
            # If the call was successful and failed triggers had been increased
            # in the past, we can safely reset it to 0 again to prevent
            # deactivation of the webhook.
            webhook.failed_triggers = 0
            webhook.save()
    elif (
        webhook.failed_triggers
        < settings.BASEROW_WEBHOOKS_MAX_CONSECUTIVE_TRIGGER_FAILURES
    ):
        # If the task has reached the maximum amount of failed calls, we're
        # going to give up and increase the total failed triggers of the webhook
        # if we're still operating within the limits of the max consecutive
        # trigger failures.
        webhook.failed_triggers += 1
        webhook.save()
    else:
        # If webhook has reached the maximum amount of failed triggers, we're
        # going to deactivate it because we can reasonably assume that the
        # target doesn't listen anymore. At this point we've tried 8 * 10 times.
        # The user can manually activate it again when it's fixed.
        webhook.active = False
        webhook.save()
        # Send a notification to the workspace admins that the webhook was
        # deactivated.
        transaction.on_commit(
            lambda: WebhookDeactivatedNotificationType.notify_admins_in_workspace(
                webhook
            )
        )

    return success

View file

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-02-11 17:20+0000\n"
"POT-Creation-Date: 2025-03-05 11:01+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -242,7 +242,7 @@ msgstr ""
msgid "Decimal number"
msgstr ""
#: src/baserow/core/handler.py:2122 src/baserow/core/user/handler.py:267
#: src/baserow/core/handler.py:2130 src/baserow/core/user/handler.py:267
#, python-format
msgid "%(name)s's workspace"
msgstr ""
@ -305,11 +305,7 @@ msgid_plural "Plus %(counter)s more notifications."
msgstr[0] ""
msgstr[1] ""
#: src/baserow/core/templates/baserow/core/notifications_summary.html:239
msgid "View in Baserow"
msgstr ""
#: src/baserow/core/templates/baserow/core/notifications_summary.html:253
#: src/baserow/core/templates/baserow/core/notifications_summary.html:235
#: src/baserow/core/templates/baserow/core/user/account_deleted.html:186
#: src/baserow/core/templates/baserow/core/user/account_deletion_cancelled.html:186
#: src/baserow/core/templates/baserow/core/user/account_deletion_scheduled.html:191

View file

@ -30,6 +30,7 @@ from sqlparse import format
from baserow.contrib.database.application_types import DatabaseApplicationType
from baserow.contrib.database.fields.fields import SerialField
from baserow.contrib.database.fields.models import LinkRowField
from baserow.contrib.database.webhooks.registries import webhook_event_type_registry
from baserow.core.cache import local_cache
from baserow.core.context import clear_current_workspace_id
from baserow.core.exceptions import PermissionDenied
@ -233,6 +234,14 @@ def mutable_job_type_registry():
job_type_registry.get_for_class.cache_clear()
@pytest.fixture()
def mutable_webhook_event_type_registry():
    # Swap the registry contents for an empty dict so tests can register their
    # own event types without leaking into other tests. The get_for_class
    # lookup cache is cleared both before and after the test because it
    # memoizes the previous registry contents.
    with patch.object(webhook_event_type_registry, "registry", {}):
        webhook_event_type_registry.get_for_class.cache_clear()
        yield webhook_event_type_registry
        webhook_event_type_registry.get_for_class.cache_clear()
@pytest.fixture()
def stub_user_source_registry(data_fixture, mutable_user_source_registry, fake):
from baserow.core.user_sources.registries import UserSourceType

View file

@ -1,20 +1,21 @@
from django.test import override_settings
import pytest
from baserow.contrib.database.webhooks.notification_types import (
WebhookDeactivatedNotificationType,
WebhookPayloadTooLargeNotificationType,
)
@pytest.mark.django_db(transaction=True)
def test_webhook_deactivated_notification_can_be_render_as_email(
api_client, data_fixture
):
@pytest.mark.django_db
def test_webhook_deactivated_notification_can_be_render_as_email(data_fixture):
user = data_fixture.create_user()
workspace = data_fixture.create_workspace(user=user)
database = data_fixture.create_database_application(workspace=workspace)
table = data_fixture.create_database_table(database=database)
webhook = data_fixture.create_table_webhook(
table=table, active=True, failed_triggers=1
table=table, active=True, failed_triggers=1, name="test"
)
notification_recipients = (
@ -22,11 +23,12 @@ def test_webhook_deactivated_notification_can_be_render_as_email(
)
notification = notification_recipients[0].notification
assert WebhookDeactivatedNotificationType.get_notification_title_for_email(
notification, {}
) == "%(name)s webhook has been deactivated." % {
"name": notification.data["webhook_name"],
}
assert (
WebhookDeactivatedNotificationType.get_notification_title_for_email(
notification, {}
)
== "test webhook has been deactivated."
)
assert (
WebhookDeactivatedNotificationType.get_notification_description_for_email(
@ -35,3 +37,40 @@ def test_webhook_deactivated_notification_can_be_render_as_email(
== "The webhook failed more than 8 consecutive times and "
"was therefore deactivated."
)
@pytest.mark.django_db
@override_settings(BASEROW_WEBHOOKS_BATCH_LIMIT=1)
def test_webhook_payload_too_large_can_be_render_as_email(data_fixture):
    """The payload-too-large notification renders an email title/description."""

    user = data_fixture.create_user()
    workspace = data_fixture.create_workspace(user=user)
    database = data_fixture.create_database_application(workspace=workspace)
    table = data_fixture.create_database_table(database=database)
    webhook = data_fixture.create_table_webhook(
        table=table, active=True, failed_triggers=1, name="test"
    )

    recipients = WebhookPayloadTooLargeNotificationType.notify_admins_in_workspace(
        webhook, "123"
    )
    notification = recipients[0].notification

    title = WebhookPayloadTooLargeNotificationType.get_notification_title_for_email(
        notification, {}
    )
    assert title == "test webhook payload too large."

    description = (
        WebhookPayloadTooLargeNotificationType.get_notification_description_for_email(
            notification, {}
        )
    )
    assert description == (
        "The payload for the test webhook with event ID 123 "
        "was too large. The content has been split into multiple batches, but "
        "data above the batch limit of 1 was discarded."
    )

View file

@ -12,8 +12,14 @@ from celery.exceptions import Retry
from baserow.contrib.database.webhooks.models import TableWebhook, TableWebhookCall
from baserow.contrib.database.webhooks.notification_types import (
WebhookDeactivatedNotificationType,
WebhookPayloadTooLargeNotificationType,
)
from baserow.contrib.database.webhooks.registries import WebhookEventType
from baserow.contrib.database.webhooks.tasks import (
call_webhook,
enqueue_webhook_task,
schedule_next_task_in_queue,
)
from baserow.contrib.database.webhooks.tasks import call_webhook
from baserow.core.models import WorkspaceUser
from baserow.core.notifications.models import Notification
from baserow.core.redis import RedisQueue
@ -29,7 +35,7 @@ class MemoryQueue(RedisQueue):
def get_and_pop_next(self):
try:
self.queues[self.queue_key].pop(0)
return self.queues[self.queue_key].pop(0)
except IndexError:
return None
@ -430,3 +436,188 @@ def test_call_webhook_failed_reached_notification_send(
"webhook_id": webhook.id,
"webhook_name": webhook.name,
}
# the webhook should be deactivated after the notification is sent.
webhook.refresh_from_db()
assert webhook.active is False
class PaginatedWebhookEventType(WebhookEventType):
    """
    Test event type that never runs out of data: every pagination call emits
    the current part and always reports a remaining payload, so pagination
    continues until the batch limit is hit.
    """

    type = "test.paginated"

    def __init__(self):
        # Number of the part to emit next; incremented on every call.
        self.i = 1

    def _paginate_payload(self, payload) -> tuple[dict, dict | None]:
        current = self.i
        self.i = current + 1
        payload["data"] = f"part {current}"
        return payload, {"data": f"part {self.i}"}
@pytest.mark.django_db(transaction=True)
@responses.activate
@patch("baserow.contrib.database.webhooks.tasks.RedisQueue", MemoryQueue)
@patch("baserow.contrib.database.webhooks.tasks.cache", MagicMock())
def test_webhook_with_paginated_payload(
    mutable_webhook_event_type_registry, data_fixture
):
    # Verifies that a paginated payload produces multiple webhook calls that
    # share the same event_id but carry increasing batch_id values, and that
    # the leftover payload is re-enqueued for the following batch.
    mutable_webhook_event_type_registry.register(PaginatedWebhookEventType())

    webhook = data_fixture.create_table_webhook()
    responses.add(responses.POST, "http://localhost/", json={}, status=200)

    event_id = "00000000-0000-0000-0000-000000000002"
    # Args the task is expected to enqueue for the second batch: leftover
    # payload tagged with batch_id 2 by paginate_payload.
    expected_payload = (
        webhook.id,
        event_id,
        "test.paginated",
        "POST",
        "http://localhost/",
        {},
        {"batch_id": 2, "data": "part 2"},
    )

    # The first page of the payload is sent and contains the batch_id 1.
    with patch(
        "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task"
    ) as mock_enqueue, patch(
        "baserow.contrib.database.webhooks.tasks.schedule_next_task_in_queue"
    ) as mock_schedule:
        call_webhook(
            webhook_id=webhook.id,
            event_id=event_id,
            event_type="test.paginated",
            method="POST",
            url="http://localhost/",
            headers={},
            payload={},
        )
        assert mock_enqueue.call_args[0][2] == expected_payload
        mock_schedule.assert_called_with(webhook.id)

    assert TableWebhookCall.objects.all().count() == 1
    assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 1

    # we mocked this function to ensure the enqueued payload is correct, now if we call
    # the function again, we should see the next batch_id being sent.
    enqueue_webhook_task(webhook.id, event_id, expected_payload, {})

    with patch(
        "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task"
    ) as mock_enqueue:
        schedule_next_task_in_queue(webhook.id)
        assert mock_enqueue.call_args[0][2] == (
            webhook.id,
            event_id,
            "test.paginated",
            "POST",
            "http://localhost/",
            {},
            {"batch_id": 3, "data": "part 3"},
        )

    assert TableWebhookCall.objects.all().count() == 2
    # Same event_id, but different batch_id.
    assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 2
@pytest.mark.django_db(transaction=True)
@responses.activate
@override_settings(BASEROW_WEBHOOKS_BATCH_LIMIT=1)
@patch("baserow.contrib.database.webhooks.tasks.RedisQueue", MemoryQueue)
@patch("baserow.contrib.database.webhooks.tasks.cache", MagicMock())
@patch("baserow.ws.tasks.broadcast_to_users.apply")
def test_call_webhook_payload_too_large_send_notification(
    mocked_broadcast_to_users, mutable_webhook_event_type_registry, data_fixture
):
    """
    With ``BASEROW_WEBHOOKS_BATCH_LIMIT=1`` a paginated webhook may only send
    a single batch per event. The second batch must therefore be discarded and
    a ``webhook_payload_too_large`` notification must be created for the
    workspace admins only (not for plain members), while the webhook itself
    stays active and nothing else remains scheduled in the queue.
    """
    user_1 = data_fixture.create_user()
    user_2 = data_fixture.create_user()
    admin_1 = data_fixture.create_user()
    admin_2 = data_fixture.create_user()
    workspace = data_fixture.create_workspace()
    # Two plain members and two admins: only the admins should end up as
    # recipients of the notification asserted below.
    WorkspaceUser.objects.create(
        user=user_1, workspace=workspace, order=1, permissions="MEMBER"
    )
    WorkspaceUser.objects.create(
        user=user_2, workspace=workspace, order=2, permissions="MEMBER"
    )
    WorkspaceUser.objects.create(
        user=admin_1, workspace=workspace, order=3, permissions="ADMIN"
    )
    WorkspaceUser.objects.create(
        user=admin_2, workspace=workspace, order=4, permissions="ADMIN"
    )
    database = data_fixture.create_database_application(workspace=workspace)
    table = data_fixture.create_database_table(database=database)
    mutable_webhook_event_type_registry.register(PaginatedWebhookEventType())
    webhook = data_fixture.create_table_webhook(table=table, active=True)
    responses.add(responses.POST, "http://localhost/", json={}, status=200)

    event_id = "00000000-0000-0000-0000-000000000002"
    # Task arguments that call_webhook is expected to enqueue for batch 2.
    expected_payload = (
        webhook.id,
        event_id,
        "test.paginated",
        "POST",
        "http://localhost/",
        {},
        {"batch_id": 2, "data": "part 2"},
    )

    # The first page of the payload is sent and contains the batch_id 1.
    with patch(
        "baserow.contrib.database.webhooks.tasks.enqueue_webhook_task"
    ) as mock_enqueue, patch(
        "baserow.contrib.database.webhooks.tasks.schedule_next_task_in_queue"
    ) as mock_schedule:
        call_webhook(
            webhook_id=webhook.id,
            event_id=event_id,
            event_type="test.paginated",
            method="POST",
            url="http://localhost/",
            headers={},
            payload={},
        )
        assert mock_enqueue.call_args[0][2] == expected_payload
        mock_schedule.assert_called_with(webhook.id)

    assert TableWebhookCall.objects.all().count() == 1
    assert TableWebhookCall.objects.filter(event_id=event_id).first().batch_id == 1

    # The second part of the payload exceeds the batch limit of 1. Therefore,
    # it should not send the data but should trigger a notification instead.
    enqueue_webhook_task(webhook.id, event_id, expected_payload, {})
    schedule_next_task_in_queue(webhook.id)

    # No new call should be made.
    assert TableWebhookCall.objects.all().count() == 1

    all_notifications = list(Notification.objects.all())
    assert len(all_notifications) == 1
    # Compare as sets: `recipients.all()` has no guaranteed ordering without
    # an explicit order_by, so a list comparison would make this test flaky.
    recipient_ids = {r.id for r in all_notifications[0].recipients.all()}
    assert recipient_ids == {admin_1.id, admin_2.id}
    assert all_notifications[0].type == WebhookPayloadTooLargeNotificationType.type
    assert all_notifications[0].broadcast is False
    assert all_notifications[0].workspace_id == workspace.id
    assert all_notifications[0].sender is None
    assert all_notifications[0].data == {
        "database_id": database.id,
        "table_id": table.id,
        "webhook_id": webhook.id,
        "webhook_name": webhook.name,
        "event_id": event_id,
        "batch_limit": 1,
    }

    # The webhook should still be active, but the queue should be empty.
    webhook.refresh_from_db()
    assert webhook.active is True
    with patch("baserow.contrib.database.webhooks.tasks.call_webhook.delay") as mock:
        schedule_next_task_in_queue(webhook.id)
        mock.assert_not_called()  # nothing else has been scheduled.
    assert TableWebhookCall.objects.all().count() == 1

View file

@ -200,6 +200,7 @@ x-backend-variables: &backend-variables
BASEROW_ENTERPRISE_PERIODIC_DATA_SYNC_CHECK_INTERVAL_MINUTES:
BASEROW_ENTERPRISE_MAX_PERIODIC_DATA_SYNC_CONSECUTIVE_ERRORS:
BASEROW_USE_LOCAL_CACHE:
BASEROW_WEBHOOKS_BATCH_LIMIT:
services:
# A caddy reverse proxy sitting in-front of all the services. Responsible for routing

View file

@ -120,6 +120,7 @@ The installation methods referred to in the variable descriptions are:
| BASEROW\_WEBHOOKS\_MAX\_CALL\_LOG\_ENTRIES | The maximum number of call log entries stored per webhook. | 10 |
| BASEROW\_WEBHOOKS\_REQUEST\_TIMEOUT\_SECONDS | How long to wait on making the webhook request before timing out. | 5 |
| BASEROW\_MAX\_WEBHOOK\_CALLS\_IN\_QUEUE\_PER\_WEBHOOK | Maximum number of calls that can be in the webhook's queue. Can be useful to limit when massive numbers of webhooks are triggered due to an automation loop. If not set or set to `0`, then there is no limit. | 0 |
| BASEROW\_WEBHOOKS\_BATCH\_LIMIT | This limit applies to all webhook_event_types that split large payloads into multiple batches. Each batch is a separate request with the same event_id and an incremental batch_id. This parameter sets the maximum number of batches per event. Set to 0 for unlimited batches. | 5 |
### Generative AI configuration

View file

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-02-13 13:04+0000\n"
"POT-Creation-Date: 2025-03-05 11:01+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -71,12 +71,12 @@ msgstr ""
msgid "Data sync table \"%(table_name)s\" (%(table_id)s) updated"
msgstr ""
#: src/baserow_enterprise/data_sync/notification_types.py:63
#: src/baserow_enterprise/data_sync/notification_types.py:66
#, python-format
msgid "%(name)s periodic data sync has been deactivated."
msgstr ""
#: src/baserow_enterprise/data_sync/notification_types.py:70
#: src/baserow_enterprise/data_sync/notification_types.py:73
#, python-format
msgid ""
"The periodic data sync failed more than %(max_failures)s consecutive "

View file

@ -0,0 +1,35 @@
<!--
  Notification panel entry shown when a webhook payload exceeded the
  configured batch limit. Clicking the entry marks the notification as read
  and navigates to `route`.
-->
<template>
<nuxt-link
class="notification-panel__notification-link"
:to="route"
@click.native="markAsReadAndHandleClick"
>
<div class="notification-panel__notification-content-title">
<!-- Translated body; fills the name / event_id / batch_limit slots from
     the notification's backend-provided data. -->
<i18n path="webhookPayloadTooLargeNotification.body" tag="span">
<template #name>
<strong>{{ notification.data.webhook_name }}</strong>
</template>
<template #event_id>
<strong>{{ notification.data.event_id }}</strong>
</template>
<template #batch_limit>
<strong>{{ notification.data.batch_limit }}</strong>
</template>
</i18n>
</div>
</nuxt-link>
</template>
<script>
import notificationContent from '@baserow/modules/core/mixins/notificationContent'
// Content component for the `webhook_payload_too_large` notification type.
// NOTE(review): `route`, `notification` and `markAsReadAndHandleClick` are
// presumably provided by the notificationContent mixin — confirm there.
export default {
name: 'WebhookPayloadTooLargeNotification',
mixins: [notificationContent],
methods: {
// Close the notification panel once the click has been handled.
handleClick() {
this.$emit('close-panel')
},
},
}
</script>

View file

@ -1058,5 +1058,8 @@
},
"webhookDeactivatedNotification": {
"body": "{name} webhook has been deactivated because it failed too many times consecutively."
},
"webhookPayloadTooLargeNotification": {
"body": "The payload for the {name} webhook with event ID {event_id} was too large. The content has been split into multiple batches, but data above the batch limit of {batch_limit} was discarded."
}
}

View file

@ -4,6 +4,7 @@ import CollaboratorAddedToRowNotification from '@baserow/modules/database/compon
import UserMentionInRichTextFieldNotification from '@baserow/modules/database/components/notifications/UserMentionInRichTextFieldNotification'
import FormSubmittedNotification from '@baserow/modules/database/components/notifications/FormSubmittedNotification'
import WebhookDeactivatedNotification from '@baserow/modules/database/components/notifications/WebhookDeactivatedNotification'
import WebhookPayloadTooLargeNotification from '@baserow/modules/database/components/notifications/WebhookPayloadTooLargeNotification'
export class CollaboratorAddedToRowNotificationType extends NotificationType {
static getType() {
@ -103,3 +104,27 @@ export class WebhookDeactivatedNotificationType extends NotificationType {
}
}
}
/**
 * Notification type for the backend `webhook_payload_too_large` notification,
 * rendered with the WebhookPayloadTooLargeNotification component.
 *
 * NOTE(review): the class name spells "TooLarged" — looks like a typo for
 * "TooLarge", but renaming it requires updating every import site in the
 * same change.
 */
export class WebhookPayloadTooLargedNotificationType extends NotificationType {
static getType() {
return 'webhook_payload_too_large'
}
// This notification type renders no icon.
getIconComponent() {
return null
}
getContentComponent() {
return WebhookPayloadTooLargeNotification
}
// Clicking the notification opens the webhooks view of the affected table.
getRoute(notificationData) {
return {
name: 'database-table-open-webhooks',
params: {
databaseId: notificationData.database_id,
tableId: notificationData.table_id,
},
}
}
}

View file

@ -320,6 +320,7 @@ import {
FormSubmittedNotificationType,
UserMentionInRichTextFieldNotificationType,
WebhookDeactivatedNotificationType,
WebhookPayloadTooLargedNotificationType,
} from '@baserow/modules/database/notificationTypes'
import { HistoryRowModalSidebarType } from '@baserow/modules/database/rowModalSidebarTypes'
import { FieldsDataProviderType } from '@baserow/modules/database/dataProviderTypes'
@ -1024,6 +1025,10 @@ export default (context) => {
'notification',
new WebhookDeactivatedNotificationType(context)
)
app.$registry.register(
'notification',
new WebhookPayloadTooLargedNotificationType(context)
)
app.$registry.register(
'rowModalSidebar',