1
0
mirror of https://gitlab.com/bramw/baserow.git synced 2024-11-21 23:37:55 +00:00
bramw_baserow/backend/tests/baserow/ws/test_ws_consumers.py
2023-11-13 16:19:30 +00:00

460 lines
15 KiB
Python

from unittest.mock import AsyncMock, Mock
import pytest
from channels.testing import WebsocketCommunicator
from baserow.config.asgi import application
from baserow.ws.auth import ANONYMOUS_USER_TOKEN
from baserow.ws.consumers import CoreConsumer, PageContext, PageScope, SubscribedPages
from baserow.ws.registries import PageType, page_registry
class AcceptingTestPageType(PageType):
    """Test page type that accepts every subscription request.

    Both group names are derived from the single ``test_param`` parameter.
    """

    type = "test_page_type"
    parameters = ["test_param"]

    def can_add(self, user, web_socket_id, test_param, **kwargs):
        """Always allow the subscription, whatever the arguments are."""

        return True

    def get_group_name(self, test_param, **kwargs):
        """Build the group name for the given ``test_param``."""

        group_name = f"test-page-{test_param}"
        return group_name

    def get_permission_channel_group_name(self, test_param, **kwargs):
        """Build the permission group name for the given ``test_param``."""

        permission_group_name = f"permissions-test-page-{test_param}"
        return permission_group_name
class NotAcceptingTestPageType(AcceptingTestPageType):
    """Variant of ``AcceptingTestPageType`` whose ``can_add`` always refuses.

    Group name helpers are inherited unchanged; only the type name and the
    ``can_add`` answer differ.
    """

    type = "test_page_type_not_accepting"

    def can_add(self, user, web_socket_id, test_param, **kwargs):
        """Always reject the subscription, whatever the arguments are."""

        return False
class DifferentPermissionsGroupTestPageType(PageType):
    """Test page type whose group names use the ``different-perm-group`` stem.

    ``can_add`` is not overridden here, so whatever ``PageType`` provides
    applies.
    """

    type = "diff_perm_page_type"
    parameters = ["test_param"]

    def get_group_name(self, test_param, **kwargs):
        """Build the group name for the given ``test_param``."""

        group_name = f"different-perm-group-{test_param}"
        return group_name

    def get_permission_channel_group_name(self, test_param, **kwargs):
        """Build the permission group name for the given ``test_param``."""

        permission_group_name = f"permissions-different-perm-group-{test_param}"
        return permission_group_name
@pytest.fixture
def test_page_types():
    """Register the three test page types while a test runs.

    Yields a tuple of instances ordered as: accepting, not accepting,
    different permissions group. After the test, each type is unregistered
    from ``page_registry`` by its ``type`` name, in the same order.
    """

    page_type_classes = (
        AcceptingTestPageType,
        NotAcceptingTestPageType,
        DifferentPermissionsGroupTestPageType,
    )
    page_types = tuple(page_type_class() for page_type_class in page_type_classes)

    for page_type in page_types:
        page_registry.register(page_type)

    yield page_types

    for page_type_class in page_type_classes:
        page_registry.unregister(page_type_class.type)
# Core consumer
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_connect_not_authenticated(data_fixture):
    """Connecting with an empty JWT opens the socket but fails authentication.

    The first message received must be an ``authentication`` payload with
    ``success`` set to ``False`` and ``web_socket_id`` set to ``None``.
    """

    communicator = WebsocketCommunicator(
        application,
        # Plain literal: there is nothing to interpolate for the empty token.
        "ws/core/?jwt_token=",
        headers=[(b"origin", b"http://localhost")],
    )

    # Only the connection flag matters here; the subprotocol is not checked.
    connected, _subprotocol = await communicator.connect()
    assert connected is True

    response = await communicator.receive_json_from()
    assert response["type"] == "authentication"
    assert response["success"] is False
    assert response["web_socket_id"] is None

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_connect_authenticated(data_fixture):
    """Connecting with a user's token yields a successful authentication.

    The first message received must be an ``authentication`` payload with
    ``success`` set to ``True`` and a ``web_socket_id`` that is not ``None``.
    """

    user_1, token_1 = data_fixture.create_user_and_token()
    url = f"ws/core/?jwt_token={token_1}"
    origin_headers = [(b"origin", b"http://localhost")]
    communicator = WebsocketCommunicator(application, url, headers=origin_headers)

    connected, _ = await communicator.connect()
    assert connected is True

    auth_message = await communicator.receive_json_from()
    assert auth_message["type"] == "authentication"
    assert auth_message["success"] is True
    assert auth_message["web_socket_id"] is not None

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_connect_authenticated_anonymous(data_fixture):
    """Connecting with the anonymous token yields a successful authentication.

    The first message received must be an ``authentication`` payload with
    ``success`` set to ``True`` and a ``web_socket_id`` that is not ``None``.
    """

    # A regular user and token are still created, although the connection
    # below authenticates with the anonymous token only.
    data_fixture.create_user_and_token()

    url = f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}"
    origin_headers = [(b"origin", b"http://localhost")]
    communicator = WebsocketCommunicator(application, url, headers=origin_headers)

    connected, _ = await communicator.connect()
    assert connected is True

    auth_message = await communicator.receive_json_from()
    assert auth_message["type"] == "authentication"
    assert auth_message["success"] is True
    assert auth_message["web_socket_id"] is not None

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_add_to_page_success(data_fixture, test_page_types):
    """Subscribing to an accepting page is confirmed with ``page_add``.

    The same request is sent twice and both rounds must be confirmed with the
    page type and its parameters.
    """

    user_1, token_1 = data_fixture.create_user_and_token()
    url = f"ws/core/?jwt_token={token_1}"
    communicator = WebsocketCommunicator(
        application, url, headers=[(b"origin", b"http://localhost")]
    )
    await communicator.connect()
    # Discard the first message sent after connecting.
    await communicator.receive_json_from()

    # Adding again will have the same behavior
    # but the page will still be subscribed only once
    for _round in range(2):
        await communicator.send_json_to({"page": "test_page_type", "test_param": 1})
        confirmation = await communicator.receive_json_from(timeout=0.1)
        assert confirmation["type"] == "page_add"
        assert confirmation["page"] == "test_page_type"
        assert confirmation["parameters"]["test_param"] == 1

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_add_page_doesnt_exist(data_fixture):
    """Subscribing to an unregistered page type must not be confirmed."""

    communicator = WebsocketCommunicator(
        application,
        f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}",
        headers=[(b"origin", b"http://localhost")],
    )
    await communicator.connect()
    # Discard the first message sent after connecting.
    await communicator.receive_json_from()

    await communicator.send_json_to({"page": "doesnt_exist", "test_param": 1})

    # Wait briefly instead of peeking at the output queue right away: an
    # immediate qsize() check can pass before the consumer has handled the
    # request, so it would not notice an unexpected confirmation.
    assert await communicator.receive_nothing() is True

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_add_to_page_failure(data_fixture, test_page_types):
    """A page whose ``can_add()`` returns ``False`` must not be confirmed."""

    user_1, token_1 = data_fixture.create_user_and_token()
    communicator = WebsocketCommunicator(
        application,
        f"ws/core/?jwt_token={token_1}",
        headers=[(b"origin", b"http://localhost")],
    )
    await communicator.connect()
    # Discard the first message sent after connecting.
    await communicator.receive_json_from()

    await communicator.send_json_to(
        {"page": "test_page_type_not_accepting", "test_param": 1}
    )

    # Wait briefly instead of peeking at the output queue right away: an
    # immediate qsize() check can pass before the consumer has handled the
    # request, so it would not notice an unexpected confirmation.
    assert await communicator.receive_nothing() is True

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_remove_page_success(data_fixture, test_page_types):
    """Unsubscribing from a page is confirmed with ``page_discard``.

    The page is first subscribed to, then removed twice; every removal must be
    confirmed with the page type and its parameters.
    """

    user_1, token_1 = data_fixture.create_user_and_token()
    communicator = WebsocketCommunicator(
        application,
        f"ws/core/?jwt_token={token_1}",
        headers=[(b"origin", b"http://localhost")],
    )
    await communicator.connect()
    # Discard the first message sent after connecting.
    await communicator.receive_json_from()

    await communicator.send_json_to({"page": "test_page_type", "test_param": 1})
    response = await communicator.receive_json_from(timeout=0.1)
    # Make sure the subscription really happened before testing the removal,
    # instead of silently dropping this message.
    assert response["type"] == "page_add"

    # Removing a page will send a confirmation again
    # even if it is unsubscribed already
    for _round in range(2):
        await communicator.send_json_to(
            {"remove_page": "test_page_type", "test_param": 1}
        )
        response = await communicator.receive_json_from(timeout=0.1)
        assert response["type"] == "page_discard"
        assert response["page"] == "test_page_type"
        assert response["parameters"]["test_param"] == 1

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_remove_page_doesnt_exist(data_fixture):
    """Unsubscribing from an unregistered page type must not be confirmed."""

    communicator = WebsocketCommunicator(
        application,
        f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}",
        headers=[(b"origin", b"http://localhost")],
    )
    await communicator.connect()
    # Discard the first message sent after connecting.
    await communicator.receive_json_from()

    # Use the "remove_page" key, the one the successful removal test sends;
    # with any other key this test would not exercise the removal at all.
    await communicator.send_json_to({"remove_page": "doesnt_exist", "test_param": 1})

    # Wait briefly instead of peeking at the output queue right away: an
    # immediate qsize() check can pass before the consumer has handled the
    # request, so it would not notice an unexpected confirmation.
    assert await communicator.receive_nothing() is True

    await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_get_page_context(data_fixture, test_page_types):
    """``_get_page_context`` builds a ``PageContext`` or returns ``None``.

    Covers three cases: a registered page type with a user in scope, an
    unregistered page type, and a scope whose user is ``None``.
    """

    accepting_page_type = test_page_types[0]
    user_1, _token = data_fixture.create_user_and_token()

    consumer = CoreConsumer()
    consumer.scope = {"user": user_1, "web_socket_id": 234}

    # Registered page type with a user in scope
    valid_content = {"page": accepting_page_type.type, "test_param": 2}
    expected_context = PageContext(
        resolved_page_type=accepting_page_type,
        page_scope=PageScope(
            page_type=accepting_page_type.type, page_parameters={"test_param": 2}
        ),
        user=user_1,
        web_socket_id=234,
    )
    assert await consumer._get_page_context(valid_content, "page") == expected_context

    # Not existing page type
    unknown_content = {"page": "doesnt_exist", "test_param": 2}
    assert await consumer._get_page_context(unknown_content, "page") is None

    # Missing user
    consumer.scope["user"] = None
    content_without_user = {"page": accepting_page_type.type, "test_param": 2}
    assert await consumer._get_page_context(content_without_user, "page") is None
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_core_consumer_remove_all_page_scopes(data_fixture, test_page_types):
    """``_remove_all_page_scopes`` empties the consumer's subscribed pages."""

    user_1, _token = data_fixture.create_user_and_token()

    first_scope = PageScope("test_page_type", {"test_param": 1})
    second_scope = PageScope("test_page_type", {"test_param": 2})
    pages = SubscribedPages()
    for page_scope in (first_scope, second_scope):
        pages.add(page_scope)

    async def _noop_base_send(message):
        # Stand-in for the consumer's send hook; the message is dropped.
        pass

    consumer = CoreConsumer()
    consumer.scope = {"pages": pages, "user": user_1, "web_socket_id": 123}
    consumer.channel_name = "test_channel_name"
    consumer.channel_layer = AsyncMock()
    consumer.base_send = _noop_base_send

    assert len(consumer.scope["pages"]) == 2
    await consumer._remove_all_page_scopes()
    assert len(consumer.scope["pages"]) == 0
# SubscribedPages
@pytest.mark.websockets
def test_subscribed_pages_adds_page_without_duplicates():
    """Adding a scope equal to one already present does not grow the set.

    Four scopes are added, the last one repeating the first one's page type
    and parameters, and three entries are expected.
    """

    scopes = [
        PageScope("test_page_type", {"test_param": 1}),
        PageScope("test_page_type", {"test_param": 2}),
        PageScope("test_page_type_not_accepting", {"test_param": 1}),
        # Same page type and parameters as the first scope.
        PageScope("test_page_type", {"test_param": 1}),
    ]

    pages = SubscribedPages()
    for page_scope in scopes:
        pages.add(page_scope)

    assert len(pages) == 3
@pytest.mark.websockets
def test_subscribed_pages_removes_pages_by_parameters():
    """Removing one scope leaves same-type scopes with other parameters."""

    first, second, third = (
        PageScope("test_page_type", {"test_param": param}) for param in (1, 2, 3)
    )

    pages = SubscribedPages()
    for page_scope in (first, second, third):
        pages.add(page_scope)
    assert len(pages) == 3

    pages.remove(second)

    assert first in pages.pages
    assert second not in pages.pages
    assert third in pages.pages
@pytest.mark.websockets
def test_subscribed_pages_removes_pages_without_error():
    """Removing a scope that was already removed must not raise."""

    first = PageScope("test_page_type", {"test_param": 1})
    second = PageScope("test_page_type", {"test_param": 2})

    pages = SubscribedPages()
    pages.add(first)
    pages.add(second)
    assert len(pages) == 2

    # should not throw error: every scope is removed twice in a row
    for page_scope in (first, first, second, second):
        pages.remove(page_scope)

    assert len(pages) == 0
@pytest.mark.websockets
def test_subscribed_pages_is_page_in_permission_group(test_page_types):
    """``is_page_in_permission_group`` matches only the scope's own group.

    The scope is checked directly; it is never added to the ``SubscribedPages``
    instance.
    """

    page_scope = PageScope("test_page_type", {"test_param": 1})
    pages = SubscribedPages()

    in_own_group = pages.is_page_in_permission_group(
        page_scope, "permissions-test-page-1"
    )
    assert in_own_group is True

    in_other_group = pages.is_page_in_permission_group(
        page_scope, "permissions-different-perm-group-1"
    )
    assert in_other_group is False
@pytest.mark.websockets
def test_subscribed_pages_has_pages_with_permission_group(test_page_types):
    """``has_pages_with_permission_group`` reflects the subscribed scopes.

    With one ``test_page_type`` scope added, its permission group is reported
    and the group of the other test page type is not.
    """

    pages = SubscribedPages()
    pages.add(PageScope("test_page_type", {"test_param": 1}))

    has_own_group = pages.has_pages_with_permission_group("permissions-test-page-1")
    assert has_own_group is True

    has_other_group = pages.has_pages_with_permission_group(
        "permissions-different-perm-group-1"
    )
    assert has_other_group is False
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
@pytest.mark.websockets
@pytest.mark.parametrize(
    "ignore_web_socket_id, web_socket_id, user, exclude_user_ids, expect_call",
    [
        # Cases where the message is expected to be sent:
        # - `ignore_web_socket_id` is empty.
        (None, "some_web_socket_id", Mock(id=1), [], True),
        # - The web socket IDs differ and the user ID is not excluded.
        ("some_web_socket_id", "another_web_socket_id", Mock(id=1), [], True),
        # - `ignore_web_socket_id` is empty and the user is not excluded.
        (None, "some_web_socket_id", Mock(id=1), [2], True),
        # Cases where the message is NOT expected to be sent:
        # - The web socket IDs match and should be ignored.
        ("some_web_socket_id", "some_web_socket_id", Mock(id=1), [], False),
        # - The user ID is excluded.
        (None, "some_web_socket_id", Mock(id=1), [1], False),
        # - The user ID is excluded (within multiple IDs).
        (None, "some_web_socket_id", Mock(id=1), [1, 2], False),
        # - The user ID is excluded (within multiple IDs) even though the web
        #   socket IDs differ.
        ("some_web_socket_id", "another_web_socket_id", Mock(id=1), [1, 2], False),
    ],
)
async def test_core_consumer_broadcast_to_group(
    data_fixture,
    test_page_types,
    ignore_web_socket_id,
    web_socket_id,
    user,
    exclude_user_ids,
    expect_call,
):
    """``broadcast_to_group`` forwards the payload unless the event excludes it.

    ``send_json`` is replaced by a mock. It must be called exactly once with
    the event payload when ``expect_call`` is true, and not at all otherwise.
    """

    send_json_mock = AsyncMock()

    consumer = CoreConsumer()
    # Only the scope entries set by this test and `send_json` are provided.
    consumer.scope = {"web_socket_id": web_socket_id, "user": user}
    consumer.send_json = send_json_mock

    event = {
        "payload": {"message": "test message"},
        "ignore_web_socket_id": ignore_web_socket_id,
        "exclude_user_ids": exclude_user_ids,
    }

    await consumer.broadcast_to_group(event)

    if not expect_call:
        send_json_mock.assert_not_called()
        return

    send_json_mock.assert_called_once_with(event["payload"])