1
0
Fork 0
mirror of https://gitlab.com/bramw/baserow.git synced 2025-04-15 09:34:13 +00:00

Fixed bug where table level role was ignored if database one was set

This commit is contained in:
Bram Wiepjes 2024-02-02 12:55:28 +00:00 committed by Jérémie Pardou
parent a5936e57d3
commit 2b4989a088
6 changed files with 251 additions and 39 deletions
backend/src/baserow/core
changelog/entries/unreleased/bug
enterprise
backend
src/baserow_enterprise/role
tests/baserow_enterprise_tests/role
web-frontend/modules/baserow_enterprise
web-frontend/modules/core/pages

View file

@ -854,7 +854,7 @@ class ObjectScopeType(Instance, ModelInstanceMixin):
# We have all the objects in the queryset, but now we want to sort them
# into buckets per original scope they are a child of.
for scope in scopes:
for scope in parent_scopes:
objects_per_scope[scope] = set()
scope_type = object_scope_type_registry.get_by_model(scope)
for obj in query_result:

View file

@ -0,0 +1,7 @@
{
"type": "bug",
"message": "Fix bug where combined database and table level roles would not be respected when listing permissions.",
"issue_number": null,
"bullet_points": [],
"created_at": "2024-01-31"
}

View file

@ -107,6 +107,7 @@ class RolePermissionManagerType(PermissionManagerType):
context,
scope_includes_cache,
)
computed_roles = computed_role_cache[cache_key]
if any(
@ -154,6 +155,7 @@ class RolePermissionManagerType(PermissionManagerType):
]
)
exceptions = set()
inclusions = set()
for scope, roles in roles_by_scope[1:]:
allowed_operations = set()
@ -175,13 +177,17 @@ class RolePermissionManagerType(PermissionManagerType):
if operation_type.type not in allowed_operations:
if default:
exceptions.add(context_exception)
inclusions.discard(context_exception)
else:
inclusions.add(context_exception)
exceptions.discard(context_exception)
else:
if default:
inclusions.add(context_exception)
exceptions.discard(context_exception)
else:
exceptions.add(context_exception)
inclusions.discard(context_exception)
# Second case
# The scope of the role assignment is included by the role of the operation
@ -203,15 +209,20 @@ class RolePermissionManagerType(PermissionManagerType):
)
if default:
if found_object in exceptions:
exceptions.discard(found_object)
exceptions.discard(found_object)
inclusions.add(found_object)
else:
exceptions.add(found_object)
inclusions.discard(found_object)
return default, exceptions
return default, exceptions, inclusions
def get_permissions_object(
self, actor: AbstractUser, workspace: Optional[Workspace] = None
self,
actor: AbstractUser,
workspace: Optional[Workspace] = None,
for_operation_types=None,
use_object_scope=False,
) -> List[Dict[str, OperationPermissionContent]]:
"""
Returns the permission object for this permission manager. The permission object
@ -236,41 +247,63 @@ class RolePermissionManagerType(PermissionManagerType):
policy_per_operation = defaultdict(lambda: {"default": False, "exceptions": []})
exceptions_with_mixed_types_per_scope = defaultdict(set)
scope_map_with_mixed_types_per_scope = defaultdict(set)
all_operations = for_operation_types or operation_type_registry.get_all()
# First, for each operation we want the default policy and exceptions
for operation_type in operation_type_registry.get_all():
default, exceptions = self.get_operation_policy(
roles_by_scope, operation_type
for operation_type in all_operations:
default, exceptions, inclusions = self.get_operation_policy(
roles_by_scope, operation_type, use_object_scope
)
if default or exceptions:
policy_per_operation[operation_type.type]["default"] = default
policy_per_operation[operation_type.type]["exceptions"] = exceptions
base_scope_type = (
operation_type.object_scope
if use_object_scope
else operation_type.context_scope
)
if exceptions:
# We store the exceptions by scope to get all objects at once later
exceptions_with_mixed_types_per_scope[
operation_type.context_scope
] |= exceptions
policy_per_operation[operation_type.type]["default"] = default
policy_per_operation[operation_type.type]["exceptions"] = exceptions
policy_per_operation[operation_type.type]["inclusions"] = inclusions
# Get all objects for all exceptions at once to improve perfs
# We store the exceptions/inclusions by scope to get all
# objects at once later
scope_map_with_mixed_types_per_scope[base_scope_type] |= (
exceptions | inclusions
)
# Get all objects for all exceptions/inclusions at once to improve perfs
exception_ids_per_scope = {}
for object_scope, exceptions in exceptions_with_mixed_types_per_scope.items():
for object_scope, exceptions in scope_map_with_mixed_types_per_scope.items():
exception_ids_per_scope[object_scope] = {
scope: {o.id for o in exc}
for scope, exc in object_scope.get_objects_in_scopes(exceptions).items()
}
# Dispatch actual context object ids for each exceptions scopes
# Dispatch actual context object ids for each exceptions/inclusions scopes
policy_per_operation_with_exception_ids = {}
for operation_type in operation_type_registry.get_all():
for operation_type in all_operations:
inclusions = policy_per_operation[operation_type.type]["inclusions"]
exclusions = policy_per_operation[operation_type.type]["exceptions"]
ordered_scope = sorted(
inclusions | exclusions,
key=lambda s: object_scope_type_registry.get_by_model(s).level,
)
base_scope_type = (
operation_type.object_scope
if use_object_scope
else operation_type.context_scope
)
# Gather all ids for all scopes of the exception list of this operation
exceptions_ids = set()
for scope in policy_per_operation[operation_type.type]["exceptions"]:
exceptions_ids |= exception_ids_per_scope[operation_type.context_scope][
scope
]
for scope in ordered_scope:
if scope in exclusions:
exceptions_ids |= exception_ids_per_scope[base_scope_type][scope]
if scope in inclusions:
exceptions_ids -= exception_ids_per_scope[base_scope_type][scope]
policy_per_operation_with_exception_ids[operation_type.type] = {
"default": policy_per_operation[operation_type.type]["default"],
@ -288,25 +321,25 @@ class RolePermissionManagerType(PermissionManagerType):
if workspace is None or not self.is_enabled(workspace):
return queryset
# Get all role assignments for this user into this workspace
roles_by_scope = RoleAssignmentHandler().get_roles_per_scope(workspace, actor)
operation_type = operation_type_registry.get(operation_name)
default, exceptions = self.get_operation_policy(
roles_by_scope, operation_type, True
permission_obj = self.get_permissions_object(
actor,
workspace,
for_operation_types=[operation_type],
use_object_scope=True,
)
exceptions_filter = operation_type.object_scope.get_filter_for_scopes(
exceptions
)
default = permission_obj[operation_type.type]["default"]
exceptions = permission_obj[operation_type.type]["exceptions"]
# Finally filter the queryset with the exception filter.
if default:
if exceptions:
queryset = queryset.exclude(exceptions_filter)
queryset = queryset.exclude(id__in=exceptions)
else:
if exceptions:
queryset = queryset.filter(exceptions_filter)
queryset = queryset.filter(id__in=exceptions)
else:
queryset = queryset.none()

View file

@ -1008,6 +1008,150 @@ def test_get_permissions_object(data_fixture, enterprise_data_fixture, synced_ro
assert perms[UpdateDatabaseRowOperationType.type]["exceptions"] == [table_1_1.id]
@pytest.mark.django_db(transaction=True)
@override_settings(
PERMISSION_MANAGERS=["core", "staff", "member", "role", "basic"],
)
def test_get_permissions_object_with_database_and_table_level_permissions(
data_fixture, enterprise_data_fixture, synced_roles
):
enterprise_data_fixture.enable_enterprise()
admin = data_fixture.create_user(email="admin@test.net")
editor = data_fixture.create_user(email="editor@test.net")
workspace_1 = data_fixture.create_workspace(
user=admin,
members=[editor],
)
data_fixture.create_database_application()
data_fixture.create_database_application()
data_fixture.create_database_application()
data_fixture.create_database_application()
data_fixture.create_database_application()
database_1 = data_fixture.create_database_application(
workspace=workspace_1, order=1
)
database_2 = data_fixture.create_database_application(
workspace=workspace_1, order=2
)
table_0, _, _ = data_fixture.build_table(
columns=[("number", "number"), ("text", "text")],
rows=[[1, "test"]],
database=database_1,
order=1,
)
table_1, _, _ = data_fixture.build_table(
columns=[("number", "number"), ("text", "text")],
rows=[[1, "test"]],
database=database_1,
order=1,
)
table_2, _, _ = data_fixture.build_table(
columns=[("number", "text"), ("text", "text")],
rows=[[1, "test"]],
database=database_2,
order=2,
)
table_3, _, _ = data_fixture.build_table(
columns=[("number", "text"), ("text", "text")],
rows=[[1, "test"]],
database=database_2,
order=3,
)
role_editor = Role.objects.get(uid="EDITOR")
role_no_access = Role.objects.get(uid="NO_ACCESS")
perm_manager = RolePermissionManagerType()
# Case 1 - default policy is False, we add the permission to db then we remove
# it from the table
# Workspace level assignments
RoleAssignmentHandler().assign_role(editor, workspace_1, role=role_no_access)
# Application level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_editor, scope=database_1.application_ptr
)
# Table level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_no_access, scope=table_1
)
perms = perm_manager.get_permissions_object(editor, workspace=workspace_1)
assert perms[CreateRowDatabaseTableOperationType.type]["default"] is False
# The user has editor permissions to `database_1` but no_role at `table_1`,
# so we should see only the `table_0` as an exception to the default (False)
# policy.
assert perms[CreateRowDatabaseTableOperationType.type]["exceptions"] == [
table_0.id,
]
# Case 2 - default policy is True and we remove then add the permission
# Now with the opposite default policy
RoleAssignmentHandler().assign_role(editor, workspace_1, role=role_editor)
# Application level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_no_access, scope=database_1.application_ptr
)
# Table level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_editor, scope=table_1
)
perms = perm_manager.get_permissions_object(editor, workspace=workspace_1)
assert perms[CreateRowDatabaseTableOperationType.type]["default"] is True
# The user has editor permissions to `table_1` but no_role at `database_1`,
# so we should see only the `table_0` as an exception to the default
# (True) policy.
assert perms[CreateRowDatabaseTableOperationType.type]["exceptions"] == [
table_0.id,
]
# case 3 - default is False and we add the permission on another database
# Workspace level assignments
RoleAssignmentHandler().assign_role(editor, workspace_1, role=role_no_access)
# Table level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_editor, scope=table_1
)
# Database level assignments
RoleAssignmentHandler().assign_role(
editor, workspace_1, role=role_editor, scope=database_2
)
perms = perm_manager.get_permissions_object(editor, workspace=workspace_1)
assert perms[CreateRowDatabaseTableOperationType.type]["default"] is False
# The user has editor permissions to `table_1` and `database_2`, so we expect
# table 1, table 2 and table 3 to be in there because table 2,3 belong
# in database 2.
assert sorted(
perms[CreateRowDatabaseTableOperationType.type]["exceptions"]
) == sorted(
[
table_1.id,
table_2.id,
table_3.id,
]
)
@pytest.mark.django_db(transaction=True)
def test_get_permissions_object_with_teams(
data_fixture, enterprise_data_fixture, synced_roles
@ -1177,6 +1321,7 @@ def test_filter_queryset(data_fixture, enterprise_data_fixture):
assert list(builder_table_queryset) == [table_2_1]
viewer_role = Role.objects.get(uid="VIEWER")
editor_role = Role.objects.get(uid="EDITOR")
role_no_access = Role.objects.get(uid="NO_ACCESS")
# In this scenario the user is:
@ -1215,6 +1360,7 @@ def test_filter_queryset(data_fixture, enterprise_data_fixture):
# - no_access at application_2 level
# - builder at at table_2_1 level
# -> should still be able to see application_2 and application_3
# -> and table_2_1, table_1_x
RoleAssignmentHandler().assign_role(
builder, workspace_2, role=role_no_access, scope=database_2.application_ptr
)
@ -1231,6 +1377,32 @@ def test_filter_queryset(data_fixture, enterprise_data_fixture):
database_3.application_ptr,
]
builder_table_queryset = perm_manager.filter_queryset(
builder,
ListTablesDatabaseTableOperationType.type,
table_2_queryset,
workspace=workspace_2,
)
assert list(builder_table_queryset) == [table_2_1]
# In this scenario the user is:
# - Viewer at workspace_2 level
# - no_access at application_2 level
# -> should be able to see application_3 only
RoleAssignmentHandler().assign_role(builder, workspace_2, scope=table_2_1)
builder_application_queryset = perm_manager.filter_queryset(
builder,
ListApplicationsWorkspaceOperationType.type,
application_2_queryset,
workspace=workspace_2,
)
assert list(builder_application_queryset) == [
database_3.application_ptr,
]
# In this scenario the user is:
# - Viewer at workspace_2 level
# - no_access at application_2 level

View file

@ -12,7 +12,7 @@ export const registerRealtimeEvents = (realtime) => {
workspaceId
)
) {
store.dispatch('notification/setPermissionsUpdated', true)
store.dispatch('toast/setPermissionsUpdated', true)
}
}
)

View file

@ -173,7 +173,7 @@
value: {{ checkbox }}
<br />
<br />
<Checkbox class="margin-bottom-1" v-model="checkbox"></Checkbox>
<Checkbox v-model="checkbox" class="margin-bottom-1"></Checkbox>
<Checkbox v-model="checkbox">With text</Checkbox>
</div>
</div>
@ -285,19 +285,19 @@
class="margin-bottom-1"
small
></SwitchInput>
<SwitchInput class="margin-bottom-1" v-model="switchValue" large>
<SwitchInput v-model="switchValue" class="margin-bottom-1" large>
Small with text
</SwitchInput>
<SwitchInput
class="margin-bottom-1"
v-model="switchUnknown"
class="margin-bottom-1"
disabled
>
Disabled
</SwitchInput>
<SwitchInput
class="margin-bottom-1"
v-model="switchUnknown"
class="margin-bottom-1"
disabled
small
>