1
0
Fork 0
mirror of https://gitlab.com/bramw/baserow.git synced 2025-04-15 09:34:13 +00:00

Merge branch '3173-management-command-for-adding-removing-public-keys-for-trusted-soruces' into 'develop'

Resolve "Management command for adding/removing public keys for trusted sources"

Closes 

See merge request 
This commit is contained in:
Przemyslaw Kukulski 2024-11-12 16:00:11 +00:00
commit 861a2729e5
16 changed files with 277 additions and 161 deletions

View file

@ -52,7 +52,6 @@ from baserow.core.exceptions import (
WorkspaceDoesNotExist,
WorkspaceUserIsLastAdmin,
)
from baserow.core.feature_flags import FF_EXPORT_WORKSPACE, feature_flag_is_enabled
from baserow.core.handler import CoreHandler
from baserow.core.import_export.exceptions import (
ImportExportResourceDoesNotExist,
@ -516,8 +515,6 @@ class ListExportWorkspaceApplicationsView(APIView):
Lists all available exports created for a given workspace.
"""
feature_flag_is_enabled(FF_EXPORT_WORKSPACE, raise_if_disabled=True)
exports = ImportExportHandler().list_exports(request.user, workspace_id)
return Response(
ListExportWorkspaceApplicationsSerializer({"results": exports}).data
@ -584,8 +581,6 @@ class AsyncExportWorkspaceApplicationsView(APIView):
the workspace are exported.
"""
feature_flag_is_enabled(FF_EXPORT_WORKSPACE, raise_if_disabled=True)
job = JobHandler().create_and_start_job(
request.user,
ExportApplicationsJobType.type,
@ -645,8 +640,6 @@ class ImportExportResourceUploadFileView(APIView):
}
)
def post(self, request, workspace_id: int) -> Response:
feature_flag_is_enabled(FF_EXPORT_WORKSPACE, raise_if_disabled=True)
handler = ImportExportHandler()
handler.get_workspace_or_raise(user=request.user, workspace_id=workspace_id)
@ -692,8 +685,6 @@ class ImportExportResourceView(APIView):
}
)
def delete(self, request, workspace_id, resource_id: str) -> Response:
feature_flag_is_enabled(FF_EXPORT_WORKSPACE, raise_if_disabled=True)
handler = ImportExportHandler()
handler.get_workspace_or_raise(user=request.user, workspace_id=workspace_id)
@ -749,8 +740,6 @@ class AsyncImportApplicationsView(APIView):
ImportApplicationsJobType().request_serializer_class, return_validated=True
)
def post(self, request, data: Dict, workspace_id: int) -> Response:
feature_flag_is_enabled(FF_EXPORT_WORKSPACE, raise_if_disabled=True)
job = JobHandler().create_and_start_job(
request.user,
ImportApplicationsJobType.type,

View file

@ -2,7 +2,6 @@ from django.conf import settings
from baserow.core.exceptions import FeatureDisabledException
FF_EXPORT_WORKSPACE = "export_workspace"
FF_DASHBOARDS = "dashboards"
FF_ENABLE_ALL = "*"

View file

@ -1148,3 +1148,102 @@ class ImportExportHandler(metaclass=baserow_trace_methods(tracer)):
ImportExportResource.objects_and_trash.filter(
id__in=resources_to_delete
).delete()
def add_trusted_public_key(self, name, public_key_data):
"""
Adds a new trusted public key to the `ImportExportTrustedSource` model.
:param name: The name of the trusted public key.
:param public_key_data: Public key in PEM format.
"""
try:
decoded_public_key = base64.b64decode(public_key_data)
public_key = serialization.load_pem_public_key(
decoded_public_key, backend=default_backend()
)
public_key_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
except Exception: # noqa
logger.error("Provided public key is invalid or in wrong format")
return
try:
source = ImportExportTrustedSource.objects.get(public_key=public_key_pem)
logger.warning(
f"Key with that public key already exists with ID #{source.id}"
)
except ImportExportTrustedSource.DoesNotExist:
ImportExportTrustedSource.objects.create(
name=name,
public_key=public_key_pem,
)
logger.info("Public key added", name)
except ImportExportTrustedSource.MultipleObjectsReturned:
logger.error("Multiple keys found with the same name", name)
def list_trusted_public_keys(self):
"""
Lists all trusted public keys.
This method retrieves and prints all the trusted public keys stored in the
database.
"""
col_widths = {
"ID": 10,
"Created At": 20,
"Name": 30,
"Public Key (last 50 chars)": 50,
}
headers = list(col_widths.keys())
divider = "-" * (sum(col_widths.values()) + len(col_widths) * 3)
print(divider)
print(
f"{headers[0]:<{col_widths['ID']}} | "
f"{headers[1]:<{col_widths['Created At']}} | "
f"{headers[2]:<{col_widths['Name']}} | "
f"{headers[3]:<{col_widths['Public Key (last 50 chars)']}}"
)
print(divider)
for record in ImportExportTrustedSource.objects.all():
public_key_str = record.public_key.replace(
"-----END PUBLIC KEY-----", ""
).replace("\n", "")[-50:]
created_at_str = record.created_at.strftime("%Y-%m-%d %H:%M:%S")
print(
f"{str(record.id):<{col_widths['ID']}} | "
f"{created_at_str:<{col_widths['Created At']}} | "
f"{record.name:<{col_widths['Name']}} | "
f"{public_key_str:<{col_widths['Public Key (last 50 chars)']}}"
)
print(divider)
def delete_trusted_public_key(self, source_id: str):
"""
Deletes a trusted public key by its ID.
This method attempts to delete a trusted public key from the
`ImportExportTrustedSource` model based on the provided ID.
Only keys without an associated private key can be deleted.
:param source_id: The ID of the trusted public key to be deleted.
"""
try:
source = ImportExportTrustedSource.objects.get(id=source_id)
except ImportExportTrustedSource.DoesNotExist:
logger.warning(f"Trusted public key for ID #{source_id} does not exist")
else:
if source.private_key:
logger.warning(
f"Trusted source cannot be removed as it has a private key"
)
else:
source.delete()
logger.info(f"Trusted public key for ID #{source_id} removed")

View file

@ -0,0 +1,55 @@
from django.core.management.base import BaseCommand
from baserow.core.import_export.handler import ImportExportHandler
class Command(BaseCommand):
help = (
"Provides management commands for handling trusted sources used in the "
"application's import and export processes. This includes listing all trusted "
"sources, adding new ones, deleting existing ones by name, and displaying the "
"public key of a trusted source."
)
def add_arguments(self, parser):
subparsers = parser.add_subparsers(
dest="action", help="Action to perform (list, add, remove)"
)
# Subparser for 'list'
subparsers.add_parser("list", help="List all trusted public keys.")
# Subparser for 'add'
add_parser = subparsers.add_parser("add", help="Add a new trusted public key.")
add_parser.add_argument(
"name", type=str, help="Name of the trusted public key."
)
add_parser.add_argument(
"public_key_data", type=str, help="Base64 encoded public key data."
)
# Subparser for 'remove'
remove_parser = subparsers.add_parser(
"remove", help="Remove an existing trusted public key."
)
remove_parser.add_argument(
"source_id", type=str, help="ID of the trusted public key to remove."
)
def handle(self, *args, **options):
handler = ImportExportHandler()
action = options.get("action")
if action == "list":
handler.list_trusted_public_keys()
elif action == "add":
name = options.get("name")
public_key_data = options.get("public_key_data")
handler.add_trusted_public_key(name, public_key_data)
elif action == "remove":
source_id = options.get("source_id")
handler.delete_trusted_public_key(source_id)
else:
self.stdout.write(
self.style.ERROR("Invalid action. Use 'list', 'add', or 'remove'.")
)

View file

@ -24,7 +24,7 @@ class ImportExportWorkspaceFixtures:
) as file_handler:
file_handler.write(content)
def create_import_export_trusted_source(self, user):
def create_import_export_trusted_source(self):
return ImportExportTrustedSource.objects.create(
name="Test trusted source",
private_key=TEST_IMPORT_EXPORT_PRIVATE_KEY,

View file

@ -1,7 +1,6 @@
import json
import zipfile
from django.test.utils import override_settings
from django.urls import reverse
import pytest
@ -10,7 +9,6 @@ from rest_framework.status import (
HTTP_200_OK,
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
@ -18,31 +16,6 @@ from baserow.core.import_export.handler import EXPORT_FORMAT_VERSION, MANIFEST_N
from baserow.version import VERSION
@pytest.mark.import_export_workspace
@pytest.mark.django_db
@override_settings(
FEATURE_FLAGS="",
)
def test_exporting_workspace_with_feature_flag_disabled(
data_fixture, api_client, tmpdir
):
user, token = data_fixture.create_user_and_token()
workspace = data_fixture.create_workspace(user=user)
data_fixture.create_database_application(workspace=workspace)
response = api_client.post(
reverse(
"api:workspaces:export_workspace_async",
kwargs={"workspace_id": workspace.id},
),
data={},
format="json",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json()["error"] == "ERROR_FEATURE_DISABLED"
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_exporting_missing_workspace_returns_error(data_fixture, api_client, tmpdir):
@ -279,29 +252,6 @@ def test_exporting_workspace_with_single_empty_database(
assert exported_database["files"]["media"]["checksum"] is not None
@pytest.mark.import_export_workspace
@pytest.mark.django_db
@override_settings(
FEATURE_FLAGS="",
)
def test_list_exports_with_feature_flag_disabled(data_fixture, api_client, tmpdir):
user, token = data_fixture.create_user_and_token()
workspace = data_fixture.create_workspace(user=user)
data_fixture.create_database_application(workspace=workspace)
response = api_client.get(
reverse(
"api:workspaces:export_workspace_list",
kwargs={"workspace_id": workspace.id},
),
data={},
format="json",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json()["error"] == "ERROR_FEATURE_DISABLED"
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_list_exports_with_missing_workspace(data_fixture, api_client, tmpdir):

View file

@ -1,37 +1,13 @@
from django.test.utils import override_settings
from django.urls import reverse
import pytest
from rest_framework.status import (
HTTP_204_NO_CONTENT,
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
@pytest.mark.import_export_workspace
@pytest.mark.django_db
@override_settings(
FEATURE_FLAGS="",
)
def test_delete_resource_with_feature_flag_disabled(data_fixture, api_client, tmpdir):
user, token = data_fixture.create_user_and_token()
workspace = data_fixture.create_workspace(user=user)
response = api_client.delete(
reverse(
"api:workspaces:import_workspace_resource",
kwargs={"workspace_id": workspace.id, "resource_id": 1},
),
data={},
format="json",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json()["error"] == "ERROR_FEATURE_DISABLED"
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_delete_non_existing_resource(data_fixture, api_client, tmpdir):

View file

@ -2,38 +2,10 @@ import os
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test.utils import override_settings
from django.urls import reverse
import pytest
from rest_framework.status import (
HTTP_200_OK,
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
@pytest.mark.import_export_workspace
@pytest.mark.django_db
@override_settings(
FEATURE_FLAGS="",
)
def test_upload_file_with_feature_flag_disabled(data_fixture, api_client, tmpdir):
user, token = data_fixture.create_user_and_token()
workspace = data_fixture.create_workspace(user=user)
response = api_client.post(
reverse(
"api:workspaces:import_workspace_upload_file",
kwargs={"workspace_id": workspace.id},
),
data={},
format="json",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json()["error"] == "ERROR_FEATURE_DISABLED"
from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND
@pytest.mark.import_export_workspace
@ -133,7 +105,7 @@ def test_upload_valid_file(data_fixture, api_client, tmpdir, use_tmp_media_root)
with open(f"{sources_path}/interesting_database_export.zip", "rb") as export_file:
file_content = export_file.read()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
uploaded_file = SimpleUploadedFile(
"interesting_database_export.zip", file_content, content_type="application/zip"

View file

@ -1,7 +1,6 @@
import os
from django.conf import settings
from django.test.utils import override_settings
from django.urls import reverse
import pytest
@ -9,35 +8,10 @@ from rest_framework.status import (
HTTP_200_OK,
HTTP_202_ACCEPTED,
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
@pytest.mark.import_export_workspace
@pytest.mark.django_db
@override_settings(
FEATURE_FLAGS="",
)
def test_import_applications_with_feature_flag_disabled(
data_fixture, api_client, tmpdir
):
user, token = data_fixture.create_user_and_token()
workspace = data_fixture.create_workspace(user=user)
response = api_client.post(
reverse(
"api:workspaces:import_workspace_async",
kwargs={"workspace_id": workspace.id},
),
data={"resource_id": 1},
format="json",
HTTP_AUTHORIZATION=f"JWT {token}",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json()["error"] == "ERROR_FEATURE_DISABLED"
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_import_applications_into_non_existing_workspace(
@ -117,7 +91,7 @@ def test_import_applications(data_fixture, api_client, tmpdir, use_tmp_media_roo
settings.BASE_DIR, "../../../tests/baserow/api/import_export/sources"
)
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
resource = data_fixture.create_import_export_resource(
created_by=user, original_name="interesting_database.zip", is_valid=True

View file

@ -28,7 +28,7 @@ def test_exporting_interesting_database(
include_permission_data=False, reduce_disk_space_usage=False
)
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
database = setup_interesting_test_database(
data_fixture,

View file

@ -25,7 +25,7 @@ def test_import_with_missing_files(data_fixture, use_tmp_media_root, tmp_path):
user = data_fixture.create_user()
workspace = data_fixture.create_workspace()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_export_missing_files.zip"
resource = data_fixture.create_import_export_resource(
@ -63,7 +63,7 @@ def test_import_with_modified_files(data_fixture, use_tmp_media_root, tmp_path):
user = data_fixture.create_user()
workspace = data_fixture.create_workspace()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_export_modified_files.zip"
resource = data_fixture.create_import_export_resource(
@ -102,7 +102,7 @@ def test_import_with_unexpected_files(data_fixture, use_tmp_media_root, tmp_path
user = data_fixture.create_user()
workspace = data_fixture.create_workspace()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_export_unexpected_files.zip"
resource = data_fixture.create_import_export_resource(

View file

@ -30,7 +30,7 @@ def test_import_without_signature_and_check_enabled(
):
user = data_fixture.create_user()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_without_signature_disabled_check.zip"
resource = data_fixture.create_import_export_resource(
@ -70,7 +70,7 @@ def test_import_without_signature_and_check_disabled(
core_settings.verify_import_signature = False
core_settings.save()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_without_signature_enabled_check.zip"
resource = data_fixture.create_import_export_resource(
@ -103,7 +103,7 @@ def test_import_without_signature_and_check_disabled(
def test_import_without_signature_data(data_fixture, use_tmp_media_root, tmp_path):
user = data_fixture.create_user()
data_fixture.create_import_export_trusted_source(user=user)
data_fixture.create_import_export_trusted_source()
zip_name = "interesting_database_without_signature_data.zip"

View file

@ -0,0 +1,101 @@
import pytest
from loguru import logger
from baserow.core.import_export.handler import ImportExportHandler
from baserow.core.models import ImportExportTrustedSource
SAMPLE_B64_PUBLIC_KEY = (
"LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JSUJpZ0tDQVlFQXZtcTc3UHRmclpPbXB"
"MQnFoSkZOVkdFMXdGYURPTmlLdnJhOE5ORWhMOWo1aFUzc0o4NFgKcmJpS3JMeFVES1R4ckFKaz"
"J1VFdzOEwrd1k3T2puS1ZLNXlHTmhWYTFiNHZpaXJraVlwYlhncHdwc2FKekl0RwpTVTlZZ3J0a"
"VBZTGhnRGYxQmtsbHZBTmVBOU80ZDRqNTR5dzVSK0JrYXNMVy9DMWptSUpiRWRuaFJ5QlM1SE41"
"CmFkdzd1QlE1SzBUWGkzcVBFaXo3KzZtUURXanB5VkV5V3RLSnBqQmtHcXZGNXlhU05ibi9rMEc"
"yWnhaU1ZiSHQKYTA5eHlPZDZOV0VUVWtmTzdYcGl3NWlTWG0yUm9LeU5IT2VPK3hQSjdDREFxcj"
"B6MkwwQXlVaDJDZVhhNURtUwo2cGpCZjlmUGRJcnhNNCt2L1lBSUdSWkE3NFBGZllkd1RteHlrY"
"01nQVhtcWlLaWx4SjNwbVZVWDlPZ3lnMFlOCnN4OHE0ejhNcHBIL0dJRjJLVlhsMW5CcXd3b1lZ"
"TGJHN1crM2MycjA4NmtGa2RzVEhnVkRoc0tNNUM0NURGdkMKNmFaTkkwS2VtcHZPOXpUaVMxQ3h"
"kM0xKbHhLQU9haCt4eUFaWkUwbm42cWFvKzFQYWN6YWlTOGVmd3c0VjVudApVVzNYVDRZYTUweG"
"hBZ01CQUFFPQotLS0tLUVORCBSU0EgUFVCTElDIEtFWS0tLS0t"
)
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_add_invalid_public_key_fails(data_fixture, mocker):
mock_logger_error = mocker.patch.object(logger, "error")
ImportExportHandler().add_trusted_public_key("test #1", "Some invalid key")
log_messages = [args[0] for args, _ in mock_logger_error.call_args_list]
assert any(
"Provided public key is invalid or in wrong format" in message
for message in log_messages
)
assert not ImportExportTrustedSource.objects.filter(name="test #1").exists()
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_add_valid_public_key(data_fixture, mocker):
mock_logger_error = mocker.patch.object(logger, "error")
mock_logger_warning = mocker.patch.object(logger, "warning")
ImportExportHandler().add_trusted_public_key("test #1", SAMPLE_B64_PUBLIC_KEY)
mock_logger_error.assert_not_called()
mock_logger_warning.assert_not_called()
assert ImportExportTrustedSource.objects.filter(name="test #1").exists()
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_add_duplicated_public_key_fails(data_fixture, mocker):
mock_logger_error = mocker.patch.object(logger, "error")
mock_logger_warning = mocker.patch.object(logger, "warning")
ImportExportHandler().add_trusted_public_key("test #1", SAMPLE_B64_PUBLIC_KEY)
ImportExportHandler().add_trusted_public_key("test #1", SAMPLE_B64_PUBLIC_KEY)
mock_logger_error.assert_not_called()
source = ImportExportTrustedSource.objects.get(name="test #1")
log_messages = [args[0] for args, _ in mock_logger_warning.call_args_list]
assert (
f"Key with that public key already exists with ID #{source.id}" in log_messages
)
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_remove_private_key_fails(data_fixture, mocker):
source = data_fixture.create_import_export_trusted_source()
mock_logger_error = mocker.patch.object(logger, "error")
mock_logger_warning = mocker.patch.object(logger, "warning")
ImportExportHandler().delete_trusted_public_key(source_id=source.id)
mock_logger_error.assert_not_called()
log_messages = [args[0] for args, _ in mock_logger_warning.call_args_list]
assert "Trusted source cannot be removed as it has a private key" in log_messages
@pytest.mark.import_export_workspace
@pytest.mark.django_db
def test_remove_public_key(data_fixture, mocker):
ImportExportHandler().add_trusted_public_key("test #1", SAMPLE_B64_PUBLIC_KEY)
source = ImportExportTrustedSource.objects.get(name="test #1")
mock_logger_error = mocker.patch.object(logger, "error")
mock_logger_warning = mocker.patch.object(logger, "warning")
ImportExportHandler().delete_trusted_public_key(source_id=source.id)
mock_logger_error.assert_not_called()
mock_logger_warning.assert_not_called()
assert not ImportExportTrustedSource.objects.filter(name="test #1").exists()

View file

@ -0,0 +1,7 @@
{
"type": "feature",
"message": "Add CLI command for managing public keys for trusted sources",
"issue_number": 3173,
"bullet_points": [],
"created_at": "2024-11-08"
}

View file

@ -15,14 +15,11 @@
></div>
<ul v-else class="context__menu">
<li
v-if="
$hasPermission('workspace.read', workspace, workspace.id) &&
$featureFlagIsEnabled(FF_EXPORT_WORKSPACE)
"
v-if="$hasPermission('workspace.read', workspace, workspace.id)"
class="context__menu-item"
>
<a class="context__menu-item-link" @click="openExportData">
<i class="context__menu-item-icon iconoir-arrow-up-circle"></i>
<i class="context__menu-item-icon iconoir-share-ios"></i>
{{ $t('workspaceContext.exportWorkspace') }}
</a>
</li>
@ -32,12 +29,12 @@
'workspace.create_application',
workspace,
workspace.id
) && $featureFlagIsEnabled(FF_EXPORT_WORKSPACE)
)
"
class="context__menu-item"
>
<a class="context__menu-item-link" @click="openImportData">
<i class="context__menu-item-icon iconoir-arrow-down-circle"></i>
<i class="context__menu-item-icon iconoir-import"></i>
{{ $t('workspaceContext.importWorkspace') }}
</a>
</li>
@ -156,7 +153,6 @@ import ImportWorkspaceModal from '@baserow/modules/core/components/import/Import
import TrashModal from '@baserow/modules/core/components/trash/TrashModal'
import LeaveWorkspaceModal from '@baserow/modules/core/components/workspace/LeaveWorkspaceModal'
import WorkspaceSettingsModal from '@baserow/modules/core/components/workspace/WorkspaceSettingsModal'
import { FF_EXPORT_WORKSPACE } from '@baserow/modules/core/plugins/featureFlags'
export default {
name: 'WorkspaceContext',
@ -177,7 +173,6 @@ export default {
data() {
return {
loading: false,
FF_EXPORT_WORKSPACE,
}
},
methods: {

View file

@ -1,5 +1,4 @@
const FF_ENABLE_ALL = '*'
export const FF_EXPORT_WORKSPACE = 'export_workspace'
export const FF_DASHBOARDS = 'dashboards'
/**