1
0
mirror of https://gitlab.com/bramw/baserow.git synced 2024-11-21 23:37:55 +00:00
bramw_baserow/backend/tests/baserow/ws/test_ws_tasks.py

510 lines
18 KiB
Python
Executable File

import pytest
from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator
from baserow.config.asgi import application
from baserow.ws.tasks import (
broadcast_to_channel_group,
broadcast_to_group,
broadcast_to_groups,
broadcast_to_users,
broadcast_to_users_individual_payloads,
force_disconnect_users,
)
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_force_disconnect_users(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
response_2["web_socket_id"]
await sync_to_async(force_disconnect_users)([user_1.id])
await communicator_2.receive_nothing(0.1)
payload = await communicator_1.receive_output(0.1)
assert payload["type"] == "websocket.send"
assert payload["text"] == '{"type": "force_disconnect"}'
payload = await communicator_1.receive_output(0.1)
assert payload["type"] == "websocket.close"
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_broadcast_to_users(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
web_socket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
response_2["web_socket_id"]
await sync_to_async(broadcast_to_users)([user_1.id], {"message": "test"})
response_1 = await communicator_1.receive_json_from(0.1)
await communicator_2.receive_nothing(0.1)
assert response_1["message"] == "test"
await sync_to_async(broadcast_to_users)(
[user_1.id, user_2.id],
{"message": "test"},
ignore_web_socket_id=web_socket_id_1,
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2["message"] == "test"
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_broadcast_to_channel_group(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
workspace_1 = data_fixture.create_workspace(users=[user_1, user_2])
database = data_fixture.create_database_application(workspace=workspace_1)
table_1 = data_fixture.create_database_table(user=user_1)
table_2 = data_fixture.create_database_table(user=user_2)
table_3 = data_fixture.create_database_table(database=database)
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
web_socket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
response_2["web_socket_id"]
# We don't expect any communicator to receive anything because they didn't join a
# workspace.
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_1.id}", {"message": "nothing2"}
)
await communicator_1.receive_nothing(0.1)
await communicator_2.receive_nothing(0.1)
# User 1 is not allowed to join table 2 so we don't expect any response.
await communicator_1.send_json_to({"page": "table", "table_id": table_2.id})
await communicator_1.receive_nothing(0.1)
# Because user 1 did not join table 2 we don't expect anything
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_2.id}", {"message": "nothing"}
)
await communicator_1.receive_nothing(0.1)
await communicator_2.receive_nothing(0.1)
# Join the table page.
await communicator_1.send_json_to({"page": "table", "table_id": table_1.id})
response = await communicator_1.receive_json_from(0.1)
assert response["type"] == "page_add"
assert response["page"] == "table"
assert response["parameters"]["table_id"] == table_1.id
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_1.id}", {"message": "test"}
)
response_1 = await communicator_1.receive_json_from(0.1)
assert response_1["message"] == "test"
await communicator_2.receive_nothing(0.1)
await communicator_1.send_json_to({"page": "table", "table_id": table_3.id})
response = await communicator_1.receive_json_from(0.1)
assert response["type"] == "page_add"
assert response["page"] == "table"
assert response["parameters"]["table_id"] == table_3.id
await communicator_2.send_json_to({"page": "table", "table_id": table_3.id})
response = await communicator_2.receive_json_from(0.1)
assert response["type"] == "page_add"
assert response["page"] == "table"
assert response["parameters"]["table_id"] == table_3.id
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_3.id}", {"message": "test2"}
)
response_1 = await communicator_1.receive_json_from(0.1)
assert response_1["message"] == "test2"
response_1 = await communicator_2.receive_json_from(0.1)
assert response_1["message"] == "test2"
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_3.id}", {"message": "test3"}, web_socket_id_1
)
await communicator_1.receive_nothing(0.1)
response_1 = await communicator_2.receive_json_from(0.1)
assert response_1["message"] == "test3"
await sync_to_async(broadcast_to_channel_group)(
f"table-{table_2.id}", {"message": "test4"}
)
await communicator_1.receive_nothing(0.1)
await communicator_2.receive_nothing(0.1)
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_broadcast_to_workspace(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
user_3, token_3 = data_fixture.create_user_and_token()
user_4, token_4 = data_fixture.create_user_and_token()
workspace_1 = data_fixture.create_workspace(users=[user_1, user_2, user_4])
workspace_2 = data_fixture.create_workspace(users=[user_2, user_3])
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
web_socket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
web_socket_id_2 = response_2["web_socket_id"]
communicator_3 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_3}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_3.connect()
await communicator_3.receive_json_from()
await database_sync_to_async(broadcast_to_group)(
workspace_1.id, {"message": "test"}
)
response_1 = await communicator_1.receive_json_from(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
await communicator_3.receive_nothing(0.1)
assert response_1["message"] == "test"
assert response_2["message"] == "test"
await database_sync_to_async(broadcast_to_group)(
workspace_1.id, {"message": "test2"}, ignore_web_socket_id=web_socket_id_1
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
await communicator_3.receive_nothing(0.1)
assert response_2["message"] == "test2"
await database_sync_to_async(broadcast_to_group)(
workspace_2.id, {"message": "test3"}, ignore_web_socket_id=web_socket_id_2
)
await communicator_1.receive_nothing(0.1)
await communicator_2.receive_nothing(0.1)
await communicator_3.receive_json_from(0.1)
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
assert communicator_3.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
await communicator_3.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_broadcast_to_workspaces(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
user_3, token_3 = data_fixture.create_user_and_token()
user_4, token_4 = data_fixture.create_user_and_token()
workspace_1 = data_fixture.create_workspace(users=[user_1, user_2, user_4])
workspace_2 = data_fixture.create_workspace(users=[user_2, user_3])
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
web_socket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
web_socket_id_2 = response_2["web_socket_id"]
communicator_3 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_3}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_3.connect()
await communicator_3.receive_json_from()
await database_sync_to_async(broadcast_to_groups)(
[workspace_1.id], {"message": "test"}
)
response_1 = await communicator_1.receive_json_from(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
await communicator_3.receive_nothing(0.1)
assert response_1["message"] == "test"
assert response_2["message"] == "test"
await database_sync_to_async(broadcast_to_groups)(
[workspace_1.id], {"message": "test2"}, ignore_web_socket_id=web_socket_id_1
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
await communicator_3.receive_nothing(0.1)
assert response_2["message"] == "test2"
communicator_4 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_4}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_4.connect()
response_4 = await communicator_4.receive_json_from()
web_socket_id_4 = response_4["web_socket_id"]
await database_sync_to_async(broadcast_to_groups)(
[workspace_1.id, workspace_2.id],
{"message": "test3"},
ignore_web_socket_id=web_socket_id_4,
)
await communicator_1.receive_json_from(0.1)
await communicator_2.receive_json_from(0.1)
await communicator_3.receive_json_from(0.1)
await communicator_4.receive_nothing(0.1)
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
assert communicator_3.output_queue.qsize() == 0
assert communicator_4.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
await communicator_3.disconnect()
await communicator_4.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_can_broadcast_to_every_single_user(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
await sync_to_async(broadcast_to_users)(
[], {"message": "test"}, send_to_all_users=True
)
response_1 = await communicator_1.receive_json_from(0.1)
await communicator_2.receive_nothing(0.1)
assert response_1["message"] == "test"
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2["message"] == "test"
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_can_still_ignore_when_sending_to_all_users(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
websocket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
await sync_to_async(broadcast_to_users)(
[],
{"message": "test"},
ignore_web_socket_id=websocket_id_1,
send_to_all_users=True,
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2["message"] == "test"
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
@pytest.mark.websockets
async def test_broadcast_to_users_individual_payloads(data_fixture):
user_1, token_1 = data_fixture.create_user_and_token()
user_2, token_2 = data_fixture.create_user_and_token()
communicator_1 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_1}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_1.connect()
response_1 = await communicator_1.receive_json_from()
web_socket_id_1 = response_1["web_socket_id"]
communicator_2 = WebsocketCommunicator(
application,
f"ws/core/?jwt_token={token_2}",
headers=[(b"origin", b"http://localhost")],
)
await communicator_2.connect()
response_2 = await communicator_2.receive_json_from()
# Assert each user gets a unique message
await sync_to_async(broadcast_to_users_individual_payloads)(
{str(user_1.id): "payload1", str(user_2.id): "payload2"}
)
response_1 = await communicator_1.receive_json_from(0.1)
assert response_1 == "payload1"
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2 == "payload2"
# Assert we can ignore a websocket for one user
await sync_to_async(broadcast_to_users_individual_payloads)(
{str(user_1.id): "payload1", str(user_2.id): "payload2"},
ignore_web_socket_id=web_socket_id_1,
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2 == "payload2"
# Assert not including a user id wont send them anything
await sync_to_async(broadcast_to_users_individual_payloads)(
{str(user_2.id): "payload2"},
)
await communicator_1.receive_nothing(0.1)
response_2 = await communicator_2.receive_json_from(0.1)
assert response_2 == "payload2"
assert communicator_1.output_queue.qsize() == 0
assert communicator_2.output_queue.qsize() == 0
await communicator_1.disconnect()
await communicator_2.disconnect()