1
0
Fork 0
mirror of https://gitlab.com/bramw/baserow.git synced 2025-03-10 02:52:25 +00:00

Ensure DispatchDataSourcesView excludes hidden data

This commit is contained in:
Tsering Paljor 2024-10-03 14:34:29 +00:00
parent 71ef458c47
commit cc952f0685
48 changed files with 4186 additions and 149 deletions

View file

@ -447,15 +447,15 @@ class DispatchDataSourceView(APIView):
DoesNotExist: ERROR_DATA_DOES_NOT_EXIST,
}
)
def post(self, request, data_source_id: int):
def post(self, request, data_source_id: str):
"""
Call the given data_source related service dispatch method.
"""
data_source = DataSourceHandler().get_data_source(data_source_id)
dispatch_context = BuilderDispatchContext(request, data_source.page)
data_source = DataSourceHandler().get_data_source(int(data_source_id))
dispatch_context = BuilderDispatchContext(
request, data_source.page, only_expose_public_formula_fields=False
)
response = DataSourceService().dispatch_data_source(
request.user, data_source, dispatch_context
)
@ -498,15 +498,15 @@ class DispatchDataSourcesView(APIView):
DoesNotExist: ERROR_DATA_DOES_NOT_EXIST,
}
)
def post(self, request, page_id: int):
def post(self, request, page_id: str):
"""
Call the given data_source related service dispatch method.
"""
page = PageHandler().get_page(page_id)
dispatch_context = BuilderDispatchContext(request, page)
page = PageHandler().get_page(int(page_id))
dispatch_context = BuilderDispatchContext(
request, page, only_expose_public_formula_fields=False
)
service_contents = DataSourceService().dispatch_page_data_sources(
request.user, page, dispatch_context
)

View file

@ -1,3 +1,5 @@
from django.db import transaction
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework.permissions import AllowAny
@ -6,13 +8,30 @@ from rest_framework.views import APIView
from baserow.api.applications.errors import ERROR_APPLICATION_DOES_NOT_EXIST
from baserow.api.decorators import map_exceptions
from baserow.api.schemas import get_error_schema
from baserow.api.utils import DiscriminatorCustomFieldsMappingSerializer
from baserow.api.errors import ERROR_PERMISSION_DENIED
from baserow.api.schemas import CLIENT_SESSION_ID_SCHEMA_PARAMETER, get_error_schema
from baserow.api.utils import (
DiscriminatorCustomFieldsMappingSerializer,
apply_exception_mapping,
)
from baserow.contrib.builder.api.data_sources.errors import (
ERROR_DATA_DOES_NOT_EXIST,
ERROR_DATA_SOURCE_DOES_NOT_EXIST,
ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
)
from baserow.contrib.builder.api.domains.serializers import PublicBuilderSerializer
from baserow.contrib.builder.api.pages.errors import ERROR_PAGE_DOES_NOT_EXIST
from baserow.contrib.builder.api.workflow_actions.serializers import (
BuilderWorkflowActionSerializer,
)
from baserow.contrib.builder.data_sources.builder_dispatch_context import (
BuilderDispatchContext,
)
from baserow.contrib.builder.data_sources.exceptions import (
DataSourceDoesNotExist,
DataSourceImproperlyConfigured,
)
from baserow.contrib.builder.data_sources.handler import DataSourceHandler
from baserow.contrib.builder.data_sources.service import DataSourceService
from baserow.contrib.builder.domains.service import DomainService
from baserow.contrib.builder.elements.registries import element_type_registry
@ -28,7 +47,8 @@ from baserow.contrib.builder.workflow_actions.registries import (
from baserow.contrib.builder.workflow_actions.service import (
BuilderWorkflowActionService,
)
from baserow.core.exceptions import ApplicationDoesNotExist
from baserow.core.exceptions import ApplicationDoesNotExist, PermissionException
from baserow.core.services.exceptions import DoesNotExist, ServiceImproperlyConfigured
from baserow.core.services.registries import service_type_registry
from .serializers import PublicDataSourceSerializer, PublicElementSerializer
@ -267,3 +287,129 @@ class PublicBuilderWorkflowActionsView(APIView):
]
return Response(data)
class PublicDispatchDataSourceView(APIView):
    """
    Public (unauthenticated) endpoint that dispatches a single data source of
    a published builder page and returns its result. Only public formula
    fields are exposed to the caller.
    """

    permission_classes = (AllowAny,)

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="data_source_id",
                location=OpenApiParameter.PATH,
                type=OpenApiTypes.INT,
                description="The id of the data_source you want to call the dispatch "
                "for",
            ),
            CLIENT_SESSION_ID_SCHEMA_PARAMETER,
        ],
        tags=["Builder data sources"],
        operation_id="dispatch_builder_page_data_source",
        description=(
            "Dispatches the service of the related data_source and returns "
            "the result."
        ),
        responses={
            404: get_error_schema(
                [
                    "ERROR_DATA_SOURCE_DOES_NOT_EXIST",
                    "ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED",
                    "ERROR_IN_DISPATCH_CONTEXT",
                    "ERROR_DATA_DOES_NOT_EXIST",
                ]
            ),
        },
    )
    @transaction.atomic
    @map_exceptions(
        {
            DataSourceDoesNotExist: ERROR_DATA_SOURCE_DOES_NOT_EXIST,
            DataSourceImproperlyConfigured: ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
            ServiceImproperlyConfigured: ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
            DoesNotExist: ERROR_DATA_DOES_NOT_EXIST,
        }
    )
    def post(self, request, data_source_id: str):
        """
        Call the given data_source related service dispatch method.
        """

        # Resolve the data source, build a public dispatch context (only
        # public formula fields are exposed) and dispatch the service.
        data_source = DataSourceHandler().get_data_source(int(data_source_id))
        dispatch_context = BuilderDispatchContext(
            request, data_source.page, only_expose_public_formula_fields=True
        )
        dispatched = DataSourceService().dispatch_data_source(
            request.user, data_source, dispatch_context
        )
        return Response(dispatched)
class PublicDispatchDataSourcesView(APIView):
    """
    Public (unauthenticated) endpoint that dispatches every data source of a
    published builder page in one request. Per-data-source failures are
    mapped to error payloads instead of failing the whole response.
    """

    permission_classes = (AllowAny,)

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="page_id",
                location=OpenApiParameter.PATH,
                type=OpenApiTypes.INT,
                description="The page we want to dispatch the data source for.",
            ),
            CLIENT_SESSION_ID_SCHEMA_PARAMETER,
        ],
        tags=["Builder data sources"],
        operation_id="dispatch_builder_page_data_sources",
        description="Dispatches the service of the related page data_sources",
        responses={
            404: get_error_schema(
                [
                    "ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED",
                    "ERROR_IN_DISPATCH_CONTEXT",
                    "ERROR_DATA_DOES_NOT_EXIST",
                    "ERROR_PAGE_DOES_NOT_EXIST",
                ]
            ),
        },
    )
    @transaction.atomic
    @map_exceptions(
        {
            PageDoesNotExist: ERROR_PAGE_DOES_NOT_EXIST,
            ServiceImproperlyConfigured: ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
            DoesNotExist: ERROR_DATA_DOES_NOT_EXIST,
        }
    )
    def post(self, request, page_id: str):
        """
        Call the given data_source related service dispatch method.
        """

        page = PageHandler().get_page(int(page_id))
        dispatch_context = BuilderDispatchContext(
            request, page, only_expose_public_formula_fields=True
        )
        service_contents = DataSourceService().dispatch_page_data_sources(
            request.user, page, dispatch_context
        )

        # Exception-to-error mapping used for individual dispatch failures;
        # hoisted out of the loop since it never changes.
        error_mapping = {
            DataSourceDoesNotExist: ERROR_DATA_SOURCE_DOES_NOT_EXIST,
            DataSourceImproperlyConfigured: ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
            ServiceImproperlyConfigured: ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED,
            DoesNotExist: ERROR_DATA_DOES_NOT_EXIST,
            PermissionException: ERROR_PERMISSION_DENIED,
        }

        responses = {}
        for service_id, content in service_contents.items():
            if isinstance(content, Exception):
                # A failed data source becomes an inline error payload so the
                # other data sources can still be returned.
                _, error, detail = apply_exception_mapping(
                    error_mapping,
                    content,
                    with_fallback=True,
                )
                responses[service_id] = {"_error": error, "detail": detail}
            else:
                responses[service_id] = content

        return Response(responses)

View file

@ -5,6 +5,8 @@ from baserow.contrib.builder.api.domains.public_views import (
PublicBuilderByIdView,
PublicBuilderWorkflowActionsView,
PublicDataSourcesView,
PublicDispatchDataSourcesView,
PublicDispatchDataSourceView,
PublicElementsView,
)
from baserow.contrib.builder.api.domains.views import (
@ -63,4 +65,14 @@ urlpatterns_without_builder_id = [
AskPublicBuilderDomainExistsView.as_view(),
name="ask_exists",
),
re_path(
r"published/data-source/(?P<data_source_id>[0-9]+)/dispatch/$",
PublicDispatchDataSourceView.as_view(),
name="public_dispatch",
),
re_path(
r"published/page/(?P<page_id>[0-9]+)/dispatch-data-sources/$",
PublicDispatchDataSourcesView.as_view(),
name="public_dispatch_all",
),
]

View file

@ -1,5 +1,5 @@
import re
from typing import Any, Dict, List, Type, Union
from typing import Any, Dict, List, Optional, Type, Union
from django.utils.translation import gettext as _
@ -24,7 +24,7 @@ from baserow.contrib.builder.elements.models import FormElement
from baserow.contrib.builder.workflow_actions.handler import (
BuilderWorkflowActionHandler,
)
from baserow.core.formula.exceptions import FormulaRecursion
from baserow.core.formula.exceptions import FormulaRecursion, InvalidBaserowFormula
from baserow.core.formula.registries import DataProviderType
from baserow.core.services.dispatch_context import DispatchContext
from baserow.core.user_sources.constants import DEFAULT_USER_ROLE_PREFIX
@ -177,6 +177,34 @@ class DataSourceDataProviderType(DataProviderType):
return [str(data_source_id), *rest]
def extract_properties(self, path: List[str], **kwargs) -> Dict[str, List[str]]:
"""
Given a list of formula path parts, call the ServiceType's
extract_properties() method and return a dict where the keys are the
Service IDs and the values are the field names.
E.g. given that path is: ['96', '1', 'field_5191'], returns
{1: ['field_5191']}.
"""
if not path:
return {}
_data_source_id, *rest = path
try:
data_source_id = int(_data_source_id)
except ValueError:
return {}
try:
data_source = DataSourceHandler().get_data_source(data_source_id)
except DataSourceDoesNotExist as exc:
# The data source has probably been deleted
raise InvalidBaserowFormula() from exc
service_type = data_source.service.specific.get_type()
return {data_source.service_id: service_type.extract_properties(rest, **kwargs)}
class DataSourceContextDataProviderType(DataProviderType):
"""
@ -220,6 +248,34 @@ class DataSourceContextDataProviderType(DataProviderType):
return [str(data_source_id), *rest]
def extract_properties(self, path: List[str], **kwargs) -> Dict[str, List[str]]:
"""
Given a list of formula path parts, call the ServiceType's
extract_properties() method and return a dict where the keys are the
Service IDs and the values are the field names.
E.g. given that path is: ['96', '1', 'field_5191'], returns
{1: ['field_5191']}.
"""
if not path:
return {}
_data_source_id, *rest = path
try:
data_source_id = int(_data_source_id)
except ValueError:
return {}
try:
data_source = DataSourceHandler().get_data_source(data_source_id)
except DataSourceDoesNotExist as exc:
# The data source has probably been deleted
raise InvalidBaserowFormula() from exc
service_type = data_source.service.specific.get_type()
return {data_source.service_id: service_type.extract_properties(rest, **kwargs)}
class CurrentRecordDataProviderType(DataProviderType):
"""
@ -290,6 +346,53 @@ class CurrentRecordDataProviderType(DataProviderType):
return rest
def extract_properties(
self,
path: List[str],
data_source_id: Optional[int] = None,
schema_property: Optional[str] = None,
**kwargs,
) -> Dict[str, List[str]]:
"""
Given a list of formula path parts, call the ServiceType's
extract_properties() method and return a dict where the keys are the
Service IDs and the values are the field names.
E.g. given that path is: ['96', '1', 'field_5191'], returns
{1: ['field_5191']}.
"""
if not path:
return {}
if data_source_id is None:
return {}
try:
data_source = DataSourceHandler().get_data_source(data_source_id)
except DataSourceDoesNotExist as exc:
# The data source is probably not accessible so we raise an invalid formula
raise InvalidBaserowFormula() from exc
service_type = data_source.service.specific.get_type()
if service_type.returns_list:
# Here we add a fake row part to make it match the usual shape
# for this path
if schema_property:
path = ["0", schema_property, *path]
else:
path = ["0", *path]
else:
# Current Record could also use Get Row service type (via Repeat
# element), so we need to add the field name if it is available.
if not schema_property:
return {}
else:
path = [schema_property, *path]
return {data_source.service_id: service_type.extract_properties(path, **kwargs)}
class PreviousActionProviderType(DataProviderType):
"""

View file

@ -1,19 +1,31 @@
from typing import TYPE_CHECKING, Optional
from functools import cached_property
from typing import TYPE_CHECKING, Dict, List, Optional
from django.http import HttpRequest
from baserow.contrib.builder.data_providers.registries import (
builder_data_provider_type_registry,
)
from baserow.contrib.builder.formula_property_extractor import get_formula_field_names
from baserow.contrib.builder.pages.models import Page
from baserow.core.feature_flags import feature_flag_is_enabled
from baserow.core.services.dispatch_context import DispatchContext
if TYPE_CHECKING:
from baserow.core.workflow_actions.models import WorkflowAction
FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS = "feature-exclude-unused-fields"
class BuilderDispatchContext(DispatchContext):
own_properties = ["request", "page", "workflow_action", "offset", "count"]
own_properties = [
"request",
"page",
"workflow_action",
"offset",
"count",
"only_expose_public_formula_fields",
]
def __init__(
self,
@ -22,6 +34,7 @@ class BuilderDispatchContext(DispatchContext):
workflow_action: Optional["WorkflowAction"] = None,
offset: Optional[int] = None,
count: Optional[int] = None,
only_expose_public_formula_fields: Optional[bool] = True,
):
self.request = request
self.page = page
@ -30,6 +43,7 @@ class BuilderDispatchContext(DispatchContext):
# Overrides the `request` GET offset/count values.
self.offset = offset
self.count = count
self.only_expose_public_formula_fields = only_expose_public_formula_fields
super().__init__()
@ -37,6 +51,31 @@ class BuilderDispatchContext(DispatchContext):
def data_provider_registry(self):
return builder_data_provider_type_registry
@cached_property
def public_formula_fields(self) -> Optional[Dict[str, Dict[int, List[str]]]]:
"""
Return a Dict where keys are ["all", "external", "internal"] and values
dicts. The internal dicts' keys are Service IDs and values are a list
of Data Source field names.
Returns None if field names shouldn't be included in the dispatch
context. This is mainly to support a feature flag for this new feature.
The field names are used to improve the security of the backend by
ensuring only the minimum necessary data is exposed to the frontend.
It is used to restrict the queryset as well as to discern which Data
Source fields are external and safe (user facing) vs internal and
sensitive (required only by the backend).
"""
if self.only_expose_public_formula_fields and feature_flag_is_enabled(
FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS
):
return get_formula_field_names(self.request.user, self.page)
return None
def range(self, service):
"""
Return page range from the `offset`, `count` kwargs,

View file

@ -366,6 +366,7 @@ class DataSourceHandler:
service_dispatch = self.service_handler.dispatch_service(
data_source.service.specific, dispatch_context
)
# Cache the dispatch in the formula cache if we have formulas that need
# it later
dispatch_context.cache["data_source_contents"][

View file

@ -1,6 +1,7 @@
from django.contrib.contenttypes.models import ContentType
from django.db import models
from baserow.contrib.builder.mixins import BuilderInstanceWithFormulaMixin
from baserow.contrib.builder.pages.models import Page
from baserow.core.mixins import (
FractionOrderableMixin,
@ -18,6 +19,7 @@ class DataSource(
HierarchicalModelMixin,
TrashableModelMixin,
FractionOrderableMixin,
BuilderInstanceWithFormulaMixin,
models.Model,
):
"""
@ -79,3 +81,18 @@ class DataSource(
queryset = DataSource.objects.filter(page=before.page)
return cls.get_unique_orders_before_item(before, queryset)[0]
def formula_generator(self, instance: "DataSource"):
"""
Yield the formulas from the current data source instance and from the underlying
service if it exists.
"""
yield from super().formula_generator(instance)
service = instance.service.specific if instance.service else None
# The Data Source's service can be None if the user created a Data
# Source but didn't finish configuring it.
if service:
yield from service.get_type().formula_generator(service)

View file

@ -255,7 +255,50 @@ class DataSourceService:
workspace=data_sources[0].page.builder.workspace,
)
return self.handler.dispatch_data_sources(data_sources, dispatch_context)
results = self.handler.dispatch_data_sources(data_sources, dispatch_context)
if dispatch_context.public_formula_fields is None:
return results
# We filter the fields before returning the result
for data_source in data_sources:
if isinstance(results[data_source.id], Exception):
continue
field_names = dispatch_context.public_formula_fields.get(
"external", {}
).get(data_source.service.id, [])
if data_source.service.get_type().returns_list:
new_result = []
for row in results[data_source.id]["results"]:
new_row = {}
for key, value in row.items():
if key in ["id", "order"]:
# Ensure keys like "id" and "order" are included
# in new_row
new_row[key] = value
elif key in field_names:
# Only include the field if it is in the
# external/safe field_names list
new_row[key] = value
new_result.append(new_row)
results[data_source.id] = {
**results[data_source.id],
"results": new_result,
}
else:
new_result = {}
for key, value in results[data_source.id].items():
if key in ["id", "order"]:
# Ensure keys like "id" and "order" are included in new_row
new_result[key] = value
elif key in field_names:
# Only include the field if it is in the external/safe
# field_names list
new_result[key] = value
results[data_source.id] = new_result
return results
def dispatch_page_data_sources(
self,

View file

@ -372,7 +372,7 @@ class RepeatElementType(
**kwargs,
)
def import_context_addition(self, instance, id_mapping):
def import_context_addition(self, instance):
return {"data_source_id": instance.data_source_id}
def get_pytest_params(self, pytest_data_fixture) -> Dict[str, Any]:
@ -488,7 +488,7 @@ class RecordSelectorElementType(
"option_name_suffix",
]
def import_context_addition(self, instance, id_mapping):
def import_context_addition(self, instance):
return {"data_source_id": instance.data_source_id}
def get_pytest_params(self, pytest_data_fixture) -> Dict[str, Any]:

View file

@ -591,7 +591,6 @@ class ElementHandler:
def get_import_context_addition(
self,
element_id: int,
id_mapping: Dict[str, Dict[int, int]],
element_map: Dict[int, Element] = None,
) -> Dict[str, Any]:
"""
@ -601,7 +600,6 @@ class ElementHandler:
actions.
:param element_id: The element_id to compute the context for.
:param id_mapping: The ID mapping dict used by import process.
:param element_map: An optional map of already loaded elements to improve
performances.
:return: An object that can be used as import context.
@ -620,9 +618,9 @@ class ElementHandler:
current_element = self.get_element(element_id)
return current_element.get_type().import_context_addition(
current_element, id_mapping
current_element
) | self.get_import_context_addition(
current_element.parent_element_id, id_mapping, element_map
current_element.parent_element_id, element_map
)
def import_element(

View file

@ -1,5 +1,7 @@
from typing import Any, Dict, Generator, List, Optional, Type
from typing import Any, Dict, List, Optional, Type
from zipfile import ZipFile
from django.core.files.storage import Storage
from django.db import IntegrityError
from django.db.models import Q, QuerySet
from django.utils.translation import gettext_lazy as _
@ -33,11 +35,15 @@ from baserow.contrib.builder.elements.registries import (
element_type_registry,
)
from baserow.contrib.builder.elements.signals import elements_moved
from baserow.contrib.builder.elements.types import CollectionElementSubClass
from baserow.contrib.builder.elements.types import (
CollectionElementSubClass,
ElementSubClass,
)
from baserow.contrib.builder.formula_importer import import_formula
from baserow.contrib.builder.pages.handler import PageHandler
from baserow.contrib.builder.types import ElementDict
from baserow.core.registry import Instance
from baserow.core.services.dispatch_context import DispatchContext
from baserow.core.utils import merge_dicts_no_duplicates
class ContainerElementTypeMixin:
@ -314,40 +320,20 @@ class CollectionElementTypeMixin:
**kwargs,
)
def import_serialized(
self,
parent: Any,
serialized_values: Dict[str, Any],
id_mapping: Dict[str, Any],
files_zip=None,
storage=None,
cache=None,
**kwargs,
):
def import_context_addition(self, instance: CollectionElement) -> Dict[str, int]:
"""
Here we add the data_source_id to the import process to be able to resolve
current_record formulas migration.
Given a collection element, adds the data_source_id to the import context.
The data_source_id is not store in some formulas (current_record ones) so
we need the generate this import context for all formulas of this element.
"""
actual_data_source_id = None
if (
serialized_values.get("data_source_id", None)
and "builder_data_sources" in id_mapping
):
actual_data_source_id = id_mapping["builder_data_sources"][
serialized_values["data_source_id"]
]
results = {"data_source_id": instance.data_source_id}
return super().import_serialized(
parent,
serialized_values,
id_mapping,
files_zip=files_zip,
storage=storage,
cache=cache,
data_source_id=actual_data_source_id,
**kwargs,
)
if instance.schema_property is not None:
results["schema_property"] = instance.schema_property
return results
def create_instance_from_serialized(
self,
@ -410,20 +396,6 @@ class CollectionElementWithFieldsTypeMixin(CollectionElementTypeMixin):
class SerializedDict(CollectionElementTypeMixin.SerializedDict):
fields: List[Dict]
def formula_generator(
self, element: "CollectionElementWithFieldsTypeMixin"
) -> Generator[str | Instance, str, None]:
"""
Generator that iterates over formula fields for LinkCollectionFieldType.
Some formula fields are in the config JSON field, e.g. page_parameters.
"""
yield from super().formula_generator(element)
for collection_field in element.fields.all():
yield from collection_field.get_type().formula_generator(collection_field)
def serialize_property(
self,
element: CollectionElementSubClass,
@ -546,7 +518,7 @@ class CollectionElementWithFieldsTypeMixin(CollectionElementTypeMixin):
)
import_field_context = ElementHandler().get_import_context_addition(
instance.id, id_mapping, cache.get("imported_element_map")
instance.id, cache.get("imported_element_map")
)
fields = [
@ -567,6 +539,85 @@ class CollectionElementWithFieldsTypeMixin(CollectionElementTypeMixin):
return instance
def import_serialized(
self,
page: Any,
serialized_values: Dict[str, Any],
id_mapping: Dict[str, Dict[int, int]],
files_zip: ZipFile | None = None,
storage: Storage | None = None,
cache: Dict[str, Any] | None = None,
**kwargs,
) -> ElementSubClass:
"""
This method is overridden to ensure that the import_context contains
the data sources and correctly imports formulas.
"""
# Import the element itself
created_instance = super().import_serialized(
page,
serialized_values,
id_mapping,
files_zip=files_zip,
storage=storage,
cache=cache,
**kwargs,
)
# For collection fields, import_context should include the current element
import_context = ElementHandler().get_import_context_addition(
created_instance.id,
element_map=cache.get("imported_element_map", None) if cache else None,
)
# Import the collection field formulas
updated_models = []
for collection_field in created_instance.fields.all():
collection_field.get_type().import_formulas(
collection_field,
id_mapping,
import_formula,
**(kwargs | import_context),
)
updated_models.append(collection_field)
[m.save() for m in updated_models]
return created_instance
def extract_formula_properties(
self,
instance: CollectionElementSubClass,
element_map: Dict[str, Element],
**kwargs,
) -> Dict[int, List[str]]:
"""
Extract all formula field names of the collection element instance.
Returns a dict where keys are the Service ID and values are a list of
field names, e.g.: {164: ['field_5440', 'field_5441', 'field_5439']}
"""
from baserow.contrib.builder.elements.handler import ElementHandler
# First get from the current element
result = super().extract_formula_properties(instance, element_map, **kwargs)
# then extract the properties used in the collection field formulas
formula_context = ElementHandler().get_import_context_addition(
instance.id, element_map
)
for collection_field in instance.fields.all():
result = merge_dicts_no_duplicates(
result,
collection_field.get_type().extract_formula_properties(
collection_field, **formula_context
),
)
return result
class FormElementTypeMixin:
# Form element types are imported second, after containers.

View file

@ -9,13 +9,13 @@ from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from baserow.contrib.builder.formula_importer import import_formula
from baserow.contrib.builder.mixins import BuilderInstanceWithFormulaMixin
from baserow.contrib.database.db.functions import RandomUUID
from baserow.core.registry import (
CustomFieldsInstanceMixin,
CustomFieldsRegistryMixin,
EasyImportExportMixin,
Instance,
InstanceWithFormulaMixin,
ModelInstanceMixin,
ModelRegistryMixin,
Registry,
@ -33,7 +33,7 @@ EXISTING_USER_SOURCE_ROLES = "_existing_user_source_roles"
class ElementType(
InstanceWithFormulaMixin,
BuilderInstanceWithFormulaMixin,
EasyImportExportMixin[ElementSubClass],
CustomFieldsInstanceMixin,
ModelInstanceMixin[ElementSubClass],
@ -112,16 +112,13 @@ class ElementType(
:param instance: The to be deleted element instance.
"""
def import_context_addition(
self, instance: ElementSubClass, id_mapping
) -> Dict[str, Any]:
def import_context_addition(self, instance: ElementSubClass) -> Dict[str, Any]:
"""
This hook allow to specify extra context data when importing objects related
to this one like child elements, collection fields or workflow actions.
This extra context is then used as import context for these objects.
:param instance: The instance we want the context for.
:param id_mapping: The import ID mapping object.
:return: An object containing the extra context for the import process.
"""
@ -153,7 +150,6 @@ class ElementType(
]
import_context = ElementHandler().get_import_context_addition(
imported_parent_element_id,
id_mapping,
element_map=cache.get("imported_element_map", None),
)
@ -180,6 +176,7 @@ class ElementType(
**(kwargs | import_context),
)
# Update formulas of the current element
updated_models = self.import_formulas(
created_instance, id_mapping, import_formula, **(kwargs | import_context)
)
@ -309,6 +306,30 @@ class ElementType(
return value
def extract_formula_properties(
self,
instance: Element,
element_map: Dict[str, Element],
**kwargs,
) -> Dict[int, List[str]]:
"""
Extract all formula field names of the element instance.
Returns a dict where keys are the Service ID and values are a list of
field names, e.g.: {164: ['field_5440', 'field_5441', 'field_5439']}
"""
from baserow.contrib.builder.elements.handler import ElementHandler
# We get the context from the parent elements
formula_context = ElementHandler().get_import_context_addition(
instance.parent_element_id, element_map
)
return super().extract_formula_properties(
instance, **(kwargs | formula_context)
)
@abstractmethod
def get_pytest_params(self, pytest_data_fixture) -> Dict[str, Any]:
"""
@ -339,7 +360,7 @@ element_type_registry = ElementTypeRegistry()
class CollectionFieldType(
InstanceWithFormulaMixin,
BuilderInstanceWithFormulaMixin,
CustomFieldsInstanceMixin,
Instance,
ABC,

View file

@ -0,0 +1,225 @@
from typing import Dict, List, Set
from django.contrib.auth.models import AbstractUser
from antlr4.tree import Tree
from baserow.contrib.builder.data_providers.registries import (
builder_data_provider_type_registry,
)
from baserow.contrib.builder.elements.models import Element
from baserow.contrib.builder.formula_importer import BaserowFormulaImporter
from baserow.contrib.builder.pages.models import Page
from baserow.core.formula import BaserowFormula
from baserow.core.formula.exceptions import InvalidBaserowFormula
from baserow.core.utils import merge_dicts_no_duplicates, to_path
class FormulaFieldVisitor(BaserowFormulaImporter):
    """
    This visitor will visit all nodes of a formula and return its formula
    fields as a dict mapping Service IDs to lists of field names.
    """

    def __init__(self, **kwargs):
        """
        Save the extra context to give it to data providers later.
        """

        # Maps Service ID -> list of field names found while visiting.
        self.results = {}
        self.extra_context = kwargs

    def visit(self, tree: Tree) -> Dict[int, List[str]]:
        """
        Due to the way the formula parsing works, the fields that are found by
        visitFunctionCall() need to be collected in an instance variable.

        This method is overridden to reset the results dict and return it.
        (The previous -> Set[str] annotation was wrong: self.results is a
        dict keyed by Service ID, not a set.)
        """

        self.results = {}
        super().visit(tree)
        return self.results

    def visitFunctionCall(self, ctx: BaserowFormula.FunctionCallContext):
        """
        Visits all nodes of the formula and stores their field names in the
        self.results instance attribute.
        """

        function_name = ctx.func_name().accept(self).lower()
        function_argument_expressions = ctx.expr()

        parts = [expr.accept(self) for expr in function_argument_expressions]

        if function_name == "get" and isinstance(
            function_argument_expressions[0], BaserowFormula.StringLiteralContext
        ):
            # This is the formula with the function name stripped
            # e.g. "'current_record.field_33'"
            unquoted_arg = parts[0]

            # Remove the surrounding quotes and split the data provider name
            # e.g. "current_record" from the rest of the path, e.g. ["field_33"]
            data_provider_name, *path = to_path(unquoted_arg[1:-1])

            data_provider_type = builder_data_provider_type_registry.get(
                data_provider_name
            )
            try:
                self.results = merge_dicts_no_duplicates(
                    self.results,
                    data_provider_type.extract_properties(path, **self.extra_context),
                )
            except InvalidBaserowFormula:
                # If the property extraction failed because of an Invalid formula
                # we can ignore it. May be the related data source is gone.
                pass
def get_element_field_names(
    elements: List[Element],
    element_map: Dict[str, Element],
) -> Dict[str, Dict[int, List[str]]]:
    """
    Given a list of elements, find their formulas and extract the field names
    they reference.

    Only the "external" key of the result is populated, since all builder
    Elements are user-facing.
    """

    found = {}
    for element in elements:
        element_fields = element.get_type().extract_formula_properties(
            element.specific, element_map
        )
        found = merge_dicts_no_duplicates(found, element_fields)

    return {"external": found}
def get_workflow_action_field_names(
    user: AbstractUser,
    page: Page,
    element_map: Dict[str, Element],
) -> Dict[str, Dict[int, List[str]]]:
    """
    Given a Page, loop through all of its workflow actions and find its
    formula field names.

    Both the "internal" and "external" keys of the result are populated:

    - "internal" field names are those that are only needed by the backend.
    - "external" field names are those needed in the frontend.

    :param user: The user the workflow actions are fetched for.
    :param page: The page whose workflow actions are inspected.
    :param element_map: A map of element id -> element used to resolve the
        formula context.
    :return: A dict with "internal" and "external" field name mappings.
    """

    from baserow.contrib.builder.workflow_actions.service import (
        BuilderWorkflowActionService,
    )
    from baserow.contrib.builder.workflow_actions.workflow_action_types import (
        BuilderWorkflowServiceActionType,
    )

    results = {"internal": {}, "external": {}}
    for workflow_action in BuilderWorkflowActionService().get_workflow_actions(
        user, page
    ):
        # Resolve the action type once instead of twice per iteration.
        action_type = workflow_action.get_type()
        found_fields = action_type.extract_formula_properties(
            workflow_action, element_map
        )

        # Actions backed by a service are for internal (backend) use only;
        # everything else is user-facing.
        category = (
            "internal"
            if isinstance(action_type, BuilderWorkflowServiceActionType)
            else "external"
        )
        results[category] = merge_dicts_no_duplicates(
            results[category], found_fields
        )

    return results
def get_data_source_field_names(
    page: Page,
) -> Dict[str, Dict[int, List[str]]]:
    """
    Given a Page, loop through all of its data sources, find all related
    services and return the field names of their formulas.

    Only the "internal" key of the result is populated, since data source
    field names are only required by the backend.
    """

    from baserow.contrib.builder.data_sources.handler import DataSourceHandler

    found = {}
    for data_source in DataSourceHandler().get_data_sources_with_cache(page):
        found = merge_dicts_no_duplicates(
            found, data_source.extract_formula_properties(data_source)
        )

    return {"internal": found}
def get_formula_field_names(
    user: AbstractUser, page: Page
) -> Dict[str, Dict[int, List[str]]]:
    """
    Given a User and a Page, return all formula field names used in the Page.

    Loops over all Elements, Workflow Actions, and Data Sources in the Page
    and groups the field names into "internal" (not required by the frontend,
    excluded for security), "external" (used explicitly by Elements or certain
    Workflow Actions) and "all" (the union of both).

    If the user isn't allowed to view any Elements due to permissions, or if
    the Elements have no formulas, no field names will be returned.
    """
    from baserow.contrib.builder.elements.service import ElementService
    elements = list(ElementService().get_elements(user, page))
    element_map = {element.id: element for element in elements}
    element_results = get_element_field_names(elements, element_map)
    wa_results = get_workflow_action_field_names(user, page, element_map)
    ds_results = get_data_source_field_names(page)
    internal = merge_dicts_no_duplicates(
        wa_results["internal"], ds_results["internal"]
    )
    external = merge_dicts_no_duplicates(
        wa_results["external"], element_results["external"]
    )
    combined = merge_dicts_no_duplicates(internal, external)
    results = {
        "internal": {key: sorted(value) for key, value in internal.items()},
        "external": {key: sorted(value) for key, value in external.items()},
    }
    # "all" is appended last, matching the original key insertion order.
    results["all"] = {key: sorted(value) for key, value in combined.items()}
    return results

View file

@ -0,0 +1,25 @@
from baserow.contrib.builder.formula_property_extractor import FormulaFieldVisitor
from baserow.core.formula.parser.exceptions import BaserowFormulaSyntaxError
from baserow.core.formula.parser.parser import get_parse_tree_for_formula
from baserow.core.registry import InstanceWithFormulaMixin
from baserow.core.utils import merge_dicts_no_duplicates
class BuilderInstanceWithFormulaMixin(InstanceWithFormulaMixin):
    def extract_formula_properties(self, instance, **kwargs):
        """
        Walk every formula of the given instance and merge the field
        properties referenced by each one into a single dict.

        Empty formulas and formulas that fail to parse are skipped silently.
        """
        properties = {}
        for formula in self.formula_generator(instance):
            if not formula:
                # A blank formula references nothing.
                continue
            try:
                tree = get_parse_tree_for_formula(formula)
            except BaserowFormulaSyntaxError:
                # An unparsable formula cannot reference any field.
                continue
            visited = FormulaFieldVisitor(**kwargs).visit(tree)
            properties = merge_dicts_no_duplicates(properties, visited)
        return properties

View file

@ -1,8 +1,9 @@
from typing import Any, Dict
from typing import Any, Dict, List
from django.contrib.auth.models import AbstractUser
from baserow.contrib.builder.formula_importer import import_formula
from baserow.contrib.builder.mixins import BuilderInstanceWithFormulaMixin
from baserow.contrib.builder.registries import PublicCustomFieldsInstanceMixin
from baserow.contrib.builder.workflow_actions.models import BuilderWorkflowAction
from baserow.core.registry import (
@ -13,7 +14,9 @@ from baserow.core.registry import (
from baserow.core.workflow_actions.registries import WorkflowActionType
class BuilderWorkflowActionType(WorkflowActionType, PublicCustomFieldsInstanceMixin):
class BuilderWorkflowActionType(
WorkflowActionType, PublicCustomFieldsInstanceMixin, BuilderInstanceWithFormulaMixin
):
allowed_fields = ["order", "page", "page_id", "element", "element_id", "event"]
parent_property_name = "page"
@ -88,7 +91,7 @@ class BuilderWorkflowActionType(WorkflowActionType, PublicCustomFieldsInstanceMi
if element_id:
imported_element_id = id_mapping["builder_page_elements"][element_id]
import_context = ElementHandler().get_import_context_addition(
imported_element_id, id_mapping, cache.get("imported_element_map", None)
imported_element_id, cache.get("imported_element_map", None)
)
created_instance = super().import_serialized(
@ -109,6 +112,29 @@ class BuilderWorkflowActionType(WorkflowActionType, PublicCustomFieldsInstanceMi
return created_instance
def extract_formula_properties(
    self,
    instance: BuilderWorkflowAction,
    element_map: Dict[str, BuilderWorkflowAction],
    **kwargs,
) -> Dict[int, List[str]]:
    """
    Extract all formula field names of the workflow action instance.

    Returns a dict keyed by Service ID whose values are lists of field
    names, e.g.: {164: ['field_5440', 'field_5441', 'field_5439']}.
    """
    from baserow.contrib.builder.elements.handler import ElementHandler

    # The element's import context carries the information the formula
    # visitor needs to resolve references.
    element_context = ElementHandler().get_import_context_addition(
        instance.element_id, element_map
    )
    merged_kwargs = {**kwargs, **element_context}
    return super().extract_formula_properties(instance, **merged_kwargs)
class BuilderWorkflowActionTypeRegistry(
Registry, ModelRegistryMixin, CustomFieldsRegistryMixin

View file

@ -366,6 +366,19 @@ class BuilderWorkflowServiceActionType(BuilderWorkflowActionType):
values["service"] = service
return super().prepare_values(values, user, instance)
def formula_generator(
    self, workflow_action: WorkflowAction
) -> Generator[str | Instance, str, None]:
    """
    Yield the action's own formulas, then the formulas of its service.
    """
    yield from super().formula_generator(workflow_action)
    # The service's formulas are part of this action's configuration too.
    specific_service = workflow_action.service.specific
    yield from specific_service.get_type().formula_generator(specific_service)
class UpsertRowWorkflowActionType(BuilderWorkflowServiceActionType):
type = "upsert_row"

View file

@ -163,6 +163,7 @@ class LocalBaserowTableServiceType(LocalBaserowServiceType):
model = table.get_model()
queryset = self.get_queryset(service, table, dispatch_context, model)
return queryset
def get_queryset(
@ -617,6 +618,25 @@ class LocalBaserowViewServiceType(LocalBaserowTableServiceType):
return super().prepare_values(values, user, instance)
def extract_field_ids(
    self, field_names: Optional[List[str]]
) -> Optional[List[int]]:
    """
    Given a list of field names, e.g. ["field_123"], return a list of
    IDs, e.g. [123].

    None will be returned if field_names is None. Entries that are not
    strings of the form "field_<digits>" (e.g. "id", select option keys,
    or malformed names) are ignored.
    """
    if field_names is None:
        return None
    field_ids = []
    for field_name in field_names:
        if not isinstance(field_name, str) or not field_name.startswith("field_"):
            continue
        suffix = field_name[len("field_"):]
        # Guard against malformed names such as "field_abc" which would
        # previously make int() raise a ValueError.
        if suffix.isdigit():
            field_ids.append(int(suffix))
    return field_ids
class LocalBaserowListRowsUserServiceType(
ListServiceTypeMixin,
@ -690,14 +710,9 @@ class LocalBaserowListRowsUserServiceType(
# can currently import properly, so we return the path as is.
return path
# If the `field_dbname` isn't a Baserow `Field.db_column`, then
# we don't have anything to map. This can happen if the `field_dbname`
# is an `id`, or single/multiple select option.
if not field_dbname.startswith("field_"):
return path
# If the field_dbname starts with anything other than "field_", it
# implies that the path is not a valid one for this service type.
# implies that the path is not a valid one for this service type or the name
# is the id.
#
# E.g. if the Page Designer changes a Data Source service type from
# List Rows to Get Row, any Element using the Data Source will have
@ -746,6 +761,46 @@ class LocalBaserowListRowsUserServiceType(
return [f"field_{field_id}", *rest]
def extract_properties(self, path: List[str], **kwargs) -> List[str]:
    """
    Return the field names referenced by a data source formula path.

    E.g. given that path is: ['*', 'field_5191'], returns the
    following: ['field_5191'].

    The path can contain one or more parts depending on the field type
    and the formula, for example:
    - a specific row and field: ['1', 'field_5439']
    - a field across all rows: ['*', 'field_5439']
    - a Link Row Field formula: ['0', 'field_5569', '0', 'value']

    An empty list is returned for any path that does not follow the
    row/field format this service type produces.
    """
    # A valid list-rows path always starts with a row part followed by a
    # field part; anything shorter cannot be parsed.
    if len(path) < 2:
        return []
    _, field_dbname, *_ = path
    # Only "field_<id>" names and the row "id" are valid references;
    # anything else indicates an invalid formula.
    if str(field_dbname).startswith("field_") or field_dbname == "id":
        return [field_dbname]
    return []
def serialize_property(
self,
service: ServiceSubClass,
@ -823,6 +878,19 @@ class LocalBaserowListRowsUserServiceType(
table = resolved_values["table"]
queryset = self.build_queryset(service, table, dispatch_context)
public_formula_fields = None
if dispatch_context.public_formula_fields is not None:
all_field_names = dispatch_context.public_formula_fields.get("all", {}).get(
service.id, None
)
if all_field_names is not None:
# Ensure that only the public_formula_fields explicitly used
# in the page are fetched from the database.
queryset = queryset.only(*all_field_names)
public_formula_fields = all_field_names
offset, count = dispatch_context.range(service)
# We query one more row to be able to know if there is another page that can be
@ -837,13 +905,10 @@ class LocalBaserowListRowsUserServiceType(
"results": rows[:-1] if has_next_page else rows,
"has_next_page": has_next_page,
"baserow_table_model": table.get_model(),
"public_formula_fields": public_formula_fields,
}
def dispatch_transform(
self,
dispatch_data: Dict[str, Any],
**kwargs,
) -> Any:
def dispatch_transform(self, dispatch_data: Dict[str, Any]) -> Any:
"""
Given the rows found in `dispatch_data`, serializes them.
@ -851,10 +916,12 @@ class LocalBaserowListRowsUserServiceType(
:return: The list of rows.
"""
field_ids = self.extract_field_ids(dispatch_data.get("public_formula_fields"))
serializer = get_row_serializer_class(
dispatch_data["baserow_table_model"],
RowSerializer,
is_response=True,
field_ids=field_ids,
)
return {
@ -1000,6 +1067,33 @@ class LocalBaserowGetRowUserServiceType(
return self.import_path(path, id_mapping)
def extract_properties(self, path: List[str], **kwargs) -> List[str]:
    """
    Return the field name referenced by a get-row formula path.

    E.g. given that path is: ['field_5191', 'prop1', '*', 'value'], returns the
    following: ['field_5191'].

    Returns an empty list if the path is empty or the field name isn't found.
    """
    if not path:
        # A formula in a format we cannot parse references nothing.
        return []
    field_dbname = path[0]
    # Only "field_<id>" names and the row "id" are valid references;
    # anything else indicates an invalid formula.
    if str(field_dbname).startswith("field_") or field_dbname == "id":
        return [field_dbname]
    return []
def serialize_property(
self,
service: ServiceSubClass,
@ -1051,10 +1145,7 @@ class LocalBaserowGetRowUserServiceType(
**kwargs,
)
def dispatch_transform(
self,
dispatch_data: Dict[str, Any],
) -> Any:
def dispatch_transform(self, dispatch_data: Dict[str, Any]) -> Any:
"""
Responsible for serializing the `dispatch_data` row.
@ -1062,8 +1153,12 @@ class LocalBaserowGetRowUserServiceType(
:return:
"""
field_ids = self.extract_field_ids(dispatch_data.get("public_formula_fields"))
serializer = get_row_serializer_class(
dispatch_data["baserow_table_model"], RowSerializer, is_response=True
dispatch_data["baserow_table_model"],
RowSerializer,
is_response=True,
field_ids=field_ids,
)
serialized_row = serializer(dispatch_data["data"]).data
@ -1106,17 +1201,37 @@ class LocalBaserowGetRowUserServiceType(
model = table.get_model()
queryset = self.build_queryset(service, table, dispatch_context, model)
public_formula_fields = None
if dispatch_context.public_formula_fields is not None:
all_field_names = dispatch_context.public_formula_fields.get("all", {}).get(
service.id, None
)
if all_field_names is not None:
# Ensure that only the public_formula_fields explicitly used
# in the page are fetched from the database.
queryset = queryset.only(*all_field_names)
public_formula_fields = all_field_names
# If no row id is provided return the first item from the queryset
# This is useful when we want to use filters to specifically choose one
# row by setting the right condition
if "row_id" not in resolved_values:
if not queryset.exists():
raise DoesNotExist()
return {"data": queryset.first(), "baserow_table_model": model}
return {
"data": queryset.first(),
"baserow_table_model": model,
"public_formula_fields": public_formula_fields,
}
try:
row = queryset.get(pk=resolved_values["row_id"])
return {"data": row, "baserow_table_model": model}
return {
"data": row,
"baserow_table_model": model,
"public_formula_fields": public_formula_fields,
}
except model.DoesNotExist:
raise DoesNotExist()
@ -1358,10 +1473,7 @@ class LocalBaserowUpsertRowServiceType(
def enhance_queryset(self, queryset):
return super().enhance_queryset(queryset).prefetch_related("field_mappings")
def dispatch_transform(
self,
dispatch_data: Dict[str, Any],
) -> Any:
def dispatch_transform(self, dispatch_data: Dict[str, Any]) -> Any:
"""
Responsible for serializing the `dispatch_data` row.
@ -1370,7 +1482,9 @@ class LocalBaserowUpsertRowServiceType(
"""
serializer = get_row_serializer_class(
dispatch_data["baserow_table_model"], RowSerializer, is_response=True
dispatch_data["baserow_table_model"],
RowSerializer,
is_response=True,
)
serialized_row = serializer(dispatch_data["data"]).data
@ -1604,10 +1718,7 @@ class LocalBaserowDeleteRowServiceType(
resolved_values = super().resolve_service_formulas(service, dispatch_context)
return self.resolve_row_id(resolved_values, service, dispatch_context)
def dispatch_transform(
self,
dispatch_data: Dict[str, Any],
) -> Response:
def dispatch_transform(self, dispatch_data: Dict[str, Any]) -> Response:
"""
The delete row action's `dispatch_data` will contain an empty
`data` dictionary. When we get to this method and wish to transform

View file

@ -5,6 +5,10 @@ from loguru import logger
from baserow.core.utils import exception_capturer
class InvalidBaserowFormula(Exception):
    """Raised when manipulating an invalid (unparsable or malformed) formula."""
class FormulaRecursion(Exception):
    """Raised when the formula context detects a recursion (a formula that
    directly or indirectly references itself)."""

View file

@ -162,6 +162,22 @@ class DataProviderType(
return path
def extract_properties(
    self,
    path: List[str],
    **kwargs,
) -> Dict[str, List[str]]:
    """
    Hook allowing a data provider to map a formula path onto the field
    names it references, keyed by Service ID. E.g. the path
    ['96', '1', 'field_5191'] would map to {1: ['field_5191']}.

    The base implementation references nothing; subclasses override this
    to perform the actual extraction.
    """
    return {}
DataProviderTypeSubClass = TypeVar("DataProviderTypeSubClass", bound=DataProviderType)

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Dict, List, Optional
from baserow.core.formula.runtime_formula_context import RuntimeFormulaContext
from baserow.core.services.models import Service
@ -62,3 +62,22 @@ class DispatchContext(RuntimeFormulaContext, ABC):
Responsible for returning the on-demand sortings, depending
on which module the `DispatchContext` is used by.
"""
@property
@abstractmethod
def public_formula_fields(self) -> Optional[Dict[str, Dict[int, List[str]]]]:
    """
    Return a Dict whose keys are ["all", "external", "internal"] and whose
    values are dicts. The inner dicts' keys are Service IDs and their values
    are lists of Data Source field names.

    Returns None if public_formula_fields shouldn't be included in the
    dispatch context. This is mainly to support a feature flag for this new
    feature.

    The field names are used to improve the security of the backend by
    ensuring only the minimum necessary data is exposed to the frontend.
    They are used to restrict the queryset as well as to discern which Data
    Source fields are external and safe (user facing) vs internal and
    sensitive (required only by the backend).
    """

View file

@ -288,6 +288,9 @@ class ServiceType(
return created_instance
def extract_properties(self, path: List[str], **kwargs) -> List[str]:
return []
ServiceTypeSubClass = TypeVar("ServiceTypeSubClass", bound=ServiceType)

View file

@ -11,7 +11,7 @@ import string
from collections import defaultdict, namedtuple
from decimal import Decimal
from fractions import Fraction
from itertools import islice
from itertools import chain, islice
from numbers import Number
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
@ -1112,3 +1112,29 @@ def remove_duplicates(input_list):
seen = set()
return [x for x in input_list if not (x in seen or seen.add(x))]
def merge_dicts_no_duplicates(*dicts):
    """
    Merges multiple dictionaries by combining the lists of values for any
    shared keys, removing duplicate elements.

    Parameters:
        *dicts (dict): Multiple dictionaries with lists as values.

    Returns:
        dict: A new dictionary with merged values, without duplicates. Every
        value list is a new object, so mutating the result never affects the
        input dictionaries (the previous implementation aliased the input
        list when a key appeared in only one dict, and also skipped
        deduplication in that case).
    """
    merged_dict = {}
    for dictionary in dicts:
        for key, values in dictionary.items():
            # set() both deduplicates and copies, so the result never
            # aliases an input list.
            merged_dict[key] = list(set(chain(merged_dict.get(key, []), values)))
    return merged_dict

View file

@ -11,12 +11,16 @@ from typing import Any, Dict, List, Optional, Type, Union
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractUser
from django.db import connection
from django.shortcuts import reverse
from django.utils.dateparse import parse_date, parse_datetime
import psycopg2
from freezegun import freeze_time
from pytest_unordered import unordered
from baserow.contrib.builder.data_sources.builder_dispatch_context import (
BuilderDispatchContext,
)
from baserow.contrib.database.fields.field_helpers import (
construct_all_possible_field_kwargs,
)
@ -567,3 +571,25 @@ def load_test_cases(name: str) -> Union[List, Dict]:
with open(file_path, "r") as file:
return json.load(file)
def get_dispatch_context(
    url, data_fixture, api_request_factory, builder, page, data=None
):
    """Helper that returns a dispatch context to be used in tests."""
    user_source = data_fixture.create_user_source_with_first_type(application=builder)
    user_source_user = data_fixture.create_user_source_user(
        user_source=user_source,
    )
    access_token = user_source_user.get_refresh_token().access_token
    request = api_request_factory.post(
        reverse(url, kwargs={"page_id": page.id}),
        {},
        HTTP_USERSOURCEAUTHORIZATION=f"JWT {access_token}",
    )
    request.user = user_source_user
    # Only attach a payload when the caller supplied one, mimicking what
    # DRF would expose on a real request.
    if data is not None:
        request.data = data
    return BuilderDispatchContext(request, page)

View file

@ -6,7 +6,7 @@ import threading
from contextlib import contextmanager
from functools import partial
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from unittest.mock import patch
from django.conf import settings as django_settings
@ -770,6 +770,10 @@ class FakeDispatchContext(DispatchContext):
return get_value_at_path(self.context, key)
@property
def public_formula_fields(self) -> Optional[Dict[str, Dict[int, List[str]]]]:
    # NOTE(review): the fake context returns an empty dict rather than
    # None — presumably so tests never trigger field filtering; confirm
    # against the abstract property's None semantics.
    return {}
@pytest.fixture()
def test_thread():

View file

@ -1,4 +1,5 @@
import json
from unittest.mock import ANY, MagicMock, patch
from django.urls import reverse
@ -12,7 +13,10 @@ from rest_framework.status import (
)
from baserow.contrib.builder.data_sources.models import DataSource
from baserow.contrib.builder.elements.models import Element
from baserow.core.services.models import Service
from baserow.core.user_sources.registries import user_source_type_registry
from baserow.core.user_sources.user_source_user import UserSourceUser
from baserow.test_utils.helpers import AnyStr
@ -1100,3 +1104,656 @@ def test_get_record_names(api_client, data_fixture):
response = api_client.get(url, format="json", HTTP_AUTHORIZATION=f"JWT {token}")
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["error"] == "ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED"
@pytest.fixture
def data_source_fixture(data_fixture):
    """A fixture to help test the DispatchDataSourcesView view."""
    user, token = data_fixture.create_user_and_token()
    # A table with two text columns and three rows that the data sources
    # under test expose.
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Name", "text"),
            ("Color", "text"),
        ],
        rows=[
            ["Apple", "Red"],
            ["Banana", "Yellow"],
            ["Cherry", "Purple"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    integration = data_fixture.create_local_baserow_integration(
        user=user, application=builder
    )
    page = data_fixture.create_builder_page(user=user, builder=builder)
    return {
        "user": user,
        "token": token,
        "page": page,
        "integration": integration,
        "table": table,
        "rows": rows,
        "fields": fields,
    }
@pytest.mark.django_db
def test_dispatch_data_sources_list_rows_no_elements(
    api_client, data_fixture, data_source_fixture
):
    """
    Test the DispatchDataSourcesView endpoint when using a Data Source type
    of List Rows.

    If the page has zero elements, the API response should not contain any
    field specific data.
    """
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=data_source_fixture["user"],
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=data_source_fixture["table"],
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": data_source_fixture["page"].id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {data_source_fixture['token']}",
    )
    rows = data_source_fixture["rows"]
    assert response.status_code == HTTP_200_OK
    # No element references any field via a formula, so only the row id
    # and order are serialized for every row.
    assert response.json() == {
        str(data_source.id): {
            "has_next_page": False,
            "results": [
                {
                    "id": rows[0].id,
                    "order": str(rows[0].order),
                },
                {
                    "id": rows[1].id,
                    "order": str(rows[1].order),
                },
                {
                    "id": rows[2].id,
                    "order": str(rows[2].order),
                },
            ],
        },
    }
@pytest.mark.django_db
def test_dispatch_data_sources_get_row_no_elements(
    api_client, data_fixture, data_source_fixture
):
    """
    Test the DispatchDataSourcesView endpoint when using a Data Source type
    of Get Row.

    If the page has zero elements, the API response should not contain any
    field specific data.
    """
    data_source = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=data_source_fixture["user"],
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=data_source_fixture["table"],
        row_id="2",
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": data_source_fixture["page"].id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {data_source_fixture['token']}",
    )
    rows = data_source_fixture["rows"]
    assert response.status_code == HTTP_200_OK
    # Only the id/order of the matched row (row_id="2" -> rows[1]) are
    # returned; no field values leak without a referencing element.
    assert response.json() == {
        str(data_source.id): {
            "id": rows[1].id,
            "order": str(rows[1].order),
        }
    }
@pytest.mark.django_db
def test_dispatch_data_sources_list_rows_with_elements(
    api_client, data_fixture, data_source_fixture
):
    """
    Test the DispatchDataSourcesView endpoint when using a Data Source type
    of List Rows.

    The API response should only contain field data when the field is
    referenced in an element via a formula.
    """
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=data_source_fixture["user"],
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=data_source_fixture["table"],
    )
    field_id = data_source_fixture["fields"][0].id
    # Create an element that uses a formula referencing the data source
    data_fixture.create_builder_table_element(
        page=data_source_fixture["page"],
        data_source=data_source,
        fields=[
            {
                "name": "FieldA",
                "type": "text",
                "config": {"value": f"get('current_record.field_{field_id}')"},
            },
        ],
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": data_source_fixture["page"].id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {data_source_fixture['token']}",
    )
    expected_results = []
    rows = data_source_fixture["rows"]
    for row in rows:
        expected_results.append(
            {
                f"field_{field_id}": getattr(row, f"field_{field_id}"),
                "id": row.id,
                "order": str(row.order),
            }
        )
    assert response.status_code == HTTP_200_OK
    # Although this Data Source has 2 Fields/Columns, only one is returned
    # since only one field_id is used by the Table.
    assert response.json() == {
        str(data_source.id): {
            "has_next_page": False,
            "results": expected_results,
        },
    }
@pytest.mark.django_db
@pytest.mark.parametrize(
    # table_row_id is 1-indexed to reflect the row ID in formulas
    # db_row_id is 0-indexed to reflect the row ID in the database
    "table_row_id,db_row_id,",
    [
        (1, 0),
        (2, 1),
        (3, 2),
    ],
)
def test_dispatch_data_sources_get_row_with_elements(
    api_client, data_fixture, data_source_fixture, table_row_id, db_row_id
):
    """
    Test the DispatchDataSourcesView endpoint when using a Data Source type
    of Get Row.

    The API response should only contain field data when the field is
    referenced in an element via a formula.
    """
    data_source = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=data_source_fixture["user"],
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=data_source_fixture["table"],
        row_id=table_row_id,
    )
    field_id = data_source_fixture["fields"][0].id
    # Create an element that uses a formula referencing the data source
    data_fixture.create_builder_table_element(
        page=data_source_fixture["page"],
        data_source=data_source,
        fields=[
            {
                "name": "FieldA",
                "type": "text",
                "config": {
                    "value": f"get('data_source.{data_source.id}.field_{field_id}')"
                },
            },
        ],
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": data_source_fixture["page"].id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {data_source_fixture['token']}",
    )
    rows = data_source_fixture["rows"]
    assert response.status_code == HTTP_200_OK
    # Only the single referenced field plus id/order is serialized.
    assert response.json() == {
        str(data_source.id): {
            f"field_{field_id}": getattr(rows[db_row_id], f"field_{field_id}"),
            "id": rows[db_row_id].id,
            "order": str(rows[db_row_id].order),
        }
    }
@pytest.mark.django_db
def test_dispatch_data_sources_get_and_list_rows_with_elements(
    api_client,
    data_fixture,
    data_source_fixture,
):
    """
    Test the DispatchDataSourcesView endpoint when using a mix of Data Source
    types, i.e. Get Row and List Rows.

    The API response should only contain field data when the field is
    referenced in an element via a formula.
    """
    user = data_source_fixture["user"]
    table_1, fields_1, rows_1 = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
        ],
        rows=[
            ["Palak Paneer", "Paneer Pakora"],
        ],
    )
    # Get Row data source backed by table_1.
    data_source_1 = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=data_source_fixture["user"],
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=table_1,
        row_id=1,
    )
    table_2, fields_2, rows_2 = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruits", "text"),
        ],
        rows=[
            ["Kiwi", "Cherry"],
        ],
    )
    # List Rows data source backed by table_2.
    data_source_2 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=data_source_fixture["page"],
        integration=data_source_fixture["integration"],
        table=table_2,
    )
    # Create an element that uses a concatenation of two "get" formulas; one
    # using the Get Row and the other using List Row data sources.
    formula = (
        f"concat(get('current_record.field_{fields_1[0].id}'),"
        f"get('data_source.{data_source_1.id}.field_{fields_1[0].id}'))"
    )
    data_fixture.create_builder_table_element(
        page=data_source_fixture["page"],
        data_source=data_source_1,
        fields=[
            {
                "name": "My Dishes",
                "type": "text",
                "config": {"value": formula},
            },
        ],
    )
    # Create another table, this time using the List Row data source
    data_fixture.create_builder_table_element(
        page=data_source_fixture["page"],
        data_source=data_source_2,
        fields=[
            {
                "name": "My Fruits",
                "type": "text",
                "config": {"value": f"get('current_record.field_{fields_2[0].id}')"},
            },
        ],
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": data_source_fixture["page"].id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {data_source_fixture['token']}",
    )
    assert response.status_code == HTTP_200_OK
    assert response.json() == {
        str(data_source_1.id): {
            f"field_{fields_1[0].id}": getattr(rows_1[0], f"field_{fields_1[0].id}"),
            "id": rows_1[0].id,
            "order": str(rows_1[0].order),
        },
        # Although this Data Source has 2 Fields/Columns, only one is returned
        # since only one field_id is used by the Table.
        str(data_source_2.id): {
            "has_next_page": False,
            "results": [
                {
                    f"field_{fields_2[0].id}": getattr(
                        rows_2[0], f"field_{fields_2[0].id}"
                    ),
                    "id": rows_2[0].id,
                    "order": str(rows_2[0].order),
                },
            ],
        },
    }
@pytest.fixture
def data_source_element_roles_fixture(data_fixture):
    """
    A fixture to help test the DispatchDataSourcesView view using Elements
    and user roles.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    # builder_to is the published copy of the application; the public page
    # under test belongs to it.
    builder_to = data_fixture.create_builder_application(workspace=None)
    data_fixture.create_builder_custom_domain(builder=builder, published_to=builder_to)
    public_page = data_fixture.create_builder_page(builder=builder_to)
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Name", "text"),
            ("Color", "text"),
        ],
        rows=[
            ["Apple", "Red"],
            ["Banana", "Yellow"],
            ["Cherry", "Purple"],
        ],
    )
    return {
        "page": public_page,
        "user": user,
        "table": table,
        "fields": fields,
        "rows": rows,
        "builder_to": builder_to,
    }
def create_user_table_and_role(data_fixture, user, builder, user_role):
    """Helper to create a User table with a particular user role.

    Returns a (user_source, integration) tuple backed by a single-row
    users table whose Role column is set to ``user_role``.
    """
    # Create the user table for the user_source
    user_table, user_fields, user_rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Email", "text"),
            ("Name", "text"),
            ("Password", "text"),
            ("Role", "text"),
        ],
        rows=[
            ["foo@bar.com", "Foo User", "secret", user_role],
        ],
    )
    email_field, name_field, password_field, role_field = user_fields
    integration = data_fixture.create_local_baserow_integration(
        user=user, application=builder
    )
    user_source = data_fixture.create_user_source(
        user_source_type_registry.get("local_baserow").model_class,
        application=builder,
        integration=integration,
        table=user_table,
        email_field=email_field,
        name_field=name_field,
        role_field=role_field,
    )
    return user_source, integration
@pytest.mark.django_db
@pytest.mark.parametrize(
    "user_role,element_role,expect_fields",
    [
        # When the user role doesn't match the Element's role,
        # the fields should *not* be returned.
        ("foo_role", "bar_role", False),
        # When the user and Element roles match, the fields should
        # be returned.
        ("foo_role", "foo_role", True),
    ],
)
def test_dispatch_data_sources_list_rows_with_elements_and_role(
    api_client,
    data_fixture,
    data_source_element_roles_fixture,
    user_role,
    element_role,
    expect_fields,
):
    """
    Test the DispatchDataSourcesView endpoint when using a Data Source type
    of List Rows.

    This test creates a Element with a role. Depending on whether expect_fields
    is True or False, the test checks to see if the Data Source view returns
    the fields.

    The API response should only contain field data when the field is
    referenced in an element via a formula, and that element is visible
    to the user.
    """
    page = data_source_element_roles_fixture["page"]
    user_source, integration = create_user_table_and_role(
        data_fixture,
        data_source_element_roles_fixture["user"],
        data_source_element_roles_fixture["builder_to"],
        user_role,
    )
    user_source_user = UserSourceUser(
        user_source, None, 1, "foo_username", "foo@bar.com"
    )
    token = user_source_user.get_refresh_token().access_token
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=data_source_element_roles_fixture["user"],
        page=page,
        integration=integration,
        table=data_source_element_roles_fixture["table"],
    )
    field_id = data_source_element_roles_fixture["fields"][0].id
    # Create an element that uses a formula referencing the data source
    data_fixture.create_builder_table_element(
        page=page,
        data_source=data_source,
        visibility=Element.VISIBILITY_TYPES.LOGGED_IN,
        roles=[element_role],
        role_type=Element.ROLE_TYPES.DISALLOW_ALL_EXCEPT,
        fields=[
            {
                "name": "FieldA",
                "type": "text",
                "config": {"value": f"get('current_record.field_{field_id}')"},
            },
        ],
    )
    url = reverse(
        "api:builder:domains:public_dispatch_all",
        kwargs={"page_id": page.id},
    )
    response = api_client.post(
        url,
        {},
        format="json",
        HTTP_AUTHORIZATION=f"JWT {token}",
    )
    expected_results = []
    for row in data_source_element_roles_fixture["rows"]:
        result = {
            "id": row.id,
            "order": str(row.order),
        }
        if expect_fields:
            # Field should only be visible if the user's role allows them
            # to see the data source fields.
            result[f"field_{field_id}"] = getattr(row, f"field_{field_id}")
        expected_results.append(result)
    assert response.status_code == HTTP_200_OK
    assert response.json() == {
        str(data_source.id): {
            "has_next_page": False,
            "results": expected_results,
        },
    }
@pytest.mark.django_db
@patch(
    "baserow.contrib.builder.api.data_sources.views.DataSourceService.dispatch_data_source"
)
@patch("baserow.contrib.builder.api.data_sources.views.BuilderDispatchContext")
@patch(
    "baserow.contrib.builder.api.data_sources.views.DataSourceHandler.get_data_source"
)
def test_dispatch_data_source_view(
    get_data_source_mock,
    dispatch_context_cls_mock,
    dispatch_data_source_mock,
    api_client,
):
    """
    Test the DispatchDataSourceView endpoint.

    Ensure that the field_names are not computed, because we don't want to
    filter any fields in the Editor.
    """

    data_source_id = 100

    fake_data_source = MagicMock()
    get_data_source_mock.return_value = fake_data_source

    fake_context = MagicMock()
    dispatch_context_cls_mock.return_value = fake_context

    dispatch_data_source_mock.return_value = {}

    response = api_client.post(
        reverse(
            "api:builder:data_source:dispatch",
            kwargs={"data_source_id": data_source_id},
        )
    )

    assert response.status_code == 200
    assert response.json() == {}

    get_data_source_mock.assert_called_once_with(data_source_id)
    # The editor-facing endpoint must not hide any fields, hence the flag
    # is expected to be False.
    dispatch_context_cls_mock.assert_called_once_with(
        ANY,
        fake_data_source.page,
        only_expose_public_formula_fields=False,
    )
    dispatch_data_source_mock.assert_called_once_with(
        ANY, fake_data_source, fake_context
    )
@pytest.mark.django_db
@patch(
    "baserow.contrib.builder.api.data_sources.views.DataSourceService.dispatch_page_data_sources"
)
@patch("baserow.contrib.builder.api.data_sources.views.BuilderDispatchContext")
@patch("baserow.contrib.builder.api.data_sources.views.PageHandler.get_page")
def test_dispatch_data_sources_view(
    get_page_mock,
    dispatch_context_cls_mock,
    dispatch_page_data_sources_mock,
    api_client,
):
    """
    Test the DispatchDataSourcesView endpoint.

    Ensure that the field_names are not computed, because we don't want to
    filter any fields in the Editor.
    """

    page_id = 100

    fake_page = MagicMock()
    get_page_mock.return_value = fake_page

    fake_context = MagicMock()
    dispatch_context_cls_mock.return_value = fake_context

    contents = {"101": "mock_content"}
    dispatch_page_data_sources_mock.return_value = contents

    response = api_client.post(
        reverse("api:builder:data_source:dispatch-all", kwargs={"page_id": page_id})
    )

    assert response.status_code == 200
    assert response.json() == contents

    get_page_mock.assert_called_once_with(page_id)
    # Editor endpoint: field filtering is deliberately disabled.
    dispatch_context_cls_mock.assert_called_once_with(
        ANY, fake_page, only_expose_public_formula_fields=False
    )
    dispatch_page_data_sources_mock.assert_called_once_with(
        ANY, fake_page, fake_context
    )

View file

@ -1,4 +1,4 @@
from unittest.mock import patch
from unittest.mock import ANY, MagicMock, patch
from django.test.utils import override_settings
from django.urls import reverse
@ -12,6 +12,12 @@ from rest_framework.status import (
)
from baserow.api.user_files.serializers import UserFileSerializer
from baserow.contrib.builder.data_sources.exceptions import (
DataSourceDoesNotExist,
DataSourceImproperlyConfigured,
)
from baserow.core.exceptions import PermissionException
from baserow.core.services.exceptions import DoesNotExist, ServiceImproperlyConfigured
@pytest.mark.django_db
@ -376,3 +382,178 @@ def test_ask_public_builder_domain_exists_with_public_backend_and_web_frontend_d
url = reverse("api:builder:domains:ask_exists") + "?domain=web-frontend.localhost"
response = api_client.get(url)
assert response.status_code == 200
@pytest.mark.django_db
@patch("baserow.contrib.builder.api.domains.public_views.BuilderDispatchContext")
@patch(
    "baserow.contrib.builder.api.domains.public_views.DataSourceService.dispatch_data_source"
)
@patch(
    "baserow.contrib.builder.api.domains.public_views.DataSourceHandler.get_data_source"
)
def test_public_dispatch_data_source_view(
    mock_get_data_source,
    mock_dispatch_data_source,
    mock_builder_dispatch_context,
    api_client,
):
    """
    Test the PublicDispatchDataSourceView endpoint.
    Ensure that the field_names are computed to secure the backend.
    """
    # Stub out the handler/service layer entirely; this test only checks
    # how the view wires its collaborators together.
    mock_data_source = MagicMock()
    mock_get_data_source.return_value = mock_data_source
    mock_response = {}
    mock_dispatch_data_source.return_value = mock_response
    mock_dispatch_context = MagicMock()
    mock_builder_dispatch_context.return_value = mock_dispatch_context
    mock_data_source_id = 100
    url = reverse(
        "api:builder:domains:public_dispatch",
        kwargs={"data_source_id": mock_data_source_id},
    )
    response = api_client.post(url)
    assert response.status_code == 200
    assert response.json() == mock_response
    mock_get_data_source.assert_called_once_with(mock_data_source_id)
    # Public (published) endpoint: the dispatch context must be built with
    # only_expose_public_formula_fields=True so hidden data is excluded.
    mock_builder_dispatch_context.assert_called_once_with(
        ANY,
        mock_data_source.page,
        only_expose_public_formula_fields=True,
    )
    mock_dispatch_data_source.assert_called_once_with(
        ANY, mock_data_source, mock_dispatch_context
    )
@pytest.mark.django_db
@patch(
    "baserow.contrib.builder.api.domains.public_views.DataSourceService.dispatch_page_data_sources"
)
@patch("baserow.contrib.builder.api.domains.public_views.BuilderDispatchContext")
@patch("baserow.contrib.builder.api.domains.public_views.PageHandler.get_page")
def test_public_dispatch_data_sources_view(
    mock_get_page,
    mock_builder_dispatch_context,
    mock_dispatch_page_data_sources,
    api_client,
):
    """
    Test the PublicDispatchDataSourcesView endpoint.
    Ensure that the field_names are computed to secure the backend.
    """
    # All collaborators are mocked; only the view's wiring is under test.
    mock_page = MagicMock()
    mock_get_page.return_value = mock_page
    mock_dispatch_context = MagicMock()
    mock_builder_dispatch_context.return_value = mock_dispatch_context
    mock_service_contents = {"101": "mock_content"}
    mock_dispatch_page_data_sources.return_value = mock_service_contents
    mock_page_id = 100
    url = reverse(
        "api:builder:domains:public_dispatch_all", kwargs={"page_id": mock_page_id}
    )
    response = api_client.post(url)
    assert response.status_code == 200
    assert response.json() == mock_service_contents
    mock_get_page.assert_called_once_with(mock_page_id)
    # Public endpoint: hidden fields must be excluded, hence the True flag.
    mock_builder_dispatch_context.assert_called_once_with(
        ANY, mock_page, only_expose_public_formula_fields=True
    )
    mock_dispatch_page_data_sources.assert_called_once_with(
        ANY, mock_page, mock_dispatch_context
    )
@pytest.mark.django_db
@pytest.mark.parametrize(
    # Each case maps a raised exception type to the machine-readable error
    # code and human-readable detail expected in the serialized payload.
    "expected_exception,error,detail",
    [
        (
            DataSourceDoesNotExist,
            "ERROR_DATA_SOURCE_DOES_NOT_EXIST",
            "The requested data_source does not exist.",
        ),
        (
            DataSourceImproperlyConfigured,
            "ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED",
            "The data_source configuration is incorrect: ",
        ),
        (
            ServiceImproperlyConfigured,
            "ERROR_DATA_SOURCE_IMPROPERLY_CONFIGURED",
            "The data_source configuration is incorrect: ",
        ),
        (
            DoesNotExist,
            "ERROR_DATA_SOURCE_DOES_NOT_EXIST",
            "The requested data does not exist.",
        ),
        (
            PermissionException,
            "PERMISSION_DENIED",
            "You don't have the required permission to execute this operation.",
        ),
    ],
)
@patch(
    "baserow.contrib.builder.api.domains.public_views.DataSourceService.dispatch_page_data_sources"
)
@patch("baserow.contrib.builder.api.domains.public_views.BuilderDispatchContext")
@patch("baserow.contrib.builder.api.domains.public_views.PageHandler.get_page")
def test_public_dispatch_data_sources_view_returns_error(
    mock_get_page,
    mock_builder_dispatch_context,
    mock_dispatch_page_data_sources,
    api_client,
    expected_exception,
    error,
    detail,
):
    """
    Test the PublicDispatchDataSourcesView endpoint.
    Ensure that exceptions are handled and returned correctly.
    """
    mock_page = MagicMock()
    mock_get_page.return_value = mock_page
    mock_dispatch_context = MagicMock()
    mock_builder_dispatch_context.return_value = mock_dispatch_context
    # The service reports a per-data-source failure by returning the
    # exception instance in the contents dict, keyed by data source id.
    mock_service_contents = {"101": expected_exception()}
    mock_dispatch_page_data_sources.return_value = mock_service_contents
    mock_page_id = 100
    url = reverse(
        "api:builder:domains:public_dispatch_all", kwargs={"page_id": mock_page_id}
    )
    response = api_client.post(url)
    # Per-data-source errors are serialized into the payload rather than
    # failing the whole request, so the HTTP status stays 200.
    assert response.status_code == 200
    assert response.json() == {
        "101": {
            "_error": error,
            "detail": detail,
        }
    }
    mock_get_page.assert_called_once_with(mock_page_id)
    mock_builder_dispatch_context.assert_called_once_with(
        ANY, mock_page, only_expose_public_formula_fields=True
    )
    mock_dispatch_page_data_sources.assert_called_once_with(
        ANY, mock_page, mock_dispatch_context
    )

View file

@ -3,6 +3,7 @@ from unittest.mock import MagicMock, Mock, patch
from django.contrib.auth.models import AnonymousUser
from django.http import HttpRequest
from django.shortcuts import reverse
import pytest
@ -10,6 +11,12 @@ from baserow.contrib.builder.data_providers.data_provider_types import (
CurrentRecordDataProviderType,
DataSourceContextDataProviderType,
DataSourceDataProviderType,
DataSourceHandler,
)
from baserow.contrib.builder.data_providers.data_provider_types import (
ElementHandler as ElementHandlerToMock,
)
from baserow.contrib.builder.data_providers.data_provider_types import (
FormDataProviderType,
PageParameterDataProviderType,
PreviousActionProviderType,
@ -22,16 +29,41 @@ from baserow.contrib.builder.data_providers.exceptions import (
from baserow.contrib.builder.data_sources.builder_dispatch_context import (
BuilderDispatchContext,
)
from baserow.contrib.builder.data_sources.exceptions import DataSourceDoesNotExist
from baserow.contrib.builder.elements.handler import ElementHandler
from baserow.contrib.builder.formula_importer import import_formula
from baserow.contrib.builder.workflow_actions.models import EventTypes
from baserow.contrib.database.fields.handler import FieldHandler
from baserow.core.formula.exceptions import InvalidBaserowFormula
from baserow.core.formula.registries import DataProviderType
from baserow.core.services.exceptions import ServiceImproperlyConfigured
from baserow.core.user_sources.constants import DEFAULT_USER_ROLE_PREFIX
from baserow.core.user_sources.user_source_user import UserSourceUser
from baserow.core.utils import MirrorDict
def get_dispatch_context(data_fixture, api_request_factory, builder, page, data=None):
    """
    Helper that returns a dispatch context to be used in tests.

    Builds a user source with an authenticated user-source user, forges a
    POST request to the public dispatch-all endpoint carrying that user's
    JWT, and wraps it in a ``BuilderDispatchContext`` with
    ``only_expose_public_formula_fields=True`` (the public/secured mode).

    :param data_fixture: Baserow test fixture factory.
    :param api_request_factory: DRF/Django request factory fixture.
    :param builder: The builder application the user source belongs to.
    :param page: The builder page the context is created for.
    :param data: Optional request body to attach as ``request.data``.
    """
    user_source = data_fixture.create_user_source_with_first_type(application=builder)
    user_source_user = data_fixture.create_user_source_user(
        user_source=user_source,
    )
    token = user_source_user.get_refresh_token().access_token
    fake_request = api_request_factory.post(
        reverse("api:builder:domains:public_dispatch_all", kwargs={"page_id": page.id}),
        {},
        HTTP_USERSOURCEAUTHORIZATION=f"JWT {token}",
    )
    fake_request.user = user_source_user
    # Only override request.data when explicitly provided, so the factory's
    # parsed body is kept otherwise.
    if data is not None:
        fake_request.data = data
    return BuilderDispatchContext(
        fake_request, page, only_expose_public_formula_fields=True
    )
def test_page_parameter_data_provider_get_data_chunk():
page_parameter_provider = PageParameterDataProviderType()
@ -131,7 +163,9 @@ def test_data_source_data_provider_get_data_chunk(data_fixture):
data_source_provider = DataSourceDataProviderType()
dispatch_context = BuilderDispatchContext(HttpRequest(), page)
dispatch_context = BuilderDispatchContext(
HttpRequest(), page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -181,7 +215,9 @@ def test_data_source_data_provider_get_data_chunk_with_formula(data_fixture):
"page_parameter": {"id": 2},
}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -237,6 +273,7 @@ def test_data_source_data_provider_get_data_chunk_with_formula_using_datasource(
row_id="get('page_parameter.id')",
name="Id source",
)
data_source = data_fixture.create_builder_local_baserow_get_row_data_source(
user=user,
page=page,
@ -255,7 +292,9 @@ def test_data_source_data_provider_get_data_chunk_with_formula_using_datasource(
"page_parameter": {"id": 2},
}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -329,7 +368,9 @@ def test_data_source_data_provider_get_data_chunk_with_formula_using_list_dataso
}
fake_request.GET = {"count": 20}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -460,7 +501,9 @@ def test_data_source_data_provider_get_data_chunk_with_formula_recursion(
"page_parameter": {},
}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -537,7 +580,9 @@ def test_data_source_data_provider_get_data_chunk_with_formula_using_datasource_
"page_parameter": {},
}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
assert (
data_source_provider.get_data_chunk(
@ -666,7 +711,9 @@ def test_data_source_context_data_provider_get_data_chunk(data_fixture):
fake_request.data = {
"page_parameter": {},
}
dispatch_context = BuilderDispatchContext(fake_request, page)
dispatch_context = BuilderDispatchContext(
fake_request, page, only_expose_public_formula_fields=False
)
# For fields that are not single select, `get_data_chunk` returns an empty response
assert (
@ -964,7 +1011,9 @@ def test_current_record_provider_get_data_chunk(data_fixture):
page=page, element=button_element, event=EventTypes.CLICK, user=user
)
dispatch_context = BuilderDispatchContext(fake_request, page, workflow_action)
dispatch_context = BuilderDispatchContext(
fake_request, page, workflow_action, only_expose_public_formula_fields=False
)
assert (
current_record_provider.get_data_chunk(dispatch_context, [field.db_column])
@ -991,3 +1040,502 @@ def test_current_record_provider_type_import_path(data_fixture):
assert CurrentRecordDataProviderType().import_path(
[field_1.db_column], id_mapping, data_source_id=data_source.id
) == [field_2.db_column]
def test_extract_properties_base_implementation():
    """
    Each provider type inheriting the base extract_properties() returns an
    empty dict for an empty path.
    """

    provider_classes = (
        DataSourceDataProviderType,
        FormDataProviderType,
        PageParameterDataProviderType,
        PreviousActionProviderType,
        UserDataProviderType,
    )
    for provider_class in provider_classes:
        assert provider_class().extract_properties([]) == {}
@pytest.mark.parametrize("path", ([], [""], ["foo"]))
@pytest.mark.django_db
def test_data_source_data_extract_properties_returns_none_if_invalid_data_source_id(
    path,
):
    """
    Test the DataSourceDataProviderType::extract_properties() method.
    Ensure that an empty dict is returned if the data_source_id cannot be
    inferred from the path or is invalid (empty path, empty string, or a
    non-numeric first segment).
    """
    result = DataSourceDataProviderType().extract_properties(path)
    assert result == {}
@patch.object(DataSourceHandler, "get_data_source")
@pytest.mark.django_db
def test_data_source_data_extract_properties_calls_correct_service_type(
    get_data_source_mock,
):
    """
    Test the DataSourceDataProviderType::extract_properties() method.

    Ensure that the correct service type is called.
    """

    expected = "123"

    # Fake service type whose extract_properties() echoes a known value.
    service_type_mock = MagicMock()
    service_type_mock.extract_properties.return_value = expected

    data_source_mock = MagicMock()
    data_source_mock.service.specific.get_type.return_value = service_type_mock
    get_data_source_mock.return_value = data_source_mock

    data_source_id = "1"
    result = DataSourceDataProviderType().extract_properties(
        [data_source_id, expected]
    )

    assert result == {data_source_mock.service_id: expected}
    # The string id from the path must be coerced to int for the lookup.
    get_data_source_mock.assert_called_once_with(int(data_source_id))
    # Only the remainder of the path is forwarded to the service type.
    service_type_mock.extract_properties.assert_called_once_with([expected])
@pytest.mark.django_db
def test_data_source_data_extract_properties_returns_expected_results(data_fixture):
    """
    Test the DataSourceDataProviderType::extract_properties() method. Ensure that
    the expected Field name is returned.
    """
    user, _ = data_fixture.create_user_and_token()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Drink", "text"),
            ("Dessert", "text"),
        ],
        rows=[
            ["Paneer Tikka", "Lassi", "Rasmalai"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=user,
        page=page,
        table=table,
        row_id="1",
    )
    # The table element references the first field via a formula; this is
    # what makes the field a "used" property of the data source.
    data_fixture.create_builder_table_element(
        page=page,
        data_source=data_source,
        fields=[
            {
                "name": "Solids",
                "type": "text",
                "config": {
                    "value": f"get('data_source.{data_source.id}.field_{fields[0].id}')"
                },
            },
        ],
    )
    # Path shape: [data_source_id, field_name].
    path = [data_source.id, f"field_{fields[0].id}"]
    result = DataSourceDataProviderType().extract_properties(path)
    # The result is keyed by the data source's service id, not its own id.
    expected = {data_source.service_id: [f"field_{fields[0].id}"]}
    assert result == expected
@pytest.mark.parametrize("path", ([], [""], ["foo"]))
@pytest.mark.django_db
def test_data_source_context_extract_properties_returns_none_if_invalid_data_source_id(
    path,
):
    """
    Test the DataSourceContextDataProviderType::extract_properties() method.
    Ensure that {} is returned if the data_source_id cannot be inferred or
    is invalid (empty path, empty string, or a non-numeric first segment).
    """
    result = DataSourceContextDataProviderType().extract_properties(path)
    assert result == {}
@patch.object(DataSourceHandler, "get_data_source")
@pytest.mark.django_db
def test_data_source_context_extract_properties_calls_correct_service_type(
    get_data_source_mock,
):
    """
    Test the DataSourceContextDataProviderType::extract_properties() method.

    Ensure that the correct service type is called.
    """

    expected = "123"

    # Fake service type whose extract_properties() echoes a known value.
    service_type_mock = MagicMock()
    service_type_mock.extract_properties.return_value = expected

    data_source_mock = MagicMock()
    data_source_mock.service.specific.get_type.return_value = service_type_mock
    get_data_source_mock.return_value = data_source_mock

    data_source_id = "1"
    result = DataSourceContextDataProviderType().extract_properties(
        [data_source_id, expected]
    )

    assert result == {data_source_mock.service_id: expected}
    # The string id from the path must be coerced to int for the lookup.
    get_data_source_mock.assert_called_once_with(int(data_source_id))
    # Only the remainder of the path is forwarded to the service type.
    service_type_mock.extract_properties.assert_called_once_with([expected])
@pytest.mark.django_db
def test_data_source_context_extract_properties_returns_expected_results(data_fixture):
    """
    Test the DataSourceContextDataProviderType::extract_properties() method. Ensure that
    the expected Field name is returned.
    """
    user, _ = data_fixture.create_user_and_token()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Drink", "text"),
            ("Dessert", "text"),
        ],
        rows=[
            ["Paneer Tikka", "Lassi", "Rasmalai"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=user,
        page=page,
        table=table,
        row_id="1",
    )
    # The table element references the first field via a formula; this is
    # what makes the field a "used" property of the data source.
    data_fixture.create_builder_table_element(
        page=page,
        data_source=data_source,
        fields=[
            {
                "name": "Solids",
                "type": "text",
                "config": {
                    "value": f"get('data_source.{data_source.id}.field_{fields[0].id}')"
                },
            },
        ],
    )
    # Unlike the data provider test above, the id here is passed as a
    # string to exercise the str -> int coercion.
    path = [str(data_source.id), f"field_{fields[0].id}"]
    result = DataSourceContextDataProviderType().extract_properties(path)
    # The result is keyed by the data source's service id, not its own id.
    expected = {data_source.service_id: [f"field_{fields[0].id}"]}
    assert result == expected
@pytest.mark.parametrize(
    # Each "path" is a whole list; only its first element (the data source
    # id, as str or int) matters for the lookup under test.
    "path",
    (
        ["10", 999],
        [20, 888],
    ),
)
@pytest.mark.django_db
@patch.object(DataSourceHandler, "get_data_source")
def test_data_source_context_data_provider_extract_properties_raises_if_data_source_doesnt_exist(
    mock_get_data_source,
    path,
):
    """
    Test the DataSourceContextDataProviderType::extract_properties() method.
    Ensure that InvalidBaserowFormula is raised if the Data Source doesn't exist.
    """
    mock_get_data_source.side_effect = DataSourceDoesNotExist()
    # The provider must translate the handler's DataSourceDoesNotExist into
    # a formula-level error.
    with pytest.raises(InvalidBaserowFormula):
        DataSourceContextDataProviderType().extract_properties(path)
    mock_get_data_source.assert_called_once_with(int(path[0]))
@pytest.mark.parametrize(
    # Each "path" is a whole list; only its first element (the data source
    # id, as str or int) matters for the lookup under test.
    "path",
    (
        ["10", 999],
        [20, 888],
    ),
)
@pytest.mark.django_db
@patch.object(DataSourceHandler, "get_data_source")
def test_data_source_data_provider_extract_properties_raises_if_data_source_doesnt_exist(
    mock_get_data_source,
    path,
):
    """
    Test the DataSourceDataProviderType::extract_properties() method.
    Ensure that InvalidBaserowFormula is raised if the Data Source doesn't exist.
    """
    mock_get_data_source.side_effect = DataSourceDoesNotExist()
    # The provider must translate the handler's DataSourceDoesNotExist into
    # a formula-level error.
    with pytest.raises(InvalidBaserowFormula):
        DataSourceDataProviderType().extract_properties(path)
    mock_get_data_source.assert_called_once_with(int(path[0]))
@pytest.mark.parametrize("path", ([], [""], ["foo"]))
@pytest.mark.django_db
def test_current_record_extract_properties_returns_none_if_data_source_id_missing(path):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method.
    Ensure that an empty dict is returned if the data_source_id is missing
    in the import context.
    """
    result = CurrentRecordDataProviderType().extract_properties(path)
    assert result == {}
@pytest.mark.parametrize(
    # Pairs of (path head, data source id); the id is passed explicitly to
    # extract_properties() for the current-record provider.
    "path,invalid_data_source_id",
    (
        ["10", 999],
        [20, 888],
    ),
)
@pytest.mark.django_db
@patch.object(DataSourceHandler, "get_data_source")
def test_current_record_extract_properties_raises_if_data_source_doesnt_exist(
    mock_get_data_source,
    path,
    invalid_data_source_id,
):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method.
    Ensure that InvalidBaserowFormula is raised if the Data Source doesn't exist.
    """
    mock_get_data_source.side_effect = DataSourceDoesNotExist()
    # The provider must translate the handler's DataSourceDoesNotExist into
    # a formula-level error.
    with pytest.raises(InvalidBaserowFormula):
        CurrentRecordDataProviderType().extract_properties(path, invalid_data_source_id)
    mock_get_data_source.assert_called_once_with(invalid_data_source_id)
@pytest.mark.django_db
@patch.object(ElementHandlerToMock, "get_import_context_addition")
@patch.object(DataSourceHandler, "get_data_source")
def test_current_record_extract_properties_calls_correct_service_type(
    mock_get_data_source,
    mock_get_import_context_addition,
):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method.
    Ensure that the correct service type is called.
    """
    # The element's import context supplies the data source id used for the
    # handler lookup.
    fake_data_source_id = 100
    mock_get_import_context_addition.return_value = {
        "data_source_id": fake_data_source_id
    }
    expected_field = "field_123"
    mocked_service_type = MagicMock()
    mocked_service_type.extract_properties.return_value = expected_field
    mocked_data_source = MagicMock()
    mocked_data_source.service.specific.get_type = MagicMock(
        return_value=mocked_service_type
    )
    mock_get_data_source.return_value = mocked_data_source
    fake_element_id = 10
    path = [expected_field]
    result = CurrentRecordDataProviderType().extract_properties(path, fake_element_id)
    assert result == {mocked_data_source.service_id: expected_field}
    mock_get_data_source.assert_called_once_with(fake_element_id)
    # A "0" row index is prepended to the path before delegating to the
    # service type (current record maps to row 0 of a list result).
    mocked_service_type.extract_properties.assert_called_once_with(
        ["0", expected_field]
    )
@pytest.mark.django_db
@pytest.mark.parametrize(
    # Cartesian combinations of whether the service type returns a list of
    # rows and whether a schema_property is provided.
    "returns_list,schema_property",
    [
        (
            True,
            "field_123",
        ),
        (
            True,
            None,
        ),
        (
            False,
            "field_123",
        ),
        (
            False,
            None,
        ),
    ],
)
@patch.object(DataSourceHandler, "get_data_source")
def test_current_record_extract_properties_called_with_correct_path(
    mock_get_data_source, returns_list, schema_property
):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method.
    Ensure that the `path` is generated correctly and passed to the service type.
    """
    service_id = 100
    data_source_id = 50
    mock_service_type = MagicMock()
    mock_service_type.returns_list = returns_list
    mock_service_type.extract_properties.return_value = ["field_999"]
    mock_data_source = MagicMock()
    mock_data_source.service_id = service_id
    mock_data_source.service.specific.get_type.return_value = mock_service_type
    mock_get_data_source.return_value = mock_data_source
    path = ["*"]
    result = CurrentRecordDataProviderType().extract_properties(
        path,
        data_source_id,
        schema_property,
    )
    mock_get_data_source.assert_called_once_with(data_source_id)
    if returns_list:
        # List services: a "0" row index is prepended, plus the schema
        # property when one is given.
        if schema_property:
            mock_service_type.extract_properties.assert_called_once_with(
                ["0", schema_property, *path]
            )
        else:
            mock_service_type.extract_properties.assert_called_once_with(["0", *path])
        assert result == {service_id: ["field_999"]}
    else:
        if schema_property:
            mock_service_type.extract_properties.assert_called_once_with(
                [schema_property, *path]
            )
            assert result == {service_id: ["field_999"]}
        else:
            # If service type doesn't return a list (e.g. Get Row) and
            # there is no schema_property, ensure we return early with an
            # empty dict, since there are no fields to extract.
            mock_service_type.extract_properties.assert_not_called()
            assert result == {}
@pytest.mark.django_db
@patch.object(DataSourceHandler, "get_data_source")
def test_current_record_extract_properties_returns_empty_if_invalid_data_source_id(
    mock_get_data_source,
):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method. Ensure that
    an empty dict is returned if the Data Source ID is invalid.
    """
    invalid_data_source_id = None
    path = ["field_123"]
    result = CurrentRecordDataProviderType().extract_properties(
        path, invalid_data_source_id
    )
    assert result == {}
    # A missing id must short-circuit before any handler lookup happens.
    mock_get_data_source.assert_not_called()
@pytest.mark.django_db
def test_current_record_extract_properties_returns_expected_results(data_fixture):
    """
    Test the CurrentRecordDataProviderType::extract_properties() method. Ensure that
    the expected Field name is returned.
    """
    user, _ = data_fixture.create_user_and_token()
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Drink", "text"),
            ("Dessert", "text"),
        ],
        rows=[
            ["Paneer Tikka", "Lassi", "Rasmalai"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    # The table element's formula uses the current_record provider to pull
    # the first field of each row.
    data_fixture.create_builder_table_element(
        page=page,
        data_source=data_source,
        fields=[
            {
                "name": "Solids",
                "type": "text",
                "config": {"value": f"get('current_record.field_{fields[0].id}')"},
            },
        ],
    )
    # Path for current_record is just the field name; the data source id is
    # supplied separately as the second argument.
    path = [f"field_{fields[0].id}"]
    result = CurrentRecordDataProviderType().extract_properties(path, data_source.id)
    # The result is keyed by the data source's service id.
    expected = {data_source.service_id: [f"field_{fields[0].id}"]}
    assert result == expected
def test_data_provider_type_extract_properties_base_method():
    """The DataProviderType base extract_properties() returns an empty dict."""

    class StubDataProviderType(DataProviderType):
        # Minimal concrete subclass: only the abstract surface is filled in.
        type = "fake_data_provider_type"

        def get_data_chunk(self, *args, **kwargs):
            return None

    assert StubDataProviderType().extract_properties([]) == {}

View file

@ -1,10 +1,14 @@
from decimal import Decimal
from unittest.mock import patch
from django.http import HttpRequest
from django.shortcuts import reverse
from django.test import override_settings
import pytest
from baserow.contrib.builder.data_sources.builder_dispatch_context import (
FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS,
BuilderDispatchContext,
)
from baserow.contrib.builder.data_sources.exceptions import DataSourceDoesNotExist
@ -164,7 +168,9 @@ def test_dispatch_data_source(data_fixture):
row_id="2",
)
dispatch_context = BuilderDispatchContext(HttpRequest(), page)
dispatch_context = BuilderDispatchContext(
HttpRequest(), page, only_expose_public_formula_fields=False
)
result = DataSourceHandler().dispatch_data_source(data_source, dispatch_context)
assert result == {
@ -222,7 +228,9 @@ def test_dispatch_data_sources(data_fixture):
row_id="b",
)
dispatch_context = BuilderDispatchContext(HttpRequest(), page)
dispatch_context = BuilderDispatchContext(
HttpRequest(), page, only_expose_public_formula_fields=False
)
result = DataSourceHandler().dispatch_data_sources(
[data_source, data_source2, data_source3], dispatch_context
)
@ -385,3 +393,100 @@ def test_recalculate_full_orders(data_fixture):
assert data_sources[1].id == data_sourceB.id
assert data_sources[1].order == Decimal("2.00300000000000000000")
@override_settings(FEATURE_FLAGS=[FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS])
@pytest.mark.django_db
@patch(
    "baserow.contrib.builder.data_sources.builder_dispatch_context.get_formula_field_names"
)
def test_dispatch_data_source_returns_formula_field_names(
    mock_get_formula_field_names, data_fixture, api_request_factory
):
    """
    Integration test to ensure get_formula_field_names() is called without errors.

    The EXCLUDE_UNUSED_FIELDS feature flag is enabled via override_settings,
    and get_formula_field_names is patched to declare both table fields as
    externally visible, so the dispatch result should include them.
    """
    user = data_fixture.create_user()
    workspace = data_fixture.create_workspace(user=user)
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Spiciness", "number"),
        ],
        rows=[
            ["Paneer Tikka", 5],
            ["Gobi Manchurian", 8],
        ],
    )
    builder = data_fixture.create_builder_application(user=user, workspace=workspace)
    integration = data_fixture.create_local_baserow_integration(
        user=user, application=builder
    )
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        integration=integration,
        table=table,
    )
    # Both fields are referenced by element formulas, making them "used".
    data_fixture.create_builder_table_element(
        user=user,
        page=page,
        data_source=data_source,
        fields=[
            {
                "name": "FieldA",
                "type": "text",
                "config": {
                    "value": f"get('data_source.{data_source.id}.field_{fields[0].id}')"
                },
            },
            {
                "name": "FieldB",
                "type": "text",
                "config": {
                    "value": f"get('data_source.{data_source.id}.field_{fields[1].id}')"
                },
            },
        ],
    )
    # Authenticate the forged request as a user-source user via JWT header.
    user_source = data_fixture.create_user_source_with_first_type(application=builder)
    user_source_user = data_fixture.create_user_source_user(
        user_source=user_source,
    )
    token = user_source_user.get_refresh_token().access_token
    fake_request = api_request_factory.post(
        reverse("api:builder:domains:public_dispatch_all", kwargs={"page_id": page.id}),
        {},
        HTTP_USERSOURCEAUTHORIZATION=f"JWT {token}",
    )
    fake_request.user = user_source_user
    dispatch_context = BuilderDispatchContext(fake_request, page)
    # Declare both fields "external" so nothing gets filtered out below.
    mock_get_formula_field_names.return_value = {
        "external": {data_source.service.id: [f"field_{field.id}" for field in fields]}
    }
    result = DataSourceHandler().dispatch_data_source(data_source, dispatch_context)
    # Numeric cells are serialized as strings by the service layer.
    assert result == {
        "has_next_page": False,
        "results": [
            {
                "id": 1,
                "order": "1.00000000000000000000",
                f"field_{fields[0].id}": "Paneer Tikka",
                f"field_{fields[1].id}": "5",
            },
            {
                "id": 2,
                "order": "2.00000000000000000000",
                f"field_{fields[0].id}": "Gobi Manchurian",
                f"field_{fields[1].id}": "8",
            },
        ],
    }

View file

@ -1,5 +1,5 @@
from decimal import Decimal
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
from django.http import HttpRequest
@ -400,7 +400,9 @@ def test_dispatch_data_source(data_fixture):
row_id="2",
)
dispatch_context = BuilderDispatchContext(HttpRequest(), page)
dispatch_context = BuilderDispatchContext(
HttpRequest(), page, only_expose_public_formula_fields=False
)
result = DataSourceService().dispatch_data_source(
user, data_source, dispatch_context
)
@ -460,7 +462,9 @@ def test_dispatch_page_data_sources(data_fixture):
row_id="b",
)
dispatch_context = BuilderDispatchContext(HttpRequest(), page)
dispatch_context = BuilderDispatchContext(
HttpRequest(), page, only_expose_public_formula_fields=False
)
result = DataSourceService().dispatch_page_data_sources(
user, page, dispatch_context
)
@ -512,6 +516,8 @@ def test_dispatch_data_source_permission_denied(data_fixture, stub_check_permiss
formula_context = MagicMock()
formula_context.cache = {}
type(formula_context.request).user = PropertyMock(return_value=user)
type(formula_context).page = PropertyMock(return_value=page)
with stub_check_permissions(raise_permission_denied=True), pytest.raises(
PermissionException

View file

@ -1,4 +1,4 @@
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
from django.http import HttpRequest
@ -6,6 +6,7 @@ import pytest
from rest_framework.request import Request
from baserow.contrib.builder.data_sources.builder_dispatch_context import (
FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS,
BuilderDispatchContext,
)
@ -32,14 +33,22 @@ def test_dispatch_context_page_range():
@pytest.mark.django_db
def test_dispatch_context_page_from_context(data_fixture):
@patch(
"baserow.contrib.builder.data_sources.builder_dispatch_context.get_formula_field_names"
)
def test_dispatch_context_page_from_context(mock_get_field_names, data_fixture):
mock_get_field_names.return_value = {"all": {}, "external": {}, "internal": {}}
user = data_fixture.create_user()
page = data_fixture.create_builder_page(user=user)
request = Request(HttpRequest())
request.user = user
dispatch_context = BuilderDispatchContext(request, page, offset=0, count=5)
dispatch_context = BuilderDispatchContext(
request, page, offset=0, count=5, only_expose_public_formula_fields=True
)
dispatch_context.annotated_data = "foobar"
dispatch_context.cache = {"key": "value"}
new_dispatch_context = BuilderDispatchContext.from_context(
dispatch_context, offset=5, count=1
@ -50,6 +59,11 @@ def test_dispatch_context_page_from_context(data_fixture):
assert new_dispatch_context.page == page
assert new_dispatch_context.offset == 5
assert new_dispatch_context.count == 1
assert new_dispatch_context.public_formula_fields == {
"all": {},
"external": {},
"internal": {},
}
def test_dispatch_context_search_query():
@ -82,3 +96,58 @@ def test_dispatch_context_sortings():
request.GET["order_by"] = "-field_1,-field_2"
dispatch_context = BuilderDispatchContext(request, None)
assert dispatch_context.sortings() == "-field_1,-field_2"
@pytest.mark.parametrize(
    "feature_flag_is_set,only_expose_public_formula_fields",
    (
        [False, True],
        [True, True],
        [False, False],
        [True, False],
    ),
)
@patch(
    "baserow.contrib.builder.data_sources.builder_dispatch_context.get_formula_field_names"
)
@patch(
    "baserow.contrib.builder.data_sources.builder_dispatch_context.feature_flag_is_enabled"
)
def test_builder_dispatch_context_field_names_computed_on_feature_flag(
    mock_feature_flag_is_enabled,
    mock_get_formula_field_names,
    feature_flag_is_set,
    only_expose_public_formula_fields,
):
    """
    Test the BuilderDispatchContext::public_formula_fields property.

    Ensure that the field names are computed only when the feature flag is on
    *and* the context was created with only_expose_public_formula_fields=True.
    """

    # The parametrized value is already a bool; the redundant
    # `True if ... else False` ternary was removed.
    mock_feature_flag_is_enabled.return_value = feature_flag_is_set
    mock_field_names = MagicMock()
    mock_get_formula_field_names.return_value = mock_field_names
    mock_request = MagicMock()
    mock_page = MagicMock()
    dispatch_context = BuilderDispatchContext(
        mock_request,
        mock_page,
        only_expose_public_formula_fields=only_expose_public_formula_fields,
    )
    if feature_flag_is_set and only_expose_public_formula_fields:
        # Both gates open: the helper must be consulted exactly once.
        assert dispatch_context.public_formula_fields == mock_field_names
        mock_get_formula_field_names.assert_called_once_with(
            mock_request.user, mock_page
        )
        mock_feature_flag_is_enabled.assert_called_once_with(
            FEATURE_FLAG_EXCLUDE_UNUSED_FIELDS
        )
    else:
        # Either gate closed: no field-name computation may happen.
        assert dispatch_context.public_formula_fields is None
        mock_get_formula_field_names.assert_not_called()

View file

@ -0,0 +1,40 @@
"""
Test the CollectionElementTypeMixin class.
"""
from unittest.mock import MagicMock
import pytest
from baserow.contrib.builder.elements.mixins import CollectionElementTypeMixin
MODULE_PATH = "baserow.contrib.builder.elements.collection_field_types"
@pytest.mark.parametrize(
    "schema_property",
    [
        "field_123",
        None,
    ],
)
def test_import_context_addition_sets_schema_property(schema_property):
    """
    Test the import_context_addition() method.
    Ensure that the schema_property is set when the element has a schema property.
    """
    data_source_id = 100
    mock_element = MagicMock()
    mock_element.schema_property = schema_property
    mock_element.data_source_id = data_source_id
    result = CollectionElementTypeMixin().import_context_addition(mock_element)
    # The element's data source id must always be propagated into the context.
    assert result["data_source_id"] == data_source_id
    # schema_property is only included when the element actually has one.
    if schema_property:
        assert result["schema_property"] == schema_property
    else:
        assert "schema_property" not in result

View file

@ -626,12 +626,26 @@ def test_choice_element_is_valid_formula_data_source(data_fixture):
)
# Call is_valid with an option that is not present in the list raises an exception
dispatch_context = BuilderDispatchContext(HttpRequest(), page, offset=0, count=20)
dispatch_context = BuilderDispatchContext(
HttpRequest(),
page,
offset=0,
count=20,
only_expose_public_formula_fields=False,
)
with pytest.raises(FormDataProviderChunkInvalidException):
ChoiceElementType().is_valid(choice, "Invalid", dispatch_context)
# Call is_valid with a valid option simply returns its value
dispatch_context = BuilderDispatchContext(HttpRequest(), page, offset=0, count=20)
dispatch_context = BuilderDispatchContext(
HttpRequest(),
page,
offset=0,
count=20,
only_expose_public_formula_fields=False,
)
assert ChoiceElementType().is_valid(choice, "BMW", dispatch_context) == "BMW"
@ -1245,7 +1259,15 @@ def test_choice_element_integer_option_values(data_fixture):
)
expected_choices = [row.id for row in rows]
dispatch_context = BuilderDispatchContext(HttpRequest(), page, offset=0, count=20)
dispatch_context = BuilderDispatchContext(
HttpRequest(),
page,
offset=0,
count=20,
only_expose_public_formula_fields=False,
)
for value in expected_choices:
dispatch_context.reset_call_stack()
assert ChoiceElementType().is_valid(choice, value, dispatch_context) is value

View file

@ -490,3 +490,22 @@ def test_table_element_import_field_with_formula_with_current_record(data_fixtur
table_element.fields.first().config["label"]
== f"get('current_record.field_{fields[0].id}')"
)
@pytest.mark.django_db
def test_import_context_addition_returns_data_source_id(data_fixture):
    """
    Test the TableElementType::import_context_addition() method.
    Ensure the data_source_id is included in the returned dict.
    """
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source()
    table_element = data_fixture.create_builder_table_element(
        data_source=data_source,
    )
    table_element_type = table_element.get_type()
    context = table_element_type.import_context_addition(table_element)
    # The import context must expose the element's data source id.
    assert context["data_source_id"] == data_source.id

View file

@ -0,0 +1,873 @@
from typing import List
from unittest.mock import MagicMock, patch
import pytest
from baserow.contrib.builder.formula_property_extractor import (
FormulaFieldVisitor,
get_data_source_field_names,
get_element_field_names,
get_formula_field_names,
get_workflow_action_field_names,
)
from baserow.contrib.builder.workflow_actions.models import EventTypes
from baserow.contrib.builder.workflow_actions.service import (
BuilderWorkflowActionService,
)
from baserow.contrib.builder.workflow_actions.workflow_action_types import (
CreateRowWorkflowActionType,
DeleteRowWorkflowActionType,
NotificationWorkflowActionType,
OpenPageWorkflowActionType,
UpdateRowWorkflowActionType,
)
from baserow.core.formula import BaserowFormula
from baserow.core.formula.exceptions import InvalidBaserowFormula
from baserow.core.formula.parser.exceptions import BaserowFormulaSyntaxError
from baserow.core.formula.registries import DataProviderType
from baserow.core.formula.runtime_formula_context import RuntimeFormulaContext
class TestDataProviderType(DataProviderType):
    """
    Minimal concrete DataProviderType used by these tests to exercise
    base-class behaviour (e.g. the default extract_properties()).
    """

    type = "test_provider"

    def get_data_chunk(
        self, runtime_formula_context: RuntimeFormulaContext, path: List[str]
    ):
        # Intentionally delegates straight to the base implementation.
        return super().get_data_chunk(runtime_formula_context, path)
@pytest.mark.django_db
def test_get_formula_field_names_returns_empty_list(data_fixture):
    """
    Test the get_formula_field_names() function.
    Ensure that an empty dict is returned if no Elements are found.
    """
    user = data_fixture.create_user()
    page = data_fixture.create_builder_page(user=user)
    results = get_formula_field_names(user, page)
    # A page without elements yields empty mappings for every bucket.
    assert results == {
        "all": {},
        "external": {},
        "internal": {},
    }
@pytest.mark.django_db
def test_get_formula_field_names_returns_all_field_names(data_fixture):
    """
    Test the get_formula_field_names() function.
    Ensure that all the expected field names are returned.
    """
    user = data_fixture.create_user()
    workspace = data_fixture.create_workspace(user=user)
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Spiciness", "number"),
        ],
        rows=[
            ["Paneer Tikka", 5],
            ["Gobi Manchurian", 8],
        ],
    )
    builder = data_fixture.create_builder_application(user=user, workspace=workspace)
    integration = data_fixture.create_local_baserow_integration(
        user=user, application=builder
    )
    page = data_fixture.create_builder_page(user=user)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        integration=integration,
        table=table,
    )
    # The Table element references both table fields via formulas.
    data_fixture.create_builder_table_element(
        user=user,
        page=page,
        data_source=data_source,
        fields=[
            {
                "name": "FieldA",
                "type": "text",
                "config": {"value": f"get('current_record.field_{fields[0].id}')"},
            },
            {
                "name": "FieldB",
                "type": "text",
                "config": {"value": f"get('current_record.field_{fields[1].id}')"},
            },
        ],
    )
    results = get_formula_field_names(user, page)
    assert sorted(list(results)) == ["all", "external", "internal"]
    # Both fields are used by the element, so both appear in all/external.
    assert sorted(results["all"][data_source.service_id]) == [
        f"field_{field.id}" for field in fields
    ]
    assert sorted(results["external"][data_source.service_id]) == [
        f"field_{field.id}" for field in fields
    ]
    # No internal-only formulas were created.
    assert results["internal"] == {}
@pytest.mark.django_db
def test_get_formula_field_names_returns_some_field_names(data_fixture):
    """
    Test the get_formula_field_names() function.
    Ensure that only some of the field names are returned. A field name should
    only be returned if it is used in the page.
    """
    user = data_fixture.create_user()
    workspace = data_fixture.create_workspace(user=user)
    table, fields, rows = data_fixture.build_table(
        user=user,
        columns=[
            ("Food", "text"),
            ("Spiciness", "number"),
        ],
        rows=[
            ["Paneer Tikka", 5],
            ["Gobi Manchurian", 8],
        ],
    )
    builder = data_fixture.create_builder_application(user=user, workspace=workspace)
    integration = data_fixture.create_local_baserow_integration(
        user=user, application=builder
    )
    page = data_fixture.create_builder_page(user=user)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        integration=integration,
        table=table,
    )
    data_fixture.create_builder_table_element(
        user=user,
        page=page,
        data_source=data_source,
        fields=[
            # Although there are two fields, this Table element only uses one.
            {
                "name": "FieldA",
                "type": "text",
                "config": {"value": f"get('current_record.field_{fields[0].id}')"},
            },
        ],
    )
    results = get_formula_field_names(user, page)
    # Since the Table element (which is the only element in the Page) uses
    # only one field, ensure that specific field is the only one returned.
    assert results == {
        "all": {
            data_source.service_id: [f"field_{fields[0].id}"],
        },
        "external": {
            data_source.service_id: [f"field_{fields[0].id}"],
        },
        "internal": {},
    }
def test_extract_properties_returns_none():
    """Verify DataProviderType.extract_properties() defaults to an empty dict."""

    provider = TestDataProviderType()
    extracted = provider.extract_properties([])
    assert extracted == {}
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_element_field_names_returns_empty_if_no_elements(mock_parse_tree):
    """
    Ensure the get_element_field_names() function returns an empty dict if
    there are no elements.
    """
    results = get_element_field_names([], {})
    assert results == {"external": {}}
    # No elements means no formulas, so the parser must never be invoked.
    mock_parse_tree.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_element_field_names_returns_empty_if_no_formulas(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_element_field_names() function returns an empty dict if
    the element has no formula.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    # Heading elements with empty values carry no formulas to parse.
    heading_element_1 = data_fixture.create_builder_heading_element(page=page, value="")
    heading_element_2 = data_fixture.create_builder_heading_element(page=page, value="")
    results = get_element_field_names([heading_element_1, heading_element_2], {})
    assert results == {"external": {}}
    mock_parse_tree.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_element_field_names_returns_empty_if_invalid_formula(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_element_field_names() function returns an empty dict if
    the element has an invalid formula.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    heading_element = data_fixture.create_builder_heading_element(
        page=page, value="foo"
    )
    # Simulate an "invalid formula" error
    mock_parse_tree.side_effect = BaserowFormulaSyntaxError("Invalid formula!")
    result = get_element_field_names([heading_element], {})
    # Parsing failures must be swallowed, not propagated to the caller.
    assert result == {"external": {}}
    mock_parse_tree.assert_called_once_with("foo")
@pytest.mark.django_db
def test_get_element_field_names_returns_field_names(data_fixture):
    """
    Ensure the get_element_field_names() function returns the expected field names.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source_1 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    data_source_2 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    # Elements 1 and 2 reference data source 1; element 3 references source 2.
    heading_element_1 = data_fixture.create_builder_heading_element(
        page=page,
        value=f"get('data_source.{data_source_1.id}.0.field_{fields[0].id}')",
    )
    heading_element_2 = data_fixture.create_builder_heading_element(
        page=page,
        value=f"get('data_source.{data_source_1.id}.0.field_{fields[1].id}')",
    )
    heading_element_3 = data_fixture.create_builder_heading_element(
        page=page,
        value=f"get('data_source.{data_source_2.id}.0.field_{fields[2].id}')",
    )
    result = get_element_field_names(
        [heading_element_1, heading_element_2, heading_element_3],
        {},
    )
    assert list(result) == ["external"]
    # assumes service ids are assigned in ascending creation order, so the
    # sorted list places data_source_1 first — TODO confirm.
    assert sorted(list(result["external"])) == [
        data_source_1.service.id,
        data_source_2.service.id,
    ]
    assert sorted(list(result["external"][data_source_1.service.id])) == [
        # Since only the first two fields are used by elements in this page,
        # we expect to see _only_ those two fields.
        f"field_{fields[0].id}",
        f"field_{fields[1].id}",
    ]
    assert sorted(list(result["external"][data_source_2.service.id])) == [
        f"field_{fields[2].id}",
    ]
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_workflow_action_field_names_returns_empty_if_no_workflow_actions(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_workflow_action_field_names() function returns an empty dict if
    there are no workflow actions.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    results = get_workflow_action_field_names(user, page, {})
    assert results == {"internal": {}, "external": {}}
    # No workflow actions means nothing to parse.
    mock_parse_tree.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_workflow_action_field_names_returns_empty_if_no_formulas(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_workflow_action_field_names() function returns an empty dict if
    the workflow action has no formula.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    button_element = data_fixture.create_builder_button_element(page=page)
    # The workflow action is only needed for its side effect of existing on
    # the page; the previously unused `workflow_action = (...).specific`
    # binding was removed.
    BuilderWorkflowActionService().create_workflow_action(
        user,
        NotificationWorkflowActionType(),
        page=page,
        element=button_element,
        event=EventTypes.CLICK,
        description="",
    )
    results = get_workflow_action_field_names(user, page, {})
    assert results == {"external": {}, "internal": {}}
    # An empty description contains no formula, so the parser is never called.
    mock_parse_tree.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_workflow_action_field_names_returns_empty_if_invalid_formula(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_workflow_action_field_names() function returns an empty dict if
    the workflow action has an invalid formula.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    button_element = data_fixture.create_builder_button_element(page=page)
    # Created purely for its side effect; the unused
    # `workflow_action = (...).specific` binding was removed.
    BuilderWorkflowActionService().create_workflow_action(
        user,
        NotificationWorkflowActionType(),
        page=page,
        element=button_element,
        event=EventTypes.CLICK,
        description="foo",
    )
    # Simulate an "invalid formula" error
    mock_parse_tree.side_effect = BaserowFormulaSyntaxError("Invalid formula!")
    results = get_workflow_action_field_names(user, page, {})
    # Parsing failures must be swallowed, yielding empty results.
    assert results == {"external": {}, "internal": {}}
    mock_parse_tree.assert_called_once_with("foo")
@pytest.mark.django_db
def test_get_workflow_action_field_names_returns_field_names(data_fixture):
    """
    Ensure the get_workflow_action_field_names() function returns field names.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source_1 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    data_source_2 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    button_element = data_fixture.create_builder_button_element(page=page)
    # The action's title/description formulas reference one field from each
    # data source; the previously unused `workflow_action` binding was removed.
    BuilderWorkflowActionService().create_workflow_action(
        user,
        NotificationWorkflowActionType(),
        page=page,
        element=button_element,
        event=EventTypes.CLICK,
        title=f"get('data_source.{data_source_1.id}.1.field_{fields[0].id}')",
        description=f"get('data_source.{data_source_2.id}.1.field_{fields[1].id}')",
    )
    results = get_workflow_action_field_names(user, page, {})
    assert sorted(list(results)) == ["external", "internal"]
    # Since the third field is not used anywhere in the page, we do _not_
    # expect to see that field in the results.
    assert sorted(list(results["external"][data_source_1.service.id])) == [
        f"field_{fields[0].id}"
    ]
    assert sorted(list(results["external"][data_source_2.service.id])) == [
        f"field_{fields[1].id}"
    ]
    assert results["internal"] == {}
@pytest.mark.django_db
@pytest.mark.parametrize(
    "workflow_action_type,formula_fields",
    [
        [
            NotificationWorkflowActionType,
            ["title", "description"],
        ],
        [
            OpenPageWorkflowActionType,
            ["navigate_to_url"],
        ],
    ],
)
def test_get_workflow_action_field_names_returns_external_field_names(
    data_fixture, workflow_action_type, formula_fields
):
    """
    Ensure the get_workflow_action_field_names() function returns field names.
    Test that the external and internal dicts are correctly segregated.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    button_element = data_fixture.create_builder_button_element(page=page)
    workflow_action = (
        BuilderWorkflowActionService()
        .create_workflow_action(
            user,
            workflow_action_type(),
            page=page,
            element=button_element,
            event=EventTypes.CLICK,
        )
        .specific
    )
    # Point every parametrized formula field at the first table field.
    for field in formula_fields:
        setattr(
            workflow_action,
            field,
            f"get('data_source.{data_source.id}.1.field_{fields[0].id}')",
        )
    workflow_action.save()
    results = get_workflow_action_field_names(user, page, {})
    assert results == {
        # Since the workflow action is public/safe, only the external dict
        # should be populated.
        "external": {
            data_source.service.id: [f"field_{fields[0].id}"],
        },
        "internal": {},
    }
@pytest.mark.django_db
@pytest.mark.parametrize(
    "workflow_action_type",
    [
        CreateRowWorkflowActionType,
        DeleteRowWorkflowActionType,
        UpdateRowWorkflowActionType,
    ],
)
def test_get_workflow_action_field_names_returns_internal_field_names(
    data_fixture, workflow_action_type
):
    """
    Ensure the get_workflow_action_field_names() function returns field names.
    Test that the external and internal dicts are correctly segregated.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    button_element = data_fixture.create_builder_button_element(page=page)
    workflow_action = (
        BuilderWorkflowActionService()
        .create_workflow_action(
            user,
            workflow_action_type(),
            page=page,
            element=button_element,
            event=EventTypes.CLICK,
        )
        .specific
    )
    # The formula lives on the action's service row_id (not on a
    # public-facing attribute), so it is internal to the backend.
    workflow_action.service.row_id = (
        f"get('data_source.{data_source.id}.1.field_{fields[0].id}')"
    )
    workflow_action.service.save()
    results = get_workflow_action_field_names(user, page, {})
    assert results == {
        "external": {},
        # Since these row-mutating workflow action types are internal (not
        # public/safe), only the internal dict should be populated.
        # (Previous comment was a copy-paste from the external test.)
        "internal": {
            data_source.service.id: [f"field_{fields[0].id}"],
        },
    }
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_data_source_field_names_returns_empty_if_no_data_sources(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_data_source_field_names() function returns an empty dict if
    there are no data sources.
    """
    user = data_fixture.create_user()
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    result = get_data_source_field_names(page)
    assert result == {"internal": {}}
    # Nothing to parse when the page has no data sources.
    mock_parse_tree.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_data_source_field_names_returns_empty_if_invalid_formula(
    mock_parse_tree, data_fixture
):
    """
    Ensure the get_data_source_field_names() function returns an empty dict if
    the data source has an invalid formula.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    integration = data_fixture.create_local_baserow_integration(
        application=builder, user=user
    )
    page = data_fixture.create_builder_page(user=user, builder=builder)
    # The service's search_query "foo" is the formula the parser will reject.
    list_rows_service = data_fixture.create_local_baserow_list_rows_service(
        integration=integration,
        search_query="foo",
    )
    data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
        service=list_rows_service,
    )
    # Simulate an "invalid formula" error
    mock_parse_tree.side_effect = BaserowFormulaSyntaxError("Invalid formula!")
    results = get_data_source_field_names(page)
    assert results == {"internal": {}}
    mock_parse_tree.assert_called_once_with("foo")
@pytest.mark.django_db
def test_get_data_source_field_names_list_rows_returns_field_names(data_fixture):
    """
    Ensure the get_data_source_field_names() function returns the expected
    field_names for the List Rows service type.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    integration = data_fixture.create_local_baserow_integration(
        application=builder, user=user
    )
    page = data_fixture.create_builder_page(user=user, builder=builder)
    list_rows_service = data_fixture.create_local_baserow_list_rows_service(
        integration=integration,
    )
    data_source_1 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    # data_source_2 hosts the service whose search_query references source 1.
    data_source_2 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
        service=list_rows_service,
    )
    list_rows_service.search_query = (
        f"get('data_source.{data_source_1.id}.0.field_{fields[0].id}')"
    )
    list_rows_service.save()
    results = get_data_source_field_names(page)
    # Data-source-to-data-source references are backend-only, hence internal.
    assert list(results) == ["internal"]
    assert sorted(list(results["internal"][data_source_1.service.id])) == [
        f"field_{fields[0].id}"
    ]
@pytest.mark.django_db
def test_get_data_source_field_names_get_row_returns_field_names(data_fixture):
    """
    Ensure the get_data_source_field_names() function returns the expected
    field_names for the Get Row service type.
    """
    user = data_fixture.create_user()
    table, fields, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
            ("Color", "text"),
            ("Country", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
            ["Banana", "Yellow", "Ecuador"],
            ["Cherry", "Red", "Turkey"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    integration = data_fixture.create_local_baserow_integration(
        application=builder, user=user
    )
    page = data_fixture.create_builder_page(user=user, builder=builder)
    get_row_service = data_fixture.create_local_baserow_get_row_service(
        integration=integration
    )
    data_source_1 = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    # data_source_2 hosts the Get Row service whose row_id references source 1.
    data_source_2 = data_fixture.create_builder_local_baserow_get_row_data_source(
        user=user,
        page=page,
        table=table,
        service=get_row_service,
    )
    get_row_service.row_id = (
        f"get('data_source.{data_source_1.id}.0.field_{fields[1].id}')"
    )
    get_row_service.save()
    results = get_data_source_field_names(page)
    assert results == {
        "internal": {data_source_1.service.id: [f"field_{fields[1].id}"]},
    }
@pytest.mark.django_db
@patch("baserow.contrib.builder.mixins.get_parse_tree_for_formula")
def test_get_data_source_field_names_skips_if_no_service(mock_parse_tree, data_fixture):
    """
    Ensure the get_data_source_field_names() function skips processing a
    Data Source if its service is not configured.
    The user can create a Data Source and not fully configure it. Thus it is
    important that the code doesn't assume a Data Source always has a Service.
    """
    user = data_fixture.create_user()
    table, _, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Fruit", "text"),
        ],
        rows=[
            ["Apple", "Green", "China"],
        ],
    )
    builder = data_fixture.create_builder_application(user=user)
    page = data_fixture.create_builder_page(user=user, builder=builder)
    # Create a Data Source, but don't create a Service
    data_source = data_fixture.create_builder_local_baserow_list_rows_data_source(
        user=user,
        page=page,
        table=table,
    )
    # Explicitly detach the fixture-created service to model a half-configured
    # data source.
    data_source.service = None
    data_source.save()
    results = get_data_source_field_names(page)
    assert results == {"internal": {}}
    mock_parse_tree.assert_not_called()
@patch(
    "baserow.contrib.builder.formula_property_extractor.builder_data_provider_type_registry"
)
def test_formula_field_visitor_visit_function_call_handles_formula_error(
    mock_data_provider_registry,
):
    """
    Test the FormulaFieldVisitor::visitFunctionCall() method.
    Ensure that formula errors are handled and ignored.
    """
    mock_data_provider_type = MagicMock()
    # Simulate the data provider rejecting the extracted path.
    mock_data_provider_type.extract_properties.side_effect = InvalidBaserowFormula()
    mock_data_provider_registry.get.return_value = mock_data_provider_type
    mock_expression = MagicMock(spec=BaserowFormula.StringLiteralContext)
    mock_expression.accept.return_value = "'current_record.field_999'"
    mock_func = MagicMock()
    mock_func.accept.return_value = "get"
    context = MagicMock()
    context.func_name.return_value = mock_func
    context.expr.return_value = [mock_expression]
    visitor = FormulaFieldVisitor()
    visitor.visitFunctionCall(context)
    # The InvalidBaserowFormula must be swallowed, leaving no results.
    assert visitor.results == {}
    mock_data_provider_type.extract_properties.assert_called_once_with(["field_999"])

View file

@ -7,6 +7,7 @@ from baserow.contrib.builder.data_sources.service import DataSourceService
from baserow.contrib.builder.elements.registries import element_type_registry
from baserow.contrib.builder.elements.service import ElementService
from baserow.contrib.builder.pages.service import PageService
from baserow.contrib.database.api.rows.serializers import RowSerializer
from baserow.contrib.integrations.local_baserow.models import LocalBaserowGetRow
from baserow.contrib.integrations.local_baserow.service_types import (
LocalBaserowGetRowUserServiceType,
@ -195,11 +196,13 @@ def test_local_baserow_get_row_service_dispatch_transform(data_fixture):
)
service = data_fixture.create_local_baserow_get_row_service(
integration=integration, view=view, table=table, row_id="'2'"
integration=integration, view=view, table=table, row_id=f"{rows[1].id}"
)
service_type = LocalBaserowGetRowUserServiceType()
dispatch_context = FakeDispatchContext()
dispatch_values = LocalBaserowUpsertRowServiceType().resolve_service_formulas(
service, dispatch_context
)
@ -246,6 +249,7 @@ def test_local_baserow_get_row_service_dispatch_data_with_view_filter(data_fixtu
service_type = service.get_type()
dispatch_context = FakeDispatchContext()
dispatch_values = service_type.resolve_service_formulas(service, dispatch_context)
with pytest.raises(DoesNotExist):
service_type.dispatch_data(service, dispatch_values, dispatch_context)
@ -279,6 +283,7 @@ def test_local_baserow_get_row_service_dispatch_data_with_service_search(
service_type = service.get_type()
dispatch_context = FakeDispatchContext()
dispatch_values = service_type.resolve_service_formulas(service, dispatch_context)
with pytest.raises(DoesNotExist):
service_type.dispatch_data(service, dispatch_values, dispatch_context)
@ -301,6 +306,7 @@ def test_local_baserow_get_row_service_dispatch_data_with_service_integer_search
["42"],
],
)
integration = data_fixture.create_local_baserow_integration(
application=page.builder, user=user
)
@ -308,8 +314,8 @@ def test_local_baserow_get_row_service_dispatch_data_with_service_integer_search
service = data_fixture.create_local_baserow_get_row_service(
integration=integration, table=table, row_id="", search_query="42"
)
service_type = service.get_type()
service_type = service.get_type()
dispatch_context = FakeDispatchContext()
dispatch_values = service_type.resolve_service_formulas(service, dispatch_context)
@ -730,3 +736,202 @@ def test_order_by_is_applied_depending_on_views_sorts(
mock_queryset.order_by.assert_called_once_with(*view_sorts)
else:
mock_queryset.order_by.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.integrations.local_baserow.service_types.CoreHandler")
@pytest.mark.parametrize(
    "field_name_checks,expect_only_applied",
    (
        [
            (
                {"all": ["field_foo", "field_bar"], "external": None, "internal": None},
                True,
            ),
            (
                {"all": ["field_foo", "field_bar"], "external": [], "internal": []},
                True,
            ),
            (
                {
                    "all": ["field_foo", "field_bar"],
                    "external": ["foo"],
                    "internal": ["bar"],
                },
                True,
            ),
            (
                {"all": None, "external": None, "internal": None},
                False,
            ),
            (
                {"all": None, "external": [], "internal": []},
                False,
            ),
            (
                {"all": None, "external": ["field_foo"], "internal": ["field_bar"]},
                False,
            ),
        ]
    ),
)
def test_only_is_applied_to_queryset_if_field_names(
    mock_core_handler, field_name_checks, expect_only_applied, data_fixture
):
    """
    Test to ensure that the queryset's only() is applied if
    field_names exists.

    Only the "all" bucket drives the only() restriction; the external and
    internal buckets are varied to show they have no effect here.
    """
    user = data_fixture.create_user()
    page = data_fixture.create_builder_page(user=user)
    table, _, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Name", "text"),
            ("My Color", "text"),
        ],
        rows=[
            ["BMW", "Blue"],
            ["Audi", "Orange"],
        ],
    )
    view = data_fixture.create_grid_view(user, table=table)
    integration = data_fixture.create_local_baserow_integration(
        application=page.builder, user=user
    )
    service = data_fixture.create_local_baserow_list_rows_service(
        integration=integration,
        view=view,
        table=table,
    )
    # NOTE(review): a List Rows service is dispatched through
    # LocalBaserowGetRowUserServiceType here — confirm this is intentional.
    service_type = LocalBaserowGetRowUserServiceType()
    mock_queryset = MagicMock()
    mock_objects = MagicMock()
    mock_objects.enhance_by_fields.return_value = mock_queryset
    mock_model = MagicMock()
    # Fixed: the original `mock_objects = mock_model.objects.all.return_value
    # = mock_objects` redundantly rebound mock_objects to itself.
    mock_model.objects.all.return_value = mock_objects
    mock_table = MagicMock()
    mock_table.get_model.return_value = mock_model
    resolved_values = {
        "table": mock_table,
    }
    # Stub the dispatch pipeline so only the field-restriction step is real.
    service_type.get_dispatch_search = MagicMock(return_value=None)
    service_type.get_dispatch_filters = MagicMock(return_value=mock_queryset)
    service_type.get_dispatch_sorts = MagicMock(return_value=(None, mock_queryset))
    field_names = {"all": {}, "external": {}, "internal": {}}
    for key, value in field_name_checks.items():
        field_names[key] = {service.id: value}
    dispatch_context = FakeDispatchContext()
    with patch(
        "baserow.test_utils.pytest_conftest.FakeDispatchContext.public_formula_fields",
        None,
    ):
        dispatch_context.public_formula_fields = field_names
        service_type.dispatch_data(service, resolved_values, dispatch_context)
        if expect_only_applied:
            mock_queryset.only.assert_called_once_with(
                field_names["all"][service.id][0],
                field_names["all"][service.id][1],
            )
        else:
            mock_queryset.only.assert_not_called()
@pytest.mark.parametrize(
    "field_names",
    [
        None,
        {"external": {1: ["field_123"]}},
    ],
)
@patch(
    "baserow.contrib.integrations.local_baserow.service_types.get_row_serializer_class"
)
def test_dispatch_transform_passes_field_ids(mock_get_serializer, field_names):
    """
    Ensure LocalBaserowGetRowUserServiceType.dispatch_transform() forwards the
    extracted field IDs to the row serializer factory.
    """
    serializer_instance = MagicMock()
    serializer_instance.data.return_value = "foo"
    mock_get_serializer.return_value = MagicMock(return_value=serializer_instance)

    service_type = LocalBaserowGetRowUserServiceType()
    service_type.extract_field_ids = MagicMock(return_value=[])

    dispatch_data = {
        "baserow_table_model": MagicMock(),
        "data": [],
    }
    if field_names:
        dispatch_data["public_formula_fields"] = field_names

    transformed = service_type.dispatch_transform(dispatch_data)

    # The serializer's data is returned unchanged, and the factory receives
    # the (mocked, empty) field id list.
    assert transformed == serializer_instance.data
    mock_get_serializer.assert_called_once_with(
        dispatch_data["baserow_table_model"],
        RowSerializer,
        is_response=True,
        field_ids=[],
    )
    service_type.extract_field_ids.assert_called_once_with(field_names)
@pytest.mark.parametrize(
    "path,expected",
    [
        ([], []),
        (["foo"], []),
        (["", "foo"], []),
        (["field_123"], ["field_123"]),
        (["field_456", ""], ["field_456"]),
        (["field_789", "", ""], ["field_789"]),
    ],
)
def test_extract_properties(path, expected):
    """
    Ensure extract_properties() maps each input path to the expected list of
    field names for the Get Row service type.
    """
    assert LocalBaserowGetRowUserServiceType().extract_properties(path) == expected

View file

@ -7,6 +7,7 @@ from baserow.contrib.builder.data_sources.service import DataSourceService
from baserow.contrib.builder.elements.registries import element_type_registry
from baserow.contrib.builder.elements.service import ElementService
from baserow.contrib.builder.pages.service import PageService
from baserow.contrib.database.api.rows.serializers import RowSerializer
from baserow.contrib.database.rows.handler import RowHandler
from baserow.contrib.database.table.handler import TableHandler
from baserow.contrib.database.views.models import SORT_ORDER_ASC, SORT_ORDER_DESC
@ -399,6 +400,7 @@ def test_local_baserow_list_rows_service_dispatch_data_with_varying_filter_types
view = data_fixture.create_grid_view(
user, table=table, owned_by=user, filter_type="OR"
)
dispatch_context = FakeDispatchContext()
service_type = LocalBaserowListRowsUserServiceType()
service = data_fixture.create_local_baserow_list_rows_service(
@ -534,9 +536,7 @@ def test_local_baserow_list_rows_service_dispatch_data_with_view_and_service_sor
@pytest.mark.django_db
def test_local_baserow_list_rows_service_dispatch_data_with_pagination(
data_fixture,
):
def test_local_baserow_list_rows_service_dispatch_data_with_pagination(data_fixture):
user = data_fixture.create_user()
builder = data_fixture.create_builder_application(user=user)
integration = data_fixture.create_local_baserow_integration(
@ -895,3 +895,230 @@ def test_order_by_is_applied_depending_on_views_sorts(
mock_queryset.order_by.assert_called_once_with(*view_sorts)
else:
mock_queryset.order_by.assert_not_called()
@pytest.mark.django_db
@patch("baserow.contrib.integrations.local_baserow.service_types.CoreHandler")
@pytest.mark.parametrize(
    "field_name_checks,expect_only_applied",
    (
        [
            # only() is expected whenever the "all" bucket carries field names…
            (
                {"all": ["field_foo", "field_bar"], "external": None, "internal": None},
                True,
            ),
            (
                {"all": ["field_foo", "field_bar"], "external": [], "internal": []},
                True,
            ),
            (
                {
                    "all": ["field_foo", "field_bar"],
                    "external": ["foo"],
                    "internal": ["bar"],
                },
                True,
            ),
            # …and never when "all" is None, regardless of the other buckets.
            (
                {"all": None, "external": None, "internal": None},
                False,
            ),
            (
                {"all": None, "external": [], "internal": []},
                False,
            ),
            (
                {"all": None, "external": ["field_foo"], "internal": ["field_bar"]},
                False,
            ),
        ]
    ),
)
def test_only_is_applied_to_queryset_if_field_names(
    mock_core_handler, field_name_checks, expect_only_applied, data_fixture
):
    """
    Test to ensure that the queryset's only() is applied if
    field_names exists.

    The table/model layer is fully mocked, so only the wiring between
    ``dispatch_data()`` of the List Rows service type and the queryset's
    ``only()`` call is asserted.
    """
    user = data_fixture.create_user()
    page = data_fixture.create_builder_page(user=user)
    table, _, _ = data_fixture.build_table(
        user=user,
        columns=[
            ("Name", "text"),
            ("My Color", "text"),
        ],
        rows=[
            ["BMW", "Blue"],
            ["Audi", "Orange"],
        ],
    )
    view = data_fixture.create_grid_view(user, table=table)
    integration = data_fixture.create_local_baserow_integration(
        application=page.builder, user=user
    )
    service = data_fixture.create_local_baserow_list_rows_service(
        integration=integration,
        view=view,
        table=table,
    )
    service_type = LocalBaserowListRowsUserServiceType()
    # Mock the chain table.get_model().objects.all().enhance_by_fields()
    # so it ends at mock_queryset, where only() calls can be asserted.
    mock_queryset = MagicMock()
    mock_objects = MagicMock()
    mock_objects.enhance_by_fields.return_value = mock_queryset
    mock_model = MagicMock()
    # Chained assignment: objects.all() returns the mocked manager (the
    # rebinding of mock_objects itself changes nothing).
    mock_objects = mock_model.objects.all.return_value = mock_objects
    mock_table = MagicMock()
    mock_table.get_model.return_value = mock_model
    resolved_values = {
        "table": mock_table,
    }
    # Neutralise search/filter/sort so each simply passes the queryset through.
    service_type.get_dispatch_search = MagicMock(return_value=None)
    service_type.get_dispatch_filters = MagicMock(return_value=mock_queryset)
    service_type.get_dispatch_sorts = MagicMock(return_value=(None, mock_queryset))
    # Field names are keyed by service id within each visibility bucket.
    field_names = {"all": {}, "external": {}, "internal": {}}
    for key, value in field_name_checks.items():
        field_names[key] = {service.id: value}
    dispatch_context = FakeDispatchContext()
    # Patch out the class-level attribute so the per-instance assignment below
    # takes effect — presumably public_formula_fields is a property on
    # FakeDispatchContext; confirm against the conftest.
    with patch(
        "baserow.test_utils.pytest_conftest.FakeDispatchContext.public_formula_fields",
        None,
    ):
        dispatch_context.public_formula_fields = field_names
        service_type.dispatch_data(service, resolved_values, dispatch_context)
    if expect_only_applied:
        # only() must receive exactly the names from the "all" bucket.
        mock_queryset.only.assert_called_once_with(
            field_names["all"][service.id][0],
            field_names["all"][service.id][1],
        )
    else:
        mock_queryset.only.assert_not_called()
@pytest.mark.parametrize(
    "field_names",
    [
        None,
        {"external": {1: ["field_123"]}},
    ],
)
@patch(
    "baserow.contrib.integrations.local_baserow.service_types.get_row_serializer_class"
)
def test_dispatch_transform_passes_field_ids(mock_get_serializer, field_names):
    """
    Ensure LocalBaserowListRowsUserServiceType.dispatch_transform() forwards
    the extracted field IDs to the row serializer factory.
    """
    serializer_instance = MagicMock()
    serializer_instance.data.return_value = "foo"
    mock_get_serializer.return_value = MagicMock(return_value=serializer_instance)

    service_type = LocalBaserowListRowsUserServiceType()
    service_type.extract_field_ids = MagicMock(return_value=[])

    dispatch_data = {
        "baserow_table_model": MagicMock(),
        "results": [],
        "has_next_page": False,
    }
    if field_names:
        dispatch_data["public_formula_fields"] = field_names

    transformed = service_type.dispatch_transform(dispatch_data)

    # The paginated payload wraps the serializer's data, and the factory
    # receives the (mocked, empty) field id list.
    assert transformed == {
        "has_next_page": False,
        "results": serializer_instance.data,
    }
    mock_get_serializer.assert_called_once_with(
        dispatch_data["baserow_table_model"],
        RowSerializer,
        is_response=True,
        field_ids=[],
    )
    service_type.extract_field_ids.assert_called_once_with(field_names)
@pytest.mark.parametrize(
    "path,expected",
    [
        ([], []),
        (["foo"], []),
        (["", "foo"], []),
        (["field_123"], []),
        (["", "field_456"], ["field_456"]),
        (["*", "field_456"], ["field_456"]),
        (["1", "field_456"], ["field_456"]),
        (["0", "field_456", "0", "value"], ["field_456"]),
        (["0", "field_456", "0", "value", "", "", ""], ["field_456"]),
    ],
)
def test_extract_properties(path, expected):
    """
    Ensure extract_properties() returns the expected field names for every
    shape of path a List Rows service can receive.

    Supported shapes:
      - ['1', 'field_5439']: a specific row and field.
      - ['*', 'field_5439']: a field across all rows.
      - ['field_5439']: a collection element (e.g. Table).
      - ['0', 'field_5569', '0', 'value']: a Link Row Field formula.
    """
    assert LocalBaserowListRowsUserServiceType().extract_properties(path) == expected

View file

@ -243,6 +243,7 @@ def test_local_baserow_upsert_row_service_dispatch_data_with_multiple_formulas(
}
dispatch_context = FakeDispatchContext(context=formula_context)
dispatch_values = service_type.resolve_service_formulas(service, dispatch_context)
service_type.dispatch_data(service, dispatch_values, dispatch_context)
@ -351,7 +352,6 @@ def test_local_baserow_upsert_row_service_dispatch_transform(
service.field_mappings.create(field=ingredient, value='get("page_parameter.id")')
dispatch_context = FakeDispatchContext(context={"page_parameter": {"id": 2}})
dispatch_values = service_type.resolve_service_formulas(service, dispatch_context)
dispatch_data = service_type.dispatch_data(
service, dispatch_values, dispatch_context

View file

@ -1,4 +1,4 @@
from unittest.mock import Mock
from unittest.mock import MagicMock, Mock
import pytest
@ -9,6 +9,7 @@ from baserow.contrib.integrations.local_baserow.service_types import (
LocalBaserowListRowsUserServiceType,
LocalBaserowServiceType,
LocalBaserowTableServiceType,
LocalBaserowViewServiceType,
)
from baserow.core.services.exceptions import ServiceImproperlyConfigured
from baserow.test_utils.helpers import setup_interesting_test_table
@ -866,3 +867,43 @@ def test_local_baserow_table_service_type_get_context_data_schema(data_fixture):
},
},
}
@pytest.mark.parametrize(
    "field_names,expected",
    [
        (None, None),
        ([], []),
        ([""], []),
        (["", "field_123"], [123]),
        (["", "field_123", "foo", "field_456"], [123, 456]),
    ],
)
def test_base_service_type_extract_field_ids(field_names, expected):
    """
    Ensure LocalBaserowViewServiceType.extract_field_ids() converts the given
    field names into the matching list of numeric field IDs.
    """
    # Stub out model_class so the service type can be instantiated directly.
    LocalBaserowViewServiceType.model_class = MagicMock()
    assert LocalBaserowViewServiceType().extract_field_ids(field_names) == expected

View file

@ -1,4 +1,6 @@
from unittest.mock import Mock
from unittest.mock import MagicMock, Mock, PropertyMock
import pytest
from baserow.core.services.registries import ServiceType
@ -15,3 +17,53 @@ def test_service_type_generate_schema():
service_type_cls = ServiceType
service_type_cls.model_class = Mock()
assert service_type_cls().generate_schema(mock_service) is None
@pytest.mark.parametrize(
    "field_names,expected_field_names",
    [
        (
            {"external": {}},
            [],
        ),
        (
            {"external": {100: ["field_123"]}},
            ["field_123"],
        ),
    ],
)
def test_dispatch_passes_field_names(field_names, expected_field_names):
    """
    Test the base implementation of dispatch(). Ensure it passes field_names
    to dispatch_transform().

    NOTE(review): ``expected_field_names`` is never used in the body, and the
    final assertion only checks that ``dispatch_transform`` receives the
    ``dispatch_data`` result — it does not verify which field names were
    forwarded. Confirm whether a stronger assertion was intended.
    """
    # Stub out model_class so the (abstract-ish) ServiceType can be
    # instantiated directly.
    service_type_cls = ServiceType
    service_type_cls.model_class = MagicMock()
    service_type = service_type_cls()
    # Stub every step of the dispatch pipeline so only the wiring is tested.
    service_type.resolve_service_formulas = MagicMock()
    mock_data = MagicMock()
    service_type.dispatch_data = MagicMock(return_value=mock_data)
    service_type.dispatch_transform = MagicMock()
    mock_service = MagicMock()
    # The parametrized field_names map is keyed by service id, so pin it to 100.
    type(mock_service).id = PropertyMock(return_value=100)
    mock_dispatch_context = MagicMock()
    mock_dispatch_context.public_formula_fields = field_names
    service_type.dispatch(mock_service, mock_dispatch_context)
    # dispatch() must hand the dispatch_data result straight to
    # dispatch_transform().
    service_type.dispatch_transform.assert_called_once_with(mock_data)
def test_extract_properties():
    """The default ServiceType.extract_properties() discards every path."""
    # Stub out model_class so the base ServiceType can be instantiated.
    ServiceType.model_class = MagicMock()
    assert ServiceType().extract_properties(["foo"]) == []

View file

@ -90,6 +90,7 @@ export default {
{
page: this.page,
data: newDispatchContext,
mode: this.mode,
}
)
}

View file

@ -40,6 +40,7 @@ export class DataSourceDataProviderType extends DataProviderType {
applicationContext
),
dataSources,
mode: applicationContext.mode,
}
)
}
@ -317,6 +318,7 @@ export class CurrentRecordDataProviderType extends DataProviderType {
dataSource,
data: dispatchContext,
range: [0, element.items_per_page],
mode: applicationContext.mode,
}
)
} catch (e) {

View file

@ -115,6 +115,7 @@ export default {
dataSource: this.dataSource,
data: this.dispatchContext,
range,
mode: this.applicationContext.mode,
replace,
})
this.currentOffset += this.element.items_per_page

View file

@ -165,6 +165,7 @@ export default {
{
page: this.page,
data: this.dispatchContext,
mode: this.mode,
}
)
},
@ -181,6 +182,7 @@ export default {
{
page: this.page,
data: newDispatchContext,
mode: this.mode,
}
)
}

View file

@ -259,6 +259,7 @@ export default {
{
page: this.page,
data: newDispatchContext,
mode: this.mode,
}
)
}

View file

@ -24,5 +24,27 @@ export default (client) => {
`builder/domains/published/page/${pageId}/workflow_actions/`
)
},
/**
 * Dispatches a single published data source and returns the server's
 * response. An optional `range` tuple of [offset, count] is forwarded as
 * query parameters for pagination.
 */
dispatch(dataSourceId, dispatchContext, { range }) {
  // Using the POST HTTP method here is not RESTful, but it is the cleanest
  // way to send data with the call without relying on GET parameters and
  // serialization of complex objects.
  const params = {}
  if (range) {
    params.offset = range[0]
    params.count = range[1]
  }
  return client.post(
    `builder/domains/published/data-source/${dataSourceId}/dispatch/`,
    dispatchContext,
    { params }
  )
},
/**
 * Dispatches every data source of the given published page in a single
 * request; `params` is sent as the POST body.
 */
dispatchAll(pageId, params) {
  return client.post(
    `builder/domains/published/page/${pageId}/dispatch-data-sources/`,
    params
  )
},
}
}

View file

@ -1,6 +1,7 @@
import _ from 'lodash'
import Vue from 'vue'
import DataSourceService from '@baserow/modules/builder/services/dataSource'
import PublishedBuilderService from '@baserow/modules/builder/services/publishedBuilder'
const state = {}
@ -40,10 +41,19 @@ const actions = {
/**
* Fetch the content for every data sources of the given page.
*/
async fetchPageDataSourceContent({ commit }, { page, data: queryData }) {
async fetchPageDataSourceContent(
{ commit },
{ page, data: queryData, mode }
) {
commit('SET_LOADING', { page, value: true })
let service = DataSourceService
if (['preview', 'public'].includes(mode)) {
service = PublishedBuilderService
}
try {
const { data } = await DataSourceService(this.app.$client).dispatchAll(
const { data } = await service(this.app.$client).dispatchAll(
page.id,
queryData
)
@ -65,12 +75,17 @@ const actions = {
async fetchPageDataSourceContentById(
{ commit },
{ page, dataSourceId, dispatchContext, replace = false }
{ page, dataSourceId, dispatchContext, mode, replace = false }
) {
commit('SET_LOADING', { page, value: true })
let service = DataSourceService
if (['preview', 'public'].includes(mode)) {
service = PublishedBuilderService
}
try {
const { data } = await DataSourceService(this.app.$client).dispatch(
const { data } = await service(this.app.$client).dispatch(
dataSourceId,
dispatchContext,
{ range: null }
@ -91,12 +106,16 @@ const actions = {
}
},
debouncedFetchPageDataSourceContent({ dispatch }, { page, data: queryData }) {
debouncedFetchPageDataSourceContent(
{ dispatch },
{ page, data: queryData, mode }
) {
clearTimeout(pageFetchTimeout)
pageFetchTimeout = setTimeout(() => {
dispatch('fetchPageDataSourceContent', {
page,
data: queryData,
mode,
})
}, 500)
},

View file

@ -1,4 +1,5 @@
import DataSourceService from '@baserow/modules/builder/services/dataSource'
import PublishedBuilderService from '@baserow/modules/builder/services/publishedBuilder'
import { rangeDiff } from '@baserow/modules/core/utils/range'
const state = {}
@ -63,7 +64,15 @@ const actions = {
*/
async fetchElementContent(
{ commit, getters },
{ page, element, dataSource, range, data: dispatchContext, replace = false }
{
page,
element,
dataSource,
range,
mode,
data: dispatchContext,
replace = false,
}
) {
/**
* If `dataSource` is `null`, this means that we are trying to fetch the content
@ -186,7 +195,12 @@ const actions = {
rangeToFetch = [rangeToFetch[0], rangeToFetch[1] - rangeToFetch[0]]
}
const { data } = await DataSourceService(this.app.$client).dispatch(
let service = DataSourceService
if (['preview', 'public'].includes(mode)) {
service = PublishedBuilderService
}
const { data } = await service(this.app.$client).dispatch(
dataSource.id,
dispatchContext,
{ range: rangeToFetch }

View file

@ -145,6 +145,7 @@ export class RefreshDataSourceWorkflowActionType extends WorkflowActionType {
page: applicationContext.page,
dataSourceId: workflowAction.data_source_id,
dispatchContext,
mode: applicationContext.mode,
replace: true,
}
)