From acf38f2002db61930c82b67a455d515851b92884 Mon Sep 17 00:00:00 2001
From: Davide Silvestri <davide@baserow.io>
Date: Tue, 26 Jul 2022 11:58:28 +0000
Subject: [PATCH] Resolve "Duplicate table"

---
 .../contrib/database/api/tables/urls.py       |   6 +
 .../contrib/database/api/tables/views.py      |  53 ++++
 .../contrib/database/application_types.py     | 215 ++++++++++------
 backend/src/baserow/contrib/database/apps.py  |   4 +
 .../src/baserow/contrib/database/db/atomic.py |  50 +++-
 .../contrib/database/fields/field_helpers.py  |   3 +-
 .../contrib/database/fields/field_types.py    |  66 ++++-
 .../management/commands/fill_table_fields.py  |   4 +-
 .../migrations/0084_duplicatetablejob.py      |  71 ++++++
 .../baserow/contrib/database/table/actions.py |  50 ++++
 .../baserow/contrib/database/table/handler.py |  81 +++++-
 .../contrib/database/table/job_types.py       |  64 +++++
 .../baserow/contrib/database/table/models.py  |  22 ++
 .../contrib/database/views/registries.py      |   2 +-
 backend/src/baserow/core/actions.py           |  10 +-
 backend/src/baserow/core/handler.py           |  18 +-
 backend/src/baserow/core/job_types.py         |   6 +-
 backend/src/baserow/core/models.py            |   3 +
 backend/src/baserow/core/utils.py             |  17 +-
 backend/src/baserow/test_utils/helpers.py     |  15 +-
 backend/templates/car-maintenance-log.json    |  14 --
 backend/templates/contract-management.json    |   7 -
 backend/templates/product-roadmap.json        |   9 -
 .../database/api/rows/test_row_serializers.py |   1 +
 .../database/api/tables/test_table_views.py   | 147 ++++++++++-
 .../database/export/test_export_handler.py    |   6 +-
 .../database/field/test_field_handler.py      |   1 +
 .../database/field/test_field_types.py        |   2 +
 .../database/table/test_table_actions.py      | 234 ++++++++++++++----
 .../database/table/test_table_handler.py      |  36 +++
 .../database/table/test_table_job_types.py    |  69 ++++++
 .../core/actions/test_application_actions.py  |  12 +-
 .../tests/baserow/core/test_core_job_types.py |  12 +-
 changelog.md                                  |   1 +
 .../export/test_premium_export_types.py       |   4 +
 web-frontend/locales/en.json                  |   1 +
 .../components/sidebar/SidebarItem.vue        |  18 +-
 .../SidebarDuplicateTableContextItem.vue      |  91 +++++++
 .../modules/database/services/table.js        |   3 +
 web-frontend/modules/database/store/table.js  |  16 ++
 40 files changed, 1234 insertions(+), 210 deletions(-)
 create mode 100644 backend/src/baserow/contrib/database/migrations/0084_duplicatetablejob.py
 create mode 100644 backend/src/baserow/contrib/database/table/job_types.py
 create mode 100644 backend/tests/baserow/contrib/database/table/test_table_job_types.py
 create mode 100644 web-frontend/modules/database/components/sidebar/table/SidebarDuplicateTableContextItem.vue

diff --git a/backend/src/baserow/contrib/database/api/tables/urls.py b/backend/src/baserow/contrib/database/api/tables/urls.py
index 057fc0f8c..9230d8558 100644
--- a/backend/src/baserow/contrib/database/api/tables/urls.py
+++ b/backend/src/baserow/contrib/database/api/tables/urls.py
@@ -6,6 +6,7 @@ from .views import (
     OrderTablesView,
     AsyncCreateTableView,
     AsyncTableImportView,
+    AsyncDuplicateTableView,
 )
 
 
@@ -23,6 +24,11 @@ urlpatterns = [
         OrderTablesView.as_view(),
         name="order",
     ),
+    re_path(
+        r"(?P<table_id>[0-9]+)/duplicate/async/$",
+        AsyncDuplicateTableView.as_view(),
+        name="async_duplicate",
+    ),
     re_path(r"(?P<table_id>[0-9]+)/$", TableView.as_view(), name="item"),
     re_path(
         r"(?P<table_id>[0-9]+)/import/async/$",
diff --git a/backend/src/baserow/contrib/database/api/tables/views.py b/backend/src/baserow/contrib/database/api/tables/views.py
index 8e74654e9..d5a31da00 100644
--- a/backend/src/baserow/contrib/database/api/tables/views.py
+++ b/backend/src/baserow/contrib/database/api/tables/views.py
@@ -2,6 +2,7 @@ from django.conf import settings
 from django.db import transaction
 from drf_spectacular.openapi import OpenApiParameter, OpenApiTypes
 from drf_spectacular.utils import extend_schema
+from rest_framework import status
 from rest_framework.permissions import IsAuthenticated
 from rest_framework.response import Response
 from rest_framework.views import APIView
@@ -23,6 +24,7 @@ from baserow.contrib.database.api.fields.errors import (
     ERROR_RESERVED_BASEROW_FIELD_NAME,
     ERROR_INVALID_BASEROW_FIELD_NAME,
 )
+from baserow.contrib.database.table.job_types import DuplicateTableJobType
 from baserow.contrib.database.fields.exceptions import (
     MaxFieldLimitExceeded,
     MaxFieldNameLengthExceeded,
@@ -500,3 +502,54 @@ class OrderTablesView(APIView):
         )
 
         return Response(status=204)
+
+
+class AsyncDuplicateTableView(APIView):
+    permission_classes = (IsAuthenticated,)
+
+    @extend_schema(
+        parameters=[
+            OpenApiParameter(
+                name="table_id",
+                location=OpenApiParameter.PATH,
+                type=OpenApiTypes.INT,
+                description="The table to duplicate.",
+            ),
+            CLIENT_SESSION_ID_SCHEMA_PARAMETER,
+            CLIENT_UNDO_REDO_ACTION_GROUP_ID_SCHEMA_PARAMETER,
+        ],
+        tags=["Database tables"],
+        operation_id="duplicate_database_table",
+        description=(
+            "Duplicates the table with the provided `table_id` parameter "
+            "if the authorized user has access to the database's group."
+        ),
+        responses={
+            202: DuplicateTableJobType().get_serializer_class(),
+            400: get_error_schema(
+                [
+                    "ERROR_USER_NOT_IN_GROUP",
+                    "ERROR_REQUEST_BODY_VALIDATION",
+                    "ERROR_MAX_JOB_COUNT_EXCEEDED",
+                ]
+            ),
+            404: get_error_schema(["ERROR_TABLE_DOES_NOT_EXIST"]),
+        },
+    )
+    @transaction.atomic
+    @map_exceptions(
+        {
+            TableDoesNotExist: ERROR_TABLE_DOES_NOT_EXIST,
+            UserNotInGroup: ERROR_USER_NOT_IN_GROUP,
+            MaxJobCountExceeded: ERROR_MAX_JOB_COUNT_EXCEEDED,
+        }
+    )
+    def post(self, request, table_id):
+        """Creates a job to duplicate a table in a database."""
+
+        job = JobHandler().create_and_start_job(
+            request.user, DuplicateTableJobType.type, table_id=table_id
+        )
+
+        serializer = job_type_registry.get_serializer(job, JobSerializer)
+        return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
diff --git a/backend/src/baserow/contrib/database/application_types.py b/backend/src/baserow/contrib/database/application_types.py
index f14dc969f..a6689bfdd 100644
--- a/backend/src/baserow/contrib/database/application_types.py
+++ b/backend/src/baserow/contrib/database/application_types.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, List
 from zipfile import ZipFile
 
 from django.core.files.storage import Storage
@@ -60,28 +60,18 @@ class DatabaseApplicationType(ApplicationType):
     def export_safe_transaction_context(self, application) -> Atomic:
         return read_repeatable_single_database_atomic_transaction(application.id)
 
-    def export_serialized(
+    def export_tables_serialized(
         self,
-        database: Database,
+        tables: List[Table],
         files_zip: Optional[ZipFile] = None,
         storage: Optional[Storage] = None,
-    ) -> Dict[str, Any]:
+    ) -> List[Dict[str, Any]]:
         """
-        Exports the database application type to a serialized format that can later be
-        be imported via the `import_serialized`.
-        Call this function in a transaction_atomic(isolation_level=REPEATABLE_READ)
-        to ensure to read rows as they were at the beginning of the transaction,
-        independently from subsequent committed changes.
+        Exports the tables provided  to a serialized format that can later be
+        be imported via the `import_tables_serialized`.
         """
 
-        tables = database.table_set.all().prefetch_related(
-            "field_set",
-            "view_set",
-            "view_set__viewfilter_set",
-            "view_set__viewsort_set",
-        )
-
-        serialized_tables = []
+        serialized_tables: List[Dict[str, Any]] = []
         for table in tables:
             fields = table.field_set.all()
             serialized_fields = []
@@ -100,7 +90,7 @@ class DatabaseApplicationType(ApplicationType):
 
             model = table.get_model(fields=fields, add_dependencies=False)
             serialized_rows = []
-            table_cache = {}
+            table_cache: Dict[str, Any] = {}
             for row in model.objects.all():
                 serialized_row = DatabaseExportSerializedStructure.row(
                     id=row.id,
@@ -126,6 +116,27 @@ class DatabaseApplicationType(ApplicationType):
                     rows=serialized_rows,
                 )
             )
+        return serialized_tables
+
+    def export_serialized(
+        self,
+        database: Database,
+        files_zip: Optional[ZipFile] = None,
+        storage: Optional[Storage] = None,
+    ) -> Dict[str, Any]:
+        """
+        Exports the database application type to a serialized format that can later be
+        be imported via the `import_serialized`.
+        """
+
+        tables = database.table_set.all().prefetch_related(
+            "field_set",
+            "view_set",
+            "view_set__viewfilter_set",
+            "view_set__viewsort_set",
+        )
+
+        serialized_tables = self.export_tables_serialized(tables, files_zip, storage)
 
         serialized = super().export_serialized(database, files_zip, storage)
         serialized.update(
@@ -133,30 +144,16 @@ class DatabaseApplicationType(ApplicationType):
         )
         return serialized
 
-    def import_serialized(
-        self,
-        group: Group,
-        serialized_values: Dict[str, Any],
-        id_mapping: Dict[str, Any],
-        files_zip: Optional[ZipFile] = None,
-        storage: Optional[Storage] = None,
-        progress_builder: Optional[ChildProgressBuilder] = None,
-    ) -> Application:
-        """
-        Imports a database application exported by the `export_serialized` method.
-        """
-
-        tables = serialized_values.pop("tables")
-
-        child_total = (
-            # For the super application
-            1
+    def _ops_count_for_import_tables_serialized(
+        self, serialized_tables: List[Dict[str, Any]]
+    ) -> int:
+        return (
             +
             # Creating each table
-            len(tables)
+            len(serialized_tables)
             +
             # Creating each model table
-            len(tables)
+            len(serialized_tables)
             + sum(
                 [
                     # Inserting every field
@@ -169,52 +166,65 @@ class DatabaseApplicationType(ApplicationType):
                     len(table["rows"]) +
                     # After each field
                     len(table["fields"])
-                    for table in tables
+                    for table in serialized_tables
                 ]
             )
         )
-        progress = ChildProgressBuilder.build(progress_builder, child_total=child_total)
 
-        database = super().import_serialized(
-            group,
-            serialized_values,
-            id_mapping,
-            files_zip,
-            storage,
-            progress.create_child_builder(represents_progress=1),
-        )
+    def import_tables_serialized(
+        self,
+        database: Database,
+        serialized_tables: List[Dict[str, Any]],
+        id_mapping: Dict[str, Any],
+        files_zip: Optional[ZipFile] = None,
+        storage: Optional[Storage] = None,
+        progress_builder: Optional[ChildProgressBuilder] = None,
+    ) -> List[Table]:
+        """
+        Imports tables exported by the `export_tables_serialized` method.
+        This method has been created in order to import single tables or partial
+        applications. Beware to have all the tables needed in the
+        id_mapping["database_tables"] to make this works for link-row fields.
+        Look at `import_serialized` to know how to call this function.
+        """
+
+        child_total = self._ops_count_for_import_tables_serialized(serialized_tables)
+        progress = ChildProgressBuilder.build(progress_builder, child_total=child_total)
 
         if "database_tables" not in id_mapping:
             id_mapping["database_tables"] = {}
 
+        tables: List[Table] = []
+
         # First, we want to create all the table instances because it could be that
         # field or view properties depend on the existence of a table.
-        for table in tables:
-            table_object = Table.objects.create(
+        for serialized_table in serialized_tables:
+            table_instance = Table.objects.create(
                 database=database,
-                name=table["name"],
-                order=table["order"],
+                name=serialized_table["name"],
+                order=serialized_table["order"],
             )
-            id_mapping["database_tables"][table["id"]] = table_object.id
-            table["_object"] = table_object
-            table["_field_objects"] = []
+            id_mapping["database_tables"][serialized_table["id"]] = table_instance.id
+            serialized_table["_object"] = table_instance
+            serialized_table["_field_objects"] = []
+            tables.append(table_instance)
             progress.increment(state=IMPORT_SERIALIZED_IMPORTING)
 
         # Because view properties might depend on fields, we first want to create all
         # the fields.
         fields_excluding_reversed_linked_fields = []
         none_field_count = 0
-        for table in tables:
-            for field in table["fields"]:
+        for serialized_table in serialized_tables:
+            for field in serialized_table["fields"]:
                 field_type = field_type_registry.get(field["type"])
-                field_object = field_type.import_serialized(
-                    table["_object"], field, id_mapping
+                field_instance = field_type.import_serialized(
+                    serialized_table["_object"], field, id_mapping
                 )
 
-                if field_object:
-                    table["_field_objects"].append(field_object)
+                if field_instance:
+                    serialized_table["_field_objects"].append(field_instance)
                     fields_excluding_reversed_linked_fields.append(
-                        (field_type, field_object)
+                        (field_type, field_instance)
                     )
                 else:
                     none_field_count += 1
@@ -227,30 +237,30 @@ class DatabaseApplicationType(ApplicationType):
 
         # Now that the all tables and fields exist, we can create the views and create
         # the table schema in the database.
-        for table in tables:
-            for view in table["views"]:
+        for serialized_table in serialized_tables:
+            for view in serialized_table["views"]:
                 view_type = view_type_registry.get(view["type"])
                 view_type.import_serialized(
-                    table["_object"], view, id_mapping, files_zip, storage
+                    serialized_table["_object"], view, id_mapping, files_zip, storage
                 )
                 progress.increment(state=IMPORT_SERIALIZED_IMPORTING)
 
             # We don't need to create all the fields individually because the schema
             # editor can handle the creation of the table schema in one go.
             with safe_django_schema_editor() as schema_editor:
-                model = table["_object"].get_model(
-                    fields=table["_field_objects"],
+                table_model = serialized_table["_object"].get_model(
+                    fields=serialized_table["_field_objects"],
                     field_ids=[],
                     managed=True,
                     add_dependencies=False,
                 )
-                table["_model"] = model
-                schema_editor.create_model(model)
+                serialized_table["_model"] = table_model
+                schema_editor.create_model(table_model)
 
                 # The auto_now_add and auto_now must be disabled for all fields
                 # because the export contains correct values and we don't want them
                 # to be overwritten when importing.
-                for model_field in table["_model"]._meta.get_fields():
+                for model_field in serialized_table["_model"]._meta.get_fields():
                     if hasattr(model_field, "auto_now_add"):
                         model_field.auto_now_add = False
 
@@ -261,12 +271,14 @@ class DatabaseApplicationType(ApplicationType):
 
         # Now that everything is in place we can start filling the table with the rows
         # in an efficient matter by using the bulk_create functionality.
-        for table in tables:
-            model = table["_model"]
-            field_ids = [field_object.id for field_object in table["_field_objects"]]
+        for serialized_table in serialized_tables:
+            table_model = serialized_table["_model"]
+            field_ids = [
+                field_object.id for field_object in serialized_table["_field_objects"]
+            ]
             rows_to_be_inserted = []
 
-            for row in table["rows"]:
+            for row in serialized_table["rows"]:
                 created_on = row.get("created_on")
                 updated_on = row.get("updated_on")
 
@@ -280,14 +292,14 @@ class DatabaseApplicationType(ApplicationType):
                 else:
                     updated_on = timezone.now()
 
-                row_object = model(
+                row_object = table_model(
                     id=row["id"],
                     order=row["order"],
                     created_on=created_on,
                     updated_on=updated_on,
                 )
 
-                for field in table["fields"]:
+                for field in serialized_table["fields"]:
                     field_type = field_type_registry.get(field["type"])
                     new_field_id = id_mapping["database_fields"][field["id"]]
                     field_name = f'field_{field["id"]}'
@@ -310,23 +322,23 @@ class DatabaseApplicationType(ApplicationType):
 
                 rows_to_be_inserted.append(row_object)
                 progress.increment(
-                    state=f"{IMPORT_SERIALIZED_IMPORTING_TABLE}{table['id']}"
+                    state=f"{IMPORT_SERIALIZED_IMPORTING_TABLE}{serialized_table['id']}"
                 )
 
             # We want to insert the rows in bulk because there could potentially be
             # hundreds of thousands of rows in there and this will result in better
             # performance.
             for chunk in grouper(512, rows_to_be_inserted):
-                model.objects.bulk_create(chunk, batch_size=512)
+                table_model.objects.bulk_create(chunk, batch_size=512)
                 progress.increment(
                     len(chunk),
-                    state=f"{IMPORT_SERIALIZED_IMPORTING_TABLE}{table['id']}",
+                    state=f"{IMPORT_SERIALIZED_IMPORTING_TABLE}{serialized_table['id']}",
                 )
 
             # When the rows are inserted we keep the provide the old ids and because of
             # that the auto increment is still set at `1`. This needs to be set to the
             # maximum value because otherwise creating a new row could later fail.
-            sequence_sql = connection.ops.sequence_reset_sql(no_style(), [model])
+            sequence_sql = connection.ops.sequence_reset_sql(no_style(), [table_model])
             with connection.cursor() as cursor:
                 cursor.execute(sequence_sql[0])
 
@@ -341,5 +353,48 @@ class DatabaseApplicationType(ApplicationType):
         # Add the remaining none fields that we must not include in the import
         # because they were for example reversed link row fields.
         progress.increment(none_field_count, state=IMPORT_SERIALIZED_IMPORTING)
+        return tables
+
+    def import_serialized(
+        self,
+        group: Group,
+        serialized_values: Dict[str, Any],
+        id_mapping: Dict[str, Any],
+        files_zip: Optional[ZipFile] = None,
+        storage: Optional[Storage] = None,
+        progress_builder: Optional[ChildProgressBuilder] = None,
+    ) -> Application:
+        """
+        Imports a database application exported by the `export_serialized` method.
+        """
+
+        serialized_tables = serialized_values.pop("tables")
+        database_progress, table_progress = 1, 99
+        progress = ChildProgressBuilder.build(
+            progress_builder, child_total=database_progress + table_progress
+        )
+
+        application = super().import_serialized(
+            group,
+            serialized_values,
+            id_mapping,
+            files_zip,
+            storage,
+            progress.create_child_builder(represents_progress=database_progress),
+        )
+
+        database = application.specific
+
+        if not serialized_tables:
+            progress.increment(state=IMPORT_SERIALIZED_IMPORTING, by=table_progress)
+        else:
+            self.import_tables_serialized(
+                database,
+                serialized_tables,
+                id_mapping,
+                files_zip,
+                storage,
+                progress.create_child_builder(represents_progress=table_progress),
+            )
 
         return database
diff --git a/backend/src/baserow/contrib/database/apps.py b/backend/src/baserow/contrib/database/apps.py
index aaa9ad0e2..9169a2972 100644
--- a/backend/src/baserow/contrib/database/apps.py
+++ b/backend/src/baserow/contrib/database/apps.py
@@ -68,12 +68,14 @@ class DatabaseConfig(AppConfig):
             DeleteTableActionType,
             OrderTableActionType,
             UpdateTableActionType,
+            DuplicateTableActionType,
         )
 
         action_type_registry.register(CreateTableActionType())
         action_type_registry.register(DeleteTableActionType())
         action_type_registry.register(OrderTableActionType())
         action_type_registry.register(UpdateTableActionType())
+        action_type_registry.register(DuplicateTableActionType())
 
         from .rows.actions import (
             CreateRowActionType,
@@ -410,9 +412,11 @@ class DatabaseConfig(AppConfig):
         from baserow.core.jobs.registries import job_type_registry
         from .airtable.job_type import AirtableImportJobType
         from .file_import.job_type import FileImportJobType
+        from baserow.contrib.database.table.job_types import DuplicateTableJobType
 
         job_type_registry.register(AirtableImportJobType())
         job_type_registry.register(FileImportJobType())
+        job_type_registry.register(DuplicateTableJobType())
 
         # The signals must always be imported last because they use the registries
         # which need to be filled first.
diff --git a/backend/src/baserow/contrib/database/db/atomic.py b/backend/src/baserow/contrib/database/db/atomic.py
index b7df9d885..7a51e6355 100644
--- a/backend/src/baserow/contrib/database/db/atomic.py
+++ b/backend/src/baserow/contrib/database/db/atomic.py
@@ -60,7 +60,7 @@ def read_committed_single_table_transaction(
     unsafe MVCC operations can occur during the transaction then use this context
     manager.
 
-    This manager does one things to ensure this:
+    This manager does one thing to ensure this:
     1. This query runs that first transaction itself and intentionally locks all field
        and the table's metadata row in this first SELECT statement FOR KEY SHARE. This
        means once the transaction has obtained this lock it can proceed safely without
@@ -93,3 +93,51 @@ def read_committed_single_table_transaction(
             first_statement_args,
         ),
     )
+
+
+def read_repeatable_read_single_table_transaction(
+    table_id: int,
+) -> Atomic:
+    """
+    If you want to safely read the contents of a Baserow table inside of a single
+    transaction and be guaranteed that the fields wont change during the transaction no
+    unsafe MVCC operations can occur during the transaction then use this context
+    manager.
+
+    This manager does two things to ensure this:
+    1. It runs in the REPEATABLE READ postgres isolation level, meaning all queries
+       will see a snapshot of the table starting at the first SELECT etc statement
+       run instead the transaction.
+    2. This query runs that first transaction itself and intentionally locks all field
+       and the table's metadata row in this first SELECT statement FOR SHARE. This
+       means once the transaction has obtained this lock it can proceed safely without
+       having to worry about fields being updated during the length of the transaction.
+       We need to lock these rows as otherwise Baserow's various endpoints can
+       execute ALTER TABLE and DROP TABLE statements which are not MVCC safe and can
+       cause
+       https://www.postgresql.org/docs/current/mvcc-caveats.html for more info.
+
+
+    This manager uses READ COMMITTED and as such has a lower overhead, but does not get
+    the snapshot like reading guarantees that REAPEATABLE READ does.
+
+    :param table_id: The table to obtain a table and field locks for to ensure
+        safe reading.
+    :return: An atomic context manager.
+    """
+
+    first_statement = sql.SQL(
+        """
+ SELECT * FROM database_field
+ INNER JOIN database_table ON database_field.table_id = database_table.id
+ WHERE database_table.id = %s FOR KEY SHARE OF database_field, database_table
+"""
+    )
+    first_statement_args = [table_id]
+    return transaction_atomic(
+        isolation_level=IsolationLevel.REPEATABLE_READ,
+        first_sql_to_run_in_transaction_with_args=(
+            first_statement,
+            first_statement_args,
+        ),
+    )
diff --git a/backend/src/baserow/contrib/database/fields/field_helpers.py b/backend/src/baserow/contrib/database/fields/field_helpers.py
index 8f22a2afe..fa21d68df 100644
--- a/backend/src/baserow/contrib/database/fields/field_helpers.py
+++ b/backend/src/baserow/contrib/database/fields/field_helpers.py
@@ -4,7 +4,7 @@ from baserow.contrib.database.fields.registries import field_type_registry
 
 
 def construct_all_possible_field_kwargs(
-    link_table, decimal_link_table, file_link_table
+    table, link_table, decimal_link_table, file_link_table
 ) -> Dict[str, List[Dict[str, Any]]]:
     """
     Some baserow field types have multiple different 'modes' which result in
@@ -104,6 +104,7 @@ def construct_all_possible_field_kwargs(
         ],
         "link_row": [
             {"name": "link_row", "link_row_table": link_table},
+            {"name": "self_link_row", "link_row_table": table},
             {"name": "decimal_link_row", "link_row_table": decimal_link_table},
             {"name": "file_link_row", "link_row_table": file_link_table},
         ],
diff --git a/backend/src/baserow/contrib/database/fields/field_types.py b/backend/src/baserow/contrib/database/fields/field_types.py
index 46f56335c..a31b82cbf 100644
--- a/backend/src/baserow/contrib/database/fields/field_types.py
+++ b/backend/src/baserow/contrib/database/fields/field_types.py
@@ -55,6 +55,7 @@ from baserow.contrib.database.formula import (
     FormulaHandler,
     literal,
 )
+from baserow.contrib.database.table.handler import TableHandler
 from baserow.contrib.database.validators import UnicodeRegexValidator
 from baserow.core.models import UserFile
 from baserow.core.user_files.exceptions import UserFileDoesNotExist
@@ -1514,12 +1515,70 @@ class LinkRowFieldType(FieldType):
         serialized["link_row_related_field_id"] = field.link_row_related_field_id
         return serialized
 
+    def import_serialized_for_table_duplication(
+        self,
+        table: "Table",
+        serialized_values: Dict[str, Any],
+        id_mapping: Dict[str, Any],
+    ) -> Field:
+        """
+        For table duplication we cannot just use the serialized_values, but we need to
+        create a brand new link row field and a new related field in the referenced
+        table.
+
+        :param table: The table to duplicate
+        :param serialized_values: The serialized exported values of the field
+        :param id_mapping: A dictionary mapping old table ids to new table ids
+        :return: The new field
+        """
+
+        serialized_copy = serialized_values.copy()
+        link_row_table_id = serialized_copy.get("link_row_table_id")
+        link_row_table = TableHandler().get_table(link_row_table_id)
+        original_table_id = [
+            k for k, v in id_mapping["database_tables"].items() if v == table.id
+        ][0]
+        original_link_row_related_field_id = serialized_copy.pop(
+            "link_row_related_field_id"
+        )
+
+        # if was a self-referencing link row field, update the link_row_table_id
+        if original_table_id == link_row_table_id:
+            serialized_copy["link_row_table_id"] = table.id
+            return super().import_serialized(table, serialized_copy, id_mapping)
+
+        field = super().import_serialized(table, serialized_copy, id_mapping)
+
+        related_field_name = self.find_next_unused_related_field_name(field)
+        last_order = Field.get_last_order(link_row_table)
+        related_serialized_copy = {
+            "id": original_link_row_related_field_id,
+            "name": related_field_name,
+            "type": serialized_copy.get("type"),
+            "link_row_table_id": table.id,
+            "link_row_related_field_id": field.id,
+            "link_row_relation_id": field.link_row_relation_id,
+            "order": last_order,
+        }
+        related_field = super().import_serialized(
+            link_row_table, related_serialized_copy, id_mapping
+        )
+        field.link_row_related_field = related_field
+        field.save()
+        return field
+
     def import_serialized(
         self,
         table: "Table",
         serialized_values: Dict[str, Any],
         id_mapping: Dict[str, Any],
     ) -> Optional[Field]:
+
+        if id_mapping.get("operation") == "duplicate_table":
+            return self.import_serialized_for_table_duplication(
+                table, serialized_values, id_mapping
+            )
+
         serialized_copy = serialized_values.copy()
         serialized_copy["link_row_table_id"] = id_mapping["database_tables"][
             serialized_copy["link_row_table_id"]
@@ -2935,9 +2994,10 @@ class FormulaFieldType(ReadOnlyFieldType):
         field_cache: "FieldCache",
         via_path_to_starting_table: Optional[List[LinkRowField]],
     ):
-        self._refresh_row_values_if_not_in_starting_table(
-            field, update_collector, field_cache, via_path_to_starting_table
-        )
+        if field.requires_refresh_after_insert:
+            self._refresh_row_values(
+                field, update_collector, field_cache, via_path_to_starting_table
+            )
         super().after_rows_imported(
             field, update_collector, field_cache, via_path_to_starting_table
         )
diff --git a/backend/src/baserow/contrib/database/management/commands/fill_table_fields.py b/backend/src/baserow/contrib/database/management/commands/fill_table_fields.py
index a2855ae37..de0a53300 100644
--- a/backend/src/baserow/contrib/database/management/commands/fill_table_fields.py
+++ b/backend/src/baserow/contrib/database/management/commands/fill_table_fields.py
@@ -54,7 +54,7 @@ class Command(BaseCommand):
 
 def fill_table_fields(limit, table):
     field_handler = FieldHandler()
-    all_kwargs_per_type = construct_all_possible_field_kwargs(None, None, None)
+    all_kwargs_per_type = construct_all_possible_field_kwargs(None, None, None, None)
     first_user = table.database.group.users.first()
     # Keep all fields but link_row and lookup
     allowed_field_list = [
@@ -81,7 +81,7 @@ def fill_table_fields(limit, table):
 
 def create_field_for_every_type(table):
     field_handler = FieldHandler()
-    all_kwargs_per_type = construct_all_possible_field_kwargs(None, None, None)
+    all_kwargs_per_type = construct_all_possible_field_kwargs(None, None, None, None)
     first_user = table.database.group.users.first()
     i = 0
     for field_type_name, all_possible_kwargs in all_kwargs_per_type.items():
diff --git a/backend/src/baserow/contrib/database/migrations/0084_duplicatetablejob.py b/backend/src/baserow/contrib/database/migrations/0084_duplicatetablejob.py
new file mode 100644
index 000000000..6704302a7
--- /dev/null
+++ b/backend/src/baserow/contrib/database/migrations/0084_duplicatetablejob.py
@@ -0,0 +1,71 @@
+# Generated by Django 3.2.13 on 2022-07-22 09:33
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("core", "0029_duplicateapplicationjob"),
+        ("database", "0083_form_field_options_conditions"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="DuplicateTableJob",
+            fields=[
+                (
+                    "job_ptr",
+                    models.OneToOneField(
+                        auto_created=True,
+                        on_delete=django.db.models.deletion.CASCADE,
+                        parent_link=True,
+                        primary_key=True,
+                        serialize=False,
+                        to="core.job",
+                    ),
+                ),
+                (
+                    "user_session_id",
+                    models.CharField(
+                        help_text="The user session uuid needed for undo/redo functionality.",
+                        max_length=36,
+                        null=True,
+                    ),
+                ),
+                (
+                    "user_websocket_id",
+                    models.CharField(
+                        help_text="The user websocket uuid needed to manage signals sent correctly.",
+                        max_length=36,
+                        null=True,
+                    ),
+                ),
+                (
+                    "duplicated_table",
+                    models.OneToOneField(
+                        help_text="The duplicated Baserow table.",
+                        null=True,
+                        on_delete=django.db.models.deletion.SET_NULL,
+                        related_name="duplicated_from_jobs",
+                        to="database.table",
+                    ),
+                ),
+                (
+                    "original_table",
+                    models.ForeignKey(
+                        help_text="The Baserow table to duplicate.",
+                        null=True,
+                        on_delete=django.db.models.deletion.SET_NULL,
+                        related_name="duplicated_by_jobs",
+                        to="database.table",
+                    ),
+                ),
+            ],
+            options={
+                "abstract": False,
+            },
+            bases=("core.job", models.Model),
+        ),
+    ]
diff --git a/backend/src/baserow/contrib/database/table/actions.py b/backend/src/baserow/contrib/database/table/actions.py
index d4263764c..49dcc165b 100644
--- a/backend/src/baserow/contrib/database/table/actions.py
+++ b/backend/src/baserow/contrib/database/table/actions.py
@@ -12,6 +12,7 @@ from baserow.core.action.models import Action
 from baserow.core.action.registries import ActionType, ActionScopeStr
 from baserow.core.action.scopes import ApplicationActionScopeType
 from baserow.core.trash.handler import TrashHandler
+from baserow.core.utils import ChildProgressBuilder
 
 
 class CreateTableActionType(ActionType):
@@ -228,3 +229,52 @@ class UpdateTableActionType(ActionType):
         TableHandler().update_table_by_id(
             user, params.table_id, name=params.new_table_name
         )
+
+
+class DuplicateTableActionType(ActionType):
+    type = "duplicate_table"
+
+    @dataclasses.dataclass
+    class Params:
+        table_id: int
+
+    @classmethod
+    def do(
+        cls,
+        user: AbstractUser,
+        table: Table,
+        progress_builder: Optional[ChildProgressBuilder] = None,
+    ) -> Table:
+        """
+        Duplicate the table.
+        Undoing this action trashes the duplicated table and redoing restores it.
+
+        :param user: The user on whose behalf the table is created.
+        :param table: The name of the table is created.
+        :param progress_builder: A progress builder instance that can be used to
+            track the progress of the duplication.
+        :return: The duplicated table instance.
+        """
+
+        new_table_clone = TableHandler().duplicate_table(
+            user, table, progress_builder=progress_builder
+        )
+        cls.register_action(
+            user, cls.Params(new_table_clone.id), cls.scope(table.database_id)
+        )
+        return new_table_clone
+
+    @classmethod
+    def scope(cls, database_id) -> ActionScopeStr:
+        return ApplicationActionScopeType.value(database_id)
+
+    @classmethod
+    def undo(cls, user: AbstractUser, params: Params, action_being_undone: Action):
+        table = Table.objects.get(id=params.table_id)
+        TableHandler().delete_table(user, table)
+
+    @classmethod
+    def redo(cls, user: AbstractUser, params: Params, action_being_redone: Action):
+        TrashHandler.restore_item(
+            user, "table", params.table_id, parent_trash_item_id=None
+        )
diff --git a/backend/src/baserow/contrib/database/table/handler.py b/backend/src/baserow/contrib/database/table/handler.py
index b9b477ad0..5785a3a7f 100644
--- a/backend/src/baserow/contrib/database/table/handler.py
+++ b/backend/src/baserow/contrib/database/table/handler.py
@@ -11,9 +11,7 @@ from django.utils import translation
 from django.utils.translation import gettext as _
 
 from baserow.core.utils import Progress
-from baserow.contrib.database.db.schema import (
-    safe_django_schema_editor,
-)
+from baserow.contrib.database.db.schema import safe_django_schema_editor
 from baserow.contrib.database.fields.constants import RESERVED_BASEROW_FIELD_NAMES
 from baserow.contrib.database.fields.exceptions import (
     MaxFieldLimitExceeded,
@@ -26,7 +24,13 @@ from baserow.contrib.database.fields.registries import field_type_registry
 from baserow.contrib.database.models import Database
 from baserow.contrib.database.views.handler import ViewHandler
 from baserow.contrib.database.views.view_types import GridViewType
+from baserow.core.registries import application_type_registry
 from baserow.core.trash.handler import TrashHandler
+from baserow.core.utils import (
+    ChildProgressBuilder,
+    find_unused_name,
+    split_ending_number,
+)
 from .exceptions import (
     TableDoesNotExist,
     TableNotInDatabase,
@@ -397,6 +401,77 @@ class TableHandler:
         Table.order_objects(queryset, order)
         tables_reordered.send(self, database=database, order=order, user=user)
 
+    def find_unused_table_name(self, database: Database, proposed_name: str) -> str:
+        """
+        Finds an unused name for a table in a database.
+
+        :param database: The database that the table belongs to.
+        :param proposed_name: The name that is proposed to be used.
+        :return: A unique name to use.
+        """
+
+        existing_tables_names = list(database.table_set.values_list("name", flat=True))
+        name, _ = split_ending_number(proposed_name)
+        return find_unused_name([name], existing_tables_names, max_length=255)
+
+    def _setup_id_mapping_for_table_duplication(
+        self, serialized_table: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """
+        Sets up the id mapping for a table duplication.
+
+        :param serialized_table: The serialized table.
+        :return: The .
+        """
+
+        # TODO: fix this hack
+        return {"operation": "duplicate_table"}
+
+    def duplicate_table(
+        self,
+        user: AbstractUser,
+        table: Table,
+        progress_builder: Optional[ChildProgressBuilder] = None,
+    ) -> Table:
+        """
+        Duplicates an existing table instance.
+
+        :param user: The user on whose behalf the table is duplicated.
+        :param table: The table instance that needs to be duplicated.
+        :param progress: A progress object that can be used to report progress.
+        :raises ValueError: When the provided table is not an instance of Table.
+        :return: The duplicated table instance.
+        """
+
+        if not isinstance(table, Table):
+            raise ValueError("The table is not an instance of Table")
+
+        progress = ChildProgressBuilder.build(progress_builder, child_total=2)
+
+        database = table.database
+        database.group.has_user(user, raise_error=True)
+        database_type = application_type_registry.get_by_model(database)
+
+        serialized_tables = database_type.export_tables_serialized([table])
+        progress.increment()
+
+        # Set a unique name for the table to import back as a new one.
+        exported_table = serialized_tables[0]
+        exported_table["name"] = self.find_unused_table_name(database, table.name)
+
+        imported_tables = database_type.import_tables_serialized(
+            database,
+            [exported_table],
+            self._setup_id_mapping_for_table_duplication(exported_table),
+        )
+        progress.increment()
+
+        new_table_clone = imported_tables[0]
+
+        table_created.send(self, table=new_table_clone, user=user)
+
+        return new_table_clone
+
     def delete_table_by_id(self, user: AbstractUser, table_id: int):
         """
         Moves to the trash an existing an existing table instance if the user
diff --git a/backend/src/baserow/contrib/database/table/job_types.py b/backend/src/baserow/contrib/database/table/job_types.py
new file mode 100644
index 000000000..b5a296279
--- /dev/null
+++ b/backend/src/baserow/contrib/database/table/job_types.py
@@ -0,0 +1,64 @@
+from rest_framework import serializers
+from baserow.contrib.database.api.tables.serializers import TableSerializer
+from baserow.contrib.database.db.atomic import (
+    read_repeatable_read_single_table_transaction,
+)
+from baserow.contrib.database.table.actions import DuplicateTableActionType
+from baserow.contrib.database.table.handler import TableHandler
+from baserow.contrib.database.table.models import DuplicateTableJob
+
+from baserow.core.exceptions import UserNotInGroup, GroupDoesNotExist
+from baserow.core.jobs.registries import JobType
+from baserow.api.errors import ERROR_USER_NOT_IN_GROUP, ERROR_GROUP_DOES_NOT_EXIST
+
+from baserow.core.action.registries import action_type_registry
+
+
+class DuplicateTableJobType(JobType):
+    type = "duplicate_table"
+    model_class = DuplicateTableJob
+    max_count = 1
+
+    api_exceptions_map = {
+        UserNotInGroup: ERROR_USER_NOT_IN_GROUP,
+        GroupDoesNotExist: ERROR_GROUP_DOES_NOT_EXIST,
+    }
+
+    request_serializer_field_names = ["table_id"]
+
+    request_serializer_field_overrides = {
+        "table_id": serializers.IntegerField(
+            help_text="The ID of the table to duplicate.",
+        ),
+    }
+
+    serializer_field_names = ["original_table", "duplicated_table"]
+    serializer_field_overrides = {
+        "original_table": TableSerializer(read_only=True),
+        "duplicated_table": TableSerializer(read_only=True),
+    }
+
+    def transaction_atomic_context(self, job: "DuplicateTableJob"):
+        return read_repeatable_read_single_table_transaction(job.original_table.id)
+
+    def prepare_values(self, values, user):
+
+        table = TableHandler().get_table(values.pop("table_id"))
+        table.database.group.has_user(user, raise_error=True)
+
+        return {
+            "original_table": table,
+        }
+
+    def run(self, job, progress):
+        new_table_clone = action_type_registry.get_by_type(DuplicateTableActionType).do(
+            job.user,
+            job.original_table,
+            progress.create_child_builder(represents_progress=progress.total),
+        )
+
+        # update the job with the new duplicated table
+        job.duplicated_table = new_table_clone
+        job.save(update_fields=("duplicated_table",))
+
+        return new_table_clone
diff --git a/backend/src/baserow/contrib/database/table/models.py b/backend/src/baserow/contrib/database/table/models.py
index 90252bca9..d683e2b22 100644
--- a/backend/src/baserow/contrib/database/table/models.py
+++ b/backend/src/baserow/contrib/database/table/models.py
@@ -24,11 +24,13 @@ from baserow.contrib.database.table.cache import (
 from baserow.contrib.database.views.exceptions import ViewFilterTypeNotAllowedForField
 from baserow.contrib.database.views.registries import view_filter_type_registry
 from baserow.core.db import specific_iterator
+from baserow.core.jobs.models import Job
 from baserow.core.mixins import (
     OrderableMixin,
     CreatedAndUpdatedOnMixin,
     TrashableModelMixin,
 )
+from baserow.core.jobs.mixins import JobWithUserDataMixin
 from baserow.core.utils import split_comma_separated_string
 
 deconstruct_filter_key_regex = re.compile(r"filter__field_([0-9]+)__([a-zA-Z0-9_]*)$")
@@ -654,3 +656,23 @@ class Table(
     # tables.
     def get_collision_safe_order_id_idx_name(self):
         return f"tbl_order_id_{self.id}_idx"
+
+
+class DuplicateTableJob(JobWithUserDataMixin, Job):
+
+    user_data_to_save = ["user_websocket_id"]
+
+    original_table = models.ForeignKey(
+        Table,
+        null=True,
+        related_name="duplicated_by_jobs",
+        on_delete=models.SET_NULL,
+        help_text="The Baserow table to duplicate.",
+    )
+    duplicated_table = models.OneToOneField(
+        Table,
+        null=True,
+        related_name="duplicated_from_jobs",
+        on_delete=models.SET_NULL,
+        help_text="The duplicated Baserow table.",
+    )
diff --git a/backend/src/baserow/contrib/database/views/registries.py b/backend/src/baserow/contrib/database/views/registries.py
index 4a6f04c62..649409a90 100644
--- a/backend/src/baserow/contrib/database/views/registries.py
+++ b/backend/src/baserow/contrib/database/views/registries.py
@@ -238,7 +238,7 @@ class ViewType(
         table: "Table",
         serialized_values: Dict[str, Any],
         id_mapping: Dict[str, Any],
-        files_zip: ZipFile,
+        files_zip: Optional[ZipFile] = None,
         storage: Optional[Storage] = None,
     ) -> "View":
         """
diff --git a/backend/src/baserow/core/actions.py b/backend/src/baserow/core/actions.py
index b794c01b3..e043ad7d0 100644
--- a/backend/src/baserow/core/actions.py
+++ b/backend/src/baserow/core/actions.py
@@ -12,7 +12,7 @@ from baserow.core.action.scopes import (
 from baserow.core.handler import GroupForUpdate, CoreHandler
 from baserow.core.models import GroupUser, Group, Application
 from baserow.core.trash.handler import TrashHandler
-from baserow.core.utils import Progress
+from baserow.core.utils import ChildProgressBuilder
 
 
 class DeleteGroupActionType(ActionType):
@@ -448,7 +448,7 @@ class DuplicateApplicationActionType(ActionType):
         cls,
         user: AbstractUser,
         application: Application,
-        progress: Optional[Progress] = None,
+        progress_builder: Optional[ChildProgressBuilder] = None,
     ) -> Application:
         """
         Duplicate an existing application instance.
@@ -457,11 +457,15 @@ class DuplicateApplicationActionType(ActionType):
 
         :param user: The user on whose behalf the application is duplicated.
         :param application: The application instance that needs to be duplicated.
+        :param progress_builder: A progress builder instance that can be used to
+            track the progress of the duplication.
         :return: The new (duplicated) application instance.
         """
 
         new_application_clone = CoreHandler().duplicate_application(
-            user, application, progress
+            user,
+            application,
+            progress_builder,
         )
 
         params = cls.Params(new_application_clone.id)
diff --git a/backend/src/baserow/core/handler.py b/backend/src/baserow/core/handler.py
index 7b80fc5a2..305230662 100644
--- a/backend/src/baserow/core/handler.py
+++ b/backend/src/baserow/core/handler.py
@@ -19,9 +19,6 @@ from itsdangerous import URLSafeSerializer
 from tqdm import tqdm
 
 from baserow.core.user.utils import normalize_email_address
-from baserow.core.utils import (
-    ChildProgressBuilder,
-)
 from .emails import GroupInvitationEmail
 from .exceptions import (
     UserNotInGroup,
@@ -67,7 +64,7 @@ from .signals import (
     groups_reordered,
 )
 from .trash.handler import TrashHandler
-from .utils import Progress, find_unused_name, set_allowed_attrs
+from .utils import ChildProgressBuilder, find_unused_name, set_allowed_attrs
 
 User = get_user_model()
 
@@ -777,9 +774,9 @@ class CoreHandler:
         """
         Finds an unused name for an application.
 
+        :param group_id: The group id that the application belongs to.
         :param proposed_name: The name that is proposed to be used.
-        :param group: The group that the application belongs to.
-        :return: The name that is unused.
+        :return: A unique name to use.
         """
 
         existing_applications_names = self.list_applications_in_group(
@@ -814,7 +811,7 @@ class CoreHandler:
         self,
         user: AbstractUser,
         application: Application,
-        progress: Optional[Progress] = None,
+        progress_builder: Optional[ChildProgressBuilder] = None,
     ) -> Application:
         """
         Duplicates an existing application instance.
@@ -827,14 +824,13 @@ class CoreHandler:
         group = application.group
         group.has_user(user, raise_error=True)
 
-        if progress is None:
-            progress = Progress(100)
+        progress = ChildProgressBuilder.build(progress_builder, child_total=2)
 
         # export the application
         specific_application = application.specific
         application_type = application_type_registry.get_by_model(specific_application)
         serialized = application_type.export_serialized(specific_application)
-        progress.increment(50)
+        progress.increment()
 
         # Set a new unique name for the new application
         serialized["name"] = self.find_unused_application_name(
@@ -846,7 +842,7 @@ class CoreHandler:
         new_application_clone = application_type.import_serialized(
             group, serialized, id_mapping
         )
-        progress.increment(50)
+        progress.increment()
 
         # broadcast the application_created signal
         application_created.send(
diff --git a/backend/src/baserow/core/job_types.py b/backend/src/baserow/core/job_types.py
index fe85cc3bc..212233ece 100644
--- a/backend/src/baserow/core/job_types.py
+++ b/backend/src/baserow/core/job_types.py
@@ -63,7 +63,11 @@ class DuplicateApplicationJobType(JobType):
 
         new_application_clone = action_type_registry.get_by_type(
             DuplicateApplicationActionType
-        ).do(job.user, job.original_application, progress)
+        ).do(
+            job.user,
+            job.original_application,
+            progress.create_child_builder(represents_progress=progress.total),
+        )
 
         # update the job with the new duplicated application
         job.duplicated_application = new_application_clone
diff --git a/backend/src/baserow/core/models.py b/backend/src/baserow/core/models.py
index c3a5f1b4d..eac719625 100644
--- a/backend/src/baserow/core/models.py
+++ b/backend/src/baserow/core/models.py
@@ -421,6 +421,9 @@ class TrashEntry(models.Model):
 
 
 class DuplicateApplicationJob(JobWithUserDataMixin, Job):
+
+    user_data_to_save = ["user_websocket_id"]
+
     original_application = models.ForeignKey(
         Application,
         null=True,
diff --git a/backend/src/baserow/core/utils.py b/backend/src/baserow/core/utils.py
index 0dac3734f..f46fa2f7b 100644
--- a/backend/src/baserow/core/utils.py
+++ b/backend/src/baserow/core/utils.py
@@ -11,7 +11,7 @@ import io
 from collections import namedtuple
 from decimal import Decimal
 from itertools import islice
-from typing import List, Optional, Iterable
+from typing import List, Optional, Iterable, Tuple
 
 from django.db.models import ForeignKey
 from django.db.models.fields import NOT_PROVIDED
@@ -358,6 +358,21 @@ def remove_invalid_surrogate_characters(content: bytes) -> str:
     return re.sub(r"\\u(d|D)([a-z|A-Z|0-9]{3})", "", content.decode("utf-8", "ignore"))
 
 
+def split_ending_number(name: str) -> Tuple[str, str]:
+    """
+    Splits a string into two parts, the first part is the string before the last
+    number, the second part is the last number.
+
+    :param string: The string to split.
+    :return: A tuple with the first part and the second part.
+    """
+
+    match = re.search(r"(.+) (\d+)$", name)
+    if match:
+        return match.group(1), match.group(2)
+    return name, ""
+
+
 def find_unused_name(
     variants_to_try: Iterable[str],
     existing_names: Iterable[str],
diff --git a/backend/src/baserow/test_utils/helpers.py b/backend/src/baserow/test_utils/helpers.py
index 7dcbcd41e..a13e3d198 100644
--- a/backend/src/baserow/test_utils/helpers.py
+++ b/backend/src/baserow/test_utils/helpers.py
@@ -85,7 +85,7 @@ def setup_interesting_test_table(
     )
     handler = FieldHandler()
     all_possible_kwargs_per_type = construct_all_possible_field_kwargs(
-        link_table, decimal_link_table, file_link_table
+        table, link_table, decimal_link_table, file_link_table
     )
     name_to_field_id = {}
     i = 0
@@ -143,6 +143,7 @@ def setup_interesting_test_table(
         "created_on_date_eu": None,
         # We will setup link rows manually later
         "link_row": None,
+        "self_link_row": None,
         "decimal_link_row": None,
         "file_link_row": None,
         "file": [
@@ -182,6 +183,18 @@ def setup_interesting_test_table(
         "formula_email": "test@example.com",
     }
 
+    with freeze_time("2020-02-01 01:23"):
+        data_fixture.create_user_file(
+            original_name=f"a.txt",
+            unique=f"hashed{file_suffix}",
+            sha256_hash="name",
+        )
+        data_fixture.create_user_file(
+            original_name=f"b.txt",
+            unique=f"other{file_suffix}",
+            sha256_hash="name",
+        )
+
     missing_fields = set(name_to_field_id.keys()) - set(values.keys()) - {"lookup"}
     assert missing_fields == set(), (
         "Please update the dictionary above with interesting test values for your new "
diff --git a/backend/templates/car-maintenance-log.json b/backend/templates/car-maintenance-log.json
index 9d2471717..b2bfffbb9 100644
--- a/backend/templates/car-maintenance-log.json
+++ b/backend/templates/car-maintenance-log.json
@@ -1183,13 +1183,6 @@
                   "hidden": false,
                   "order": 32767
                 },
-                {
-                  "id": 212809,
-                  "field_id": 192289,
-                  "width": 200,
-                  "hidden": false,
-                  "order": 32767
-                },
                 {
                   "id": 212812,
                   "field_id": 192291,
@@ -1480,13 +1473,6 @@
                   "hidden": false,
                   "order": 32767
                 },
-                {
-                  "id": 212808,
-                  "field_id": 192287,
-                  "width": 200,
-                  "hidden": false,
-                  "order": 32767
-                },
                 {
                   "id": 212813,
                   "field_id": 192289,
diff --git a/backend/templates/contract-management.json b/backend/templates/contract-management.json
index 10456e76e..fe0a0ce01 100644
--- a/backend/templates/contract-management.json
+++ b/backend/templates/contract-management.json
@@ -2714,13 +2714,6 @@
                                     "hidden": false,
                                     "order": 32767
                                 },
-                                {
-                                    "id": 324485,
-                                    "field_id": 273340,
-                                    "width": 200,
-                                    "hidden": false,
-                                    "order": 32767
-                                },
                                 {
                                     "id": 324502,
                                     "field_id": 273382,
diff --git a/backend/templates/product-roadmap.json b/backend/templates/product-roadmap.json
index 3059dc9cb..b5be070c8 100644
--- a/backend/templates/product-roadmap.json
+++ b/backend/templates/product-roadmap.json
@@ -2789,15 +2789,6 @@
                                     "aggregation_type": "",
                                     "aggregation_raw_type": ""
                                 },
-                                {
-                                    "id": 376678,
-                                    "field_id": 308738,
-                                    "width": 256,
-                                    "hidden": false,
-                                    "order": 4,
-                                    "aggregation_type": "",
-                                    "aggregation_raw_type": ""
-                                },
                                 {
                                     "id": 377184,
                                     "field_id": 308986,
diff --git a/backend/tests/baserow/contrib/database/api/rows/test_row_serializers.py b/backend/tests/baserow/contrib/database/api/rows/test_row_serializers.py
index 86422132b..8b94e5aa8 100644
--- a/backend/tests/baserow/contrib/database/api/rows/test_row_serializers.py
+++ b/backend/tests/baserow/contrib/database/api/rows/test_row_serializers.py
@@ -272,6 +272,7 @@ def test_get_row_serializer_with_user_field_names(data_fixture):
                     {"id": 2, "value": ""},
                 ],
                 "id": 2,
+                "self_link_row": [],
                 "link_row": [
                     {"id": 1, "value": "linked_row_1"},
                     {"id": 2, "value": "linked_row_2"},
diff --git a/backend/tests/baserow/contrib/database/api/tables/test_table_views.py b/backend/tests/baserow/contrib/database/api/tables/test_table_views.py
index 72219fa85..2e323e5b7 100644
--- a/backend/tests/baserow/contrib/database/api/tables/test_table_views.py
+++ b/backend/tests/baserow/contrib/database/api/tables/test_table_views.py
@@ -1,4 +1,5 @@
 import json
+from pytest_unordered import unordered
 from unittest.mock import patch
 
 import pytest
@@ -8,6 +9,7 @@ from django.test.utils import CaptureQueriesContext
 from django.test.utils import override_settings
 from rest_framework.status import (
     HTTP_200_OK,
+    HTTP_202_ACCEPTED,
     HTTP_204_NO_CONTENT,
     HTTP_400_BAD_REQUEST,
     HTTP_404_NOT_FOUND,
@@ -15,7 +17,10 @@ from rest_framework.status import (
 
 from baserow.contrib.database.file_import.models import FileImportJob
 from baserow.contrib.database.table.models import Table
-from baserow.test_utils.helpers import independent_test_db_connection
+from baserow.test_utils.helpers import (
+    independent_test_db_connection,
+    setup_interesting_test_table,
+)
 
 
 @pytest.mark.django_db
@@ -489,3 +494,143 @@ def test_delete_table_still_if_locked_for_key_share(api_client, data_fixture):
                 HTTP_AUTHORIZATION=f"JWT {token}",
             )
     assert response.status_code == HTTP_204_NO_CONTENT
+
+
+@pytest.mark.django_db(transaction=True)
+def test_async_duplicate_table(api_client, data_fixture):
+    user_1, token_1 = data_fixture.create_user_and_token(
+        email="test_1@test.nl", password="password", first_name="Test1"
+    )
+    group_1 = data_fixture.create_group(user=user_1)
+    _, token_2 = data_fixture.create_user_and_token(
+        email="test_2@test.nl", password="password", first_name="Test2"
+    )
+    _, token_3 = data_fixture.create_user_and_token(
+        email="test_3@test.nl",
+        password="password",
+        first_name="Test3",
+        group=group_1,
+    )
+
+    database = data_fixture.create_database_application(group=group_1)
+    table_1, _, _, _ = setup_interesting_test_table(
+        data_fixture, database=database, user=user_1
+    )
+
+    # user_2 cannot duplicate a table of other groups
+    response = api_client.post(
+        reverse("api:database:tables:async_duplicate", kwargs={"table_id": table_1.id}),
+        format="json",
+        HTTP_AUTHORIZATION=f"JWT {token_2}",
+    )
+    assert response.status_code == HTTP_400_BAD_REQUEST
+    assert response.json()["error"] == "ERROR_USER_NOT_IN_GROUP"
+
+    # cannot duplicate non-existent application
+    response = api_client.post(
+        reverse("api:database:tables:async_duplicate", kwargs={"table_id": 99999}),
+        format="json",
+        HTTP_AUTHORIZATION=f"JWT {token_1}",
+    )
+    assert response.status_code == HTTP_404_NOT_FOUND
+    assert response.json()["error"] == "ERROR_TABLE_DOES_NOT_EXIST"
+
+    # user can duplicate an application created by other in the same group
+    response = api_client.post(
+        reverse("api:database:tables:async_duplicate", kwargs={"table_id": table_1.id}),
+        format="json",
+        HTTP_AUTHORIZATION=f"JWT {token_3}",
+    )
+    assert response.status_code == HTTP_202_ACCEPTED
+    job = response.json()
+    assert job["id"] is not None
+    assert job["state"] == "pending"
+    assert job["type"] == "duplicate_table"
+
+    # check that now the job ended correctly and the application was duplicated
+    response = api_client.get(
+        reverse(
+            "api:jobs:item",
+            kwargs={"job_id": job["id"]},
+        ),
+        HTTP_AUTHORIZATION=f"JWT {token_3}",
+    )
+    assert response.status_code == HTTP_200_OK
+    job = response.json()
+    assert job["state"] == "finished"
+    assert job["type"] == "duplicate_table"
+    assert job["original_table"]["id"] == table_1.id
+    assert job["original_table"]["name"] == table_1.name
+    assert job["duplicated_table"]["id"] != table_1.id
+    assert job["duplicated_table"]["name"] == f"{table_1.name} 2"
+
+    # check that old tables rows are still accessible
+    rows_url = reverse("api:database:rows:list", kwargs={"table_id": table_1.id})
+    response = api_client.get(
+        f"{rows_url}?user_field_names=true",
+        format="json",
+        HTTP_AUTHORIZATION=f"JWT {token_1}",
+    )
+    assert response.status_code == HTTP_200_OK
+    response_json = response.json()
+    assert len(response_json["results"]) > 0
+    original_rows = response_json["results"]
+
+    # check the new rows have the same values of the old
+    duplicated_table_id = job["duplicated_table"]["id"]
+    rows_url = reverse(
+        "api:database:rows:list", kwargs={"table_id": duplicated_table_id}
+    )
+    response = api_client.get(
+        f"{rows_url}?user_field_names=true",
+        format="json",
+        HTTP_AUTHORIZATION=f"JWT {token_1}",
+    )
+    assert response.status_code == HTTP_200_OK
+    response_json = response.json()
+    assert len(response_json["results"]) > 0
+    duplicated_rows = response_json["results"]
+
+    def assert_row_field_value(
+        field_name, duplicated_value, original_value, ordered=True
+    ):
+        if ordered:
+            assert (
+                duplicated_value == original_value
+            ), f"{field_name}: {duplicated_value} != {original_value}"
+        else:
+            assert unordered(duplicated_value, original_value)
+
+    for original_row, duplicated_row in zip(original_rows, duplicated_rows):
+        for field_name, original_value in original_row.items():
+
+            if not original_value:
+                assert_row_field_value(
+                    field_name, duplicated_row[field_name], original_value
+                )
+            elif field_name in ["single_select", "formula_singleselect"]:
+                assert_row_field_value(
+                    field_name,
+                    duplicated_row[field_name]["value"],
+                    original_value["value"],
+                )
+            elif field_name in ["multiple_select", "lookup"] or field_name.endswith(
+                "_link_row"
+            ):
+                assert_row_field_value(
+                    field_name,
+                    [v["value"] for v in duplicated_row[field_name]],
+                    [v["value"] for v in original_value],
+                    ordered=False,
+                )
+            elif field_name == "file":
+                assert_row_field_value(
+                    field_name,
+                    [f["name"] for f in duplicated_row[field_name]],
+                    [f["name"] for f in original_value],
+                    ordered=False,
+                )
+            else:
+                assert_row_field_value(
+                    field_name, duplicated_row[field_name], original_value
+                )
diff --git a/backend/tests/baserow/contrib/database/export/test_export_handler.py b/backend/tests/baserow/contrib/database/export/test_export_handler.py
index 3c4cb8059..c3dce0b03 100644
--- a/backend/tests/baserow/contrib/database/export/test_export_handler.py
+++ b/backend/tests/baserow/contrib/database/export/test_export_handler.py
@@ -224,19 +224,19 @@ def test_can_export_every_interesting_different_field_to_csv(
         "datetime_eu,date_eu,last_modified_datetime_us,last_modified_date_us,"
         "last_modified_datetime_eu,last_modified_date_eu,created_on_datetime_us,"
         "created_on_date_us,created_on_datetime_eu,created_on_date_eu,link_row,"
-        "decimal_link_row,file_link_row,file,single_select,"
+        "self_link_row,decimal_link_row,file_link_row,file,single_select,"
         "multiple_select,phone_number,formula_text,formula_int,formula_bool,"
         "formula_decimal,formula_dateinterval,formula_date,formula_singleselect,"
         "formula_email,lookup\r\n"
         "1,,,,,,,,,0,False,,,,,01/02/2021 13:00,01/02/2021,02/01/2021 13:00,02/01/2021,"
-        "01/02/2021 13:00,01/02/2021,02/01/2021 13:00,02/01/2021,,,,,,,,test FORMULA,"
+        "01/02/2021 13:00,01/02/2021,02/01/2021 13:00,02/01/2021,,,,,,,,,test FORMULA,"
         "1,True,33.3333333333,1 day,2020-01-01,,,"
         "\r\n"
         "2,text,long_text,https://www.google.com,test@example.com,-1,1,-1.2,1.2,3,True,"
         "02/01/2020 01:23,02/01/2020,01/02/2020 01:23,01/02/2020,"
         "01/02/2021 13:00,01/02/2021,02/01/2021 13:00,02/01/2021,"
         "01/02/2021 13:00,01/02/2021,02/01/2021 13:00,02/01/2021,"
-        '"linked_row_1,linked_row_2,unnamed row 3","1.234,-123.456,unnamed row 3",'
+        '"linked_row_1,linked_row_2,unnamed row 3",,"1.234,-123.456,unnamed row 3",'
         '"visible_name=name.txt url=http://localhost:8000/media/user_files/test_hash'
         '.txt,unnamed row 2",'
         '"visible_name=a.txt url=http://localhost:8000/media/user_files/hashed_name.txt'
diff --git a/backend/tests/baserow/contrib/database/field/test_field_handler.py b/backend/tests/baserow/contrib/database/field/test_field_handler.py
index a36eec638..56e042f2a 100644
--- a/backend/tests/baserow/contrib/database/field/test_field_handler.py
+++ b/backend/tests/baserow/contrib/database/field/test_field_handler.py
@@ -75,6 +75,7 @@ def test_can_convert_between_all_fields(data_fixture):
     # created. Here the kwargs which control these modes are enumerated so we can then
     # generate every possible type of conversion.
     all_possible_kwargs_per_type = construct_all_possible_field_kwargs(
+        table,
         Table.objects.get(name="link_table"),
         Table.objects.get(name="decimal_link_table"),
         Table.objects.get(name="file_link_table"),
diff --git a/backend/tests/baserow/contrib/database/field/test_field_types.py b/backend/tests/baserow/contrib/database/field/test_field_types.py
index 7c1d17687..c333c8d99 100644
--- a/backend/tests/baserow/contrib/database/field/test_field_types.py
+++ b/backend/tests/baserow/contrib/database/field/test_field_types.py
@@ -544,6 +544,7 @@ def test_human_readable_values(data_fixture):
         "positive_decimal": "",
         "positive_int": "",
         "rating": "0",
+        "self_link_row": "",
         "single_select": "",
         "multiple_select": "",
         "text": "",
@@ -584,6 +585,7 @@ def test_human_readable_values(data_fixture):
         "positive_decimal": "1.2",
         "positive_int": "1",
         "rating": "3",
+        "self_link_row": "",
         "single_select": "A",
         "multiple_select": "D, C, E",
         "text": "text",
diff --git a/backend/tests/baserow/contrib/database/table/test_table_actions.py b/backend/tests/baserow/contrib/database/table/test_table_actions.py
index d93ea50f8..a7a2d85e1 100644
--- a/backend/tests/baserow/contrib/database/table/test_table_actions.py
+++ b/backend/tests/baserow/contrib/database/table/test_table_actions.py
@@ -1,4 +1,6 @@
 import pytest
+from baserow.contrib.database.table.exceptions import TableDoesNotExist
+from baserow.contrib.database.table.handler import TableHandler
 
 from baserow.core.action.scopes import (
     ApplicationActionScopeType,
@@ -10,15 +12,38 @@ from baserow.core.action.registries import (
 from baserow.contrib.database.table.actions import (
     CreateTableActionType,
     DeleteTableActionType,
+    DuplicateTableActionType,
     OrderTableActionType,
     UpdateTableActionType,
 )
 from baserow.contrib.database.table.models import Table
+from baserow.test_utils.helpers import (
+    assert_undo_redo_actions_are_valid,
+    setup_interesting_test_table,
+)
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_creating_table(data_fixture):
+def test_can_undo_create_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    table, _ = action_type_registry.get_by_type(CreateTableActionType).do(
+        user, database, name="Test 1"
+    )
+
+    actions_undone = ActionHandler.undo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+    assert_undo_redo_actions_are_valid(actions_undone, [CreateTableActionType])
+    assert Table.objects.count() == 0
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_redo_create_table(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -33,34 +58,34 @@ def test_can_undo_creating_table(data_fixture):
 
     assert Table.objects.filter(pk=table.id).count() == 0
 
-
-@pytest.mark.django_db
-@pytest.mark.undo_redo
-def test_can_undo_redo_creating_table(data_fixture):
-    session_id = "session-id"
-    user = data_fixture.create_user(session_id=session_id)
-    database = data_fixture.create_database_application(user=user)
-
-    table, _ = action_type_registry.get_by_type(CreateTableActionType).do(
-        user, database, name="Test 1"
-    )
-
-    ActionHandler.undo(
+    actions_redone = ActionHandler.redo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
-
-    assert Table.objects.filter(pk=table.id).count() == 0
-
-    ActionHandler.redo(
-        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
-    )
-
+    assert_undo_redo_actions_are_valid(actions_redone, [CreateTableActionType])
     assert Table.objects.filter(pk=table.id).count() == 1
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_deleting_table(data_fixture):
+def test_can_undo_delete_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+    table = data_fixture.create_database_table(database=database, user=user)
+
+    action_type_registry.get_by_type(DeleteTableActionType).do(user, table)
+    assert Table.objects.count() == 0
+
+    actions_undone = ActionHandler.undo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+    assert_undo_redo_actions_are_valid(actions_undone, [DeleteTableActionType])
+    assert Table.objects.count() == 1
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_redo_delete_table(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -74,33 +99,16 @@ def test_can_undo_deleting_table(data_fixture):
 
     assert Table.objects.filter(pk=table.id).count() == 1
 
-
-@pytest.mark.django_db
-@pytest.mark.undo_redo
-def test_can_undo_redo_deleting_table(data_fixture):
-    session_id = "session-id"
-    user = data_fixture.create_user(session_id=session_id)
-    database = data_fixture.create_database_application(user=user)
-    table = data_fixture.create_database_table(database=database, user=user)
-
-    action_type_registry.get_by_type(DeleteTableActionType).do(user, table)
-
-    ActionHandler.undo(
+    actions_redone = ActionHandler.redo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
-
-    assert Table.objects.filter(pk=table.id).count() == 1
-
-    ActionHandler.redo(
-        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
-    )
-
+    assert_undo_redo_actions_are_valid(actions_redone, [DeleteTableActionType])
     assert Table.objects.filter(pk=table.id).count() == 0
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_ordering_tables(data_fixture):
+def test_can_undo_order_tables(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -121,15 +129,16 @@ def test_can_undo_ordering_tables(data_fixture):
     )
     assert get_tables_order() == new_order
 
-    ActionHandler.undo(
+    actions_undone = ActionHandler.undo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
+    assert_undo_redo_actions_are_valid(actions_undone, [OrderTableActionType])
     assert get_tables_order() == original_order
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_redo_ordering_tables(data_fixture):
+def test_can_undo_redo_order_tables(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -155,15 +164,16 @@ def test_can_undo_redo_ordering_tables(data_fixture):
     )
     assert get_tables_order() == original_order
 
-    ActionHandler.redo(
+    actions_redone = ActionHandler.redo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
+    assert_undo_redo_actions_are_valid(actions_redone, [OrderTableActionType])
     assert get_tables_order() == new_order
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_updating_table(data_fixture):
+def test_can_undo_update_table(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -178,17 +188,17 @@ def test_can_undo_updating_table(data_fixture):
     )
     assert table.name == new_table_name
 
-    ActionHandler.undo(
+    actions_undone = ActionHandler.undo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
-
+    assert_undo_redo_actions_are_valid(actions_undone, [UpdateTableActionType])
     table.refresh_from_db()
     assert table.name == original_table_name
 
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_redo_updating_table(data_fixture):
+def test_can_undo_redo_update_table(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     database = data_fixture.create_database_application(user=user)
@@ -210,9 +220,129 @@ def test_can_undo_redo_updating_table(data_fixture):
     table.refresh_from_db()
     assert table.name == original_table_name
 
-    ActionHandler.redo(
+    actions_redone = ActionHandler.redo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+    assert_undo_redo_actions_are_valid(actions_redone, [UpdateTableActionType])
+    table.refresh_from_db()
+    assert table.name == new_table_name
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_duplicate_simple_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    original_table_name = "original-table-name"
+    table = data_fixture.create_database_table(
+        database=database, user=user, name=original_table_name
+    )
+
+    duplicated_table = action_type_registry.get_by_type(DuplicateTableActionType).do(
+        user, table
+    )
+    assert Table.objects.count() == 2
+    assert duplicated_table.name == f"{original_table_name} 2"
+
+    actions_undone = ActionHandler.undo(
         user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
     )
 
-    table.refresh_from_db()
-    assert table.name == new_table_name
+    assert_undo_redo_actions_are_valid(actions_undone, [DuplicateTableActionType])
+    assert Table.objects.count() == 1
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_redo_duplicate_simple_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    original_table_name = "original-table-name"
+    table = data_fixture.create_database_table(
+        database=database, user=user, name=original_table_name
+    )
+
+    duplicated_table = action_type_registry.get_by_type(DuplicateTableActionType).do(
+        user, table
+    )
+    assert Table.objects.count() == 2
+    assert duplicated_table.name == f"{original_table_name} 2"
+
+    ActionHandler.undo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+
+    assert Table.objects.count() == 1
+
+    actions_redone = ActionHandler.redo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+    assert_undo_redo_actions_are_valid(actions_redone, [DuplicateTableActionType])
+    assert Table.objects.count() == 2
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_duplicate_interesting_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    original_table_name = "original-table-name"
+    table, _, _, _ = setup_interesting_test_table(
+        data_fixture, user, database, original_table_name
+    )
+
+    duplicated_table = action_type_registry.get_by_type(DuplicateTableActionType).do(
+        user, table
+    )
+    table_handler = TableHandler()
+    assert (
+        table_handler.get_table(duplicated_table.id).name == f"{original_table_name} 2"
+    )
+
+    actions_undone = ActionHandler.undo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+
+    assert_undo_redo_actions_are_valid(actions_undone, [DuplicateTableActionType])
+    with pytest.raises(TableDoesNotExist):
+        table_handler.get_table(duplicated_table.id)
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_can_undo_redo_duplicate_interesting_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    original_table_name = "original-table-name"
+    table, _, _, _ = setup_interesting_test_table(
+        data_fixture, user, database, original_table_name
+    )
+
+    duplicated_table = action_type_registry.get_by_type(DuplicateTableActionType).do(
+        user, table
+    )
+    table_handler = TableHandler()
+    assert (
+        table_handler.get_table(duplicated_table.id).name == f"{original_table_name} 2"
+    )
+
+    ActionHandler.undo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+
+    actions_redone = ActionHandler.redo(
+        user, [ApplicationActionScopeType.value(application_id=database.id)], session_id
+    )
+
+    assert_undo_redo_actions_are_valid(actions_redone, [DuplicateTableActionType])
+    assert (
+        table_handler.get_table(duplicated_table.id).name == f"{original_table_name} 2"
+    )
diff --git a/backend/tests/baserow/contrib/database/table/test_table_handler.py b/backend/tests/baserow/contrib/database/table/test_table_handler.py
index e0748b1c4..54a268789 100644
--- a/backend/tests/baserow/contrib/database/table/test_table_handler.py
+++ b/backend/tests/baserow/contrib/database/table/test_table_handler.py
@@ -28,6 +28,7 @@ from baserow.contrib.database.table.exceptions import (
     InitialTableDataLimitExceeded,
 )
 from baserow.contrib.database.fields.models import (
+    LinkRowField,
     TextField,
     LongTextField,
     BooleanField,
@@ -36,6 +37,7 @@ from baserow.contrib.database.views.models import GridView, GridViewFieldOptions
 from baserow.core.handler import CoreHandler
 from baserow.core.models import TrashEntry
 from baserow.core.trash.handler import TrashHandler
+from baserow.test_utils.helpers import setup_interesting_test_table
 
 
 @pytest.mark.django_db
@@ -629,3 +631,37 @@ def test_get_total_row_count_of_group(data_fixture):
     TableHandler.count_rows()
 
     assert TableHandler.get_total_row_count_of_group(group.id) == 10
+
+
+@pytest.mark.django_db
+@pytest.mark.undo_redo
+def test_duplicate_interesting_table(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+
+    original_table_name = "original-table-name"
+    table, _, _, _ = setup_interesting_test_table(
+        data_fixture, user, database, original_table_name
+    )
+
+    table_handler = TableHandler()
+    duplicated_table = table_handler.duplicate_table(user, table)
+    assert (
+        table_handler.get_table(duplicated_table.id).name == f"{original_table_name} 2"
+    )
+
+    # check link_row fields referencing other tables has been cloned correctly,
+    # while self-referencing fields now points to the new duplicated table
+    for field_object in duplicated_table.get_model()._field_objects.values():
+        field_instance = field_object["field"]
+        if not isinstance(field_instance, LinkRowField):
+            continue
+
+        if field_instance.name == "self_link_row":
+            assert field_instance.link_row_table_id == duplicated_table.id
+        else:
+            linkrow_fields = field_instance.link_row_table.linkrowfield_set.all()
+            original_link, duplicated_link = linkrow_fields
+            assert original_link.name == duplicated_link.name
+            assert original_link.link_row_table_id == duplicated_link.link_row_table_id
diff --git a/backend/tests/baserow/contrib/database/table/test_table_job_types.py b/backend/tests/baserow/contrib/database/table/test_table_job_types.py
new file mode 100644
index 000000000..77786d374
--- /dev/null
+++ b/backend/tests/baserow/contrib/database/table/test_table_job_types.py
@@ -0,0 +1,69 @@
+import pytest
+
+from django.db import transaction
+from baserow.contrib.database.table.handler import TableHandler
+from baserow.contrib.database.table.models import Table
+from baserow.core.action.handler import ActionHandler
+from baserow.core.action.scopes import ApplicationActionScopeType
+from baserow.contrib.database.table.job_types import DuplicateTableJobType
+from baserow.core.jobs.constants import JOB_FINISHED
+from baserow.core.jobs.handler import JobHandler
+
+
+@pytest.mark.django_db(transaction=True)
+def test_can_submit_duplicate_table_job(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+
+    database = data_fixture.create_database_application(user=user)
+    table = data_fixture.create_database_table(database=database, user=user)
+
+    assert Table.objects.count() == 1
+
+    duplicate_table_job = JobHandler().create_and_start_job(
+        user,
+        DuplicateTableJobType.type,
+        table_id=table.id,
+    )
+
+    assert Table.objects.count() == 2
+
+    duplicate_table_job.refresh_from_db()
+    assert duplicate_table_job.state == JOB_FINISHED
+    assert duplicate_table_job.original_table_id == table.id
+    assert duplicate_table_job.duplicated_table_id is not None
+
+    duplicated_table = TableHandler().get_table(duplicate_table_job.duplicated_table_id)
+    assert duplicated_table.name == f"{table.name} 2"
+
+    # assert table_id is mandatory
+    with pytest.raises(KeyError):
+        JobHandler().create_and_start_job(user, DuplicateTableJobType.type)
+
+    assert Table.objects.count() == 2
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.undo_redo
+def test_cannot_undo_duplicate_table_job(data_fixture):
+    session_id = "session-id"
+    user = data_fixture.create_user(session_id=session_id)
+    database = data_fixture.create_database_application(user=user)
+    table = data_fixture.create_database_table(database=database, user=user)
+
+    JobHandler().create_and_start_job(
+        user,
+        DuplicateTableJobType.type,
+        table_id=table.id,
+        user_session_id=session_id,
+    )
+
+    assert Table.objects.count() == 2
+
+    with transaction.atomic():
+        actions_undone = ActionHandler.undo(
+            user,
+            [ApplicationActionScopeType.value(application_id=database.id)],
+            session_id,
+        )
+        assert actions_undone == []
diff --git a/backend/tests/baserow/core/actions/test_application_actions.py b/backend/tests/baserow/core/actions/test_application_actions.py
index f0c2529cf..02895366d 100644
--- a/backend/tests/baserow/core/actions/test_application_actions.py
+++ b/backend/tests/baserow/core/actions/test_application_actions.py
@@ -49,7 +49,7 @@ def test_can_undo_redo_order_applications(data_fixture, django_assert_num_querie
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_creating_application(data_fixture, django_assert_num_queries):
+def test_can_undo_create_application(data_fixture, django_assert_num_queries):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -69,7 +69,7 @@ def test_can_undo_creating_application(data_fixture, django_assert_num_queries):
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_redo_creating_application(data_fixture, django_assert_num_queries):
+def test_can_undo_redo_create_application(data_fixture, django_assert_num_queries):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -92,7 +92,7 @@ def test_can_undo_redo_creating_application(data_fixture, django_assert_num_quer
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_deleteing_application(data_fixture):
+def test_can_undo_delete_application(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -116,7 +116,7 @@ def test_can_undo_deleteing_application(data_fixture):
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_redo_deleting_application(data_fixture, django_assert_num_queries):
+def test_can_undo_redo_delete_application(data_fixture, django_assert_num_queries):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -144,7 +144,7 @@ def test_can_undo_redo_deleting_application(data_fixture, django_assert_num_quer
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_updating_application(data_fixture, django_assert_num_queries):
+def test_can_undo_update_application(data_fixture, django_assert_num_queries):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -171,7 +171,7 @@ def test_can_undo_updating_application(data_fixture, django_assert_num_queries):
 
 @pytest.mark.django_db
 @pytest.mark.undo_redo
-def test_can_undo_redo_updating_application(data_fixture, django_assert_num_queries):
+def test_can_undo_redo_update_application(data_fixture, django_assert_num_queries):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
diff --git a/backend/tests/baserow/core/test_core_job_types.py b/backend/tests/baserow/core/test_core_job_types.py
index 4742ae278..69a51bde6 100644
--- a/backend/tests/baserow/core/test_core_job_types.py
+++ b/backend/tests/baserow/core/test_core_job_types.py
@@ -105,7 +105,7 @@ def test_can_submit_duplicate_application_job(data_fixture):
 
 
 @pytest.mark.django_db(transaction=True)
-def test_can_undo_redo_duplicate_application_job(data_fixture):
+def test_cannot_undo_duplicate_application_job(data_fixture):
     session_id = "session-id"
     user = data_fixture.create_user(session_id=session_id)
     group = data_fixture.create_group(user=user)
@@ -126,14 +126,8 @@ def test_can_undo_redo_duplicate_application_job(data_fixture):
     assert Application.objects.count() == 2
 
     with transaction.atomic():
-        ActionHandler.undo(
+        actions_undone = ActionHandler.undo(
             user, [GroupActionScopeType.value(group_id=group.id)], session_id
         )
 
-        assert Application.objects.count() == 1
-
-        ActionHandler.redo(
-            user, [GroupActionScopeType.value(group_id=group.id)], session_id
-        )
-
-        assert Application.objects.count() == 2
+        assert actions_undone == []
diff --git a/changelog.md b/changelog.md
index 72c9e53e5..72ed6d9cd 100644
--- a/changelog.md
+++ b/changelog.md
@@ -22,6 +22,7 @@ For example:
 * Added option to use view's filters and sorting when listing rows. [#190](https://gitlab.com/bramw/baserow/-/issues/190)
 * Fixed bug with 404 middleware returning different 404 error messages based on the endpoint.
 * Made it possible to import data into an existing table. [#342](https://gitlab.com/bramw/baserow/-/issues/342)
+* Tables can now be duplicated. [#961](https://gitlab.com/bramw/baserow/-/issues/961)
 
 ### Bug Fixes
 
diff --git a/premium/backend/tests/baserow_premium/export/test_premium_export_types.py b/premium/backend/tests/baserow_premium/export/test_premium_export_types.py
index d563ddd56..90b9858a9 100644
--- a/premium/backend/tests/baserow_premium/export/test_premium_export_types.py
+++ b/premium/backend/tests/baserow_premium/export/test_premium_export_types.py
@@ -62,6 +62,7 @@ def test_can_export_every_interesting_different_field_to_json(
     "created_on_datetime_eu": "02/01/2021 13:00",
     "created_on_date_eu": "02/01/2021",
     "link_row": [],
+    "self_link_row": [],
     "decimal_link_row": [],
     "file_link_row": [],
     "file": [],
@@ -107,6 +108,7 @@ def test_can_export_every_interesting_different_field_to_json(
         "linked_row_2",
         "unnamed row 3"
     ],
+    "self_link_row": [],
     "decimal_link_row": [
         "1.234",
         "-123.456",
@@ -260,6 +262,7 @@ def test_can_export_every_interesting_different_field_to_xml(
     <created-on-datetime-eu>02/01/2021 13:00</created-on-datetime-eu>
     <created-on-date-eu>02/01/2021</created-on-date-eu>
     <link-row/>
+    <self-link-row/>
     <decimal-link-row/>
     <file-link-row/>
     <file/>
@@ -305,6 +308,7 @@ def test_can_export_every_interesting_different_field_to_xml(
         <item>linked_row_2</item>
         <item>unnamed row 3</item>
     </link-row>
+    <self-link-row/>
     <decimal-link-row>
         <item>1.234</item>
         <item>-123.456</item>
diff --git a/web-frontend/locales/en.json b/web-frontend/locales/en.json
index c9d007d21..85fc7dac9 100644
--- a/web-frontend/locales/en.json
+++ b/web-frontend/locales/en.json
@@ -17,6 +17,7 @@
         "change": "Change",
         "delete": "Delete",
         "rename": "Rename",
+        "duplicate": "Duplicate",
         "add": "Add",
         "makeChoice": "Make a choice",
         "cancel": "Cancel",
diff --git a/web-frontend/modules/database/components/sidebar/SidebarItem.vue b/web-frontend/modules/database/components/sidebar/SidebarItem.vue
index c961b7fe2..120a61a2b 100644
--- a/web-frontend/modules/database/components/sidebar/SidebarItem.vue
+++ b/web-frontend/modules/database/components/sidebar/SidebarItem.vue
@@ -41,6 +41,17 @@
             {{ $t('action.rename') }}
           </a>
         </li>
+        <li>
+          <SidebarDuplicateTableContextItem
+            :database="database"
+            :table="table"
+            :disabled="deleteLoading"
+            @table-duplicated="
+              $refs.context.hide()
+              selectTable(database, $event.table)
+            "
+          ></SidebarDuplicateTableContextItem>
+        </li>
         <li>
           <a
             :class="{ 'context__menu-item--loading': deleteLoading }"
@@ -65,10 +76,15 @@
 import { notifyIf } from '@baserow/modules/core/utils/error'
 import ExportTableModal from '@baserow/modules/database/components/export/ExportTableModal'
 import WebhookModal from '@baserow/modules/database/components/webhook/WebhookModal'
+import SidebarDuplicateTableContextItem from '@baserow/modules/database/components/sidebar/table/SidebarDuplicateTableContextItem'
 
 export default {
   name: 'SidebarItem',
-  components: { ExportTableModal, WebhookModal },
+  components: {
+    ExportTableModal,
+    WebhookModal,
+    SidebarDuplicateTableContextItem,
+  },
   props: {
     database: {
       type: Object,
diff --git a/web-frontend/modules/database/components/sidebar/table/SidebarDuplicateTableContextItem.vue b/web-frontend/modules/database/components/sidebar/table/SidebarDuplicateTableContextItem.vue
new file mode 100644
index 000000000..d5d80a2aa
--- /dev/null
+++ b/web-frontend/modules/database/components/sidebar/table/SidebarDuplicateTableContextItem.vue
@@ -0,0 +1,91 @@
+<template>
+  <a
+    :class="{
+      'context__menu-item--loading': loading,
+      disabled: disabled || loading,
+    }"
+    @click="duplicateTable()"
+  >
+    <i class="context__menu-icon fas fa-fw fa-copy"></i>
+    {{ $t('action.duplicate') }}
+  </a>
+</template>
+
+<script>
+import { notifyIf } from '@baserow/modules/core/utils/error'
+import TableService from '@baserow/modules/database/services/table'
+import jobProgress from '@baserow/modules/core/mixins/jobProgress'
+
+export default {
+  name: 'SidebarDuplicateTableContextItem',
+  mixins: [jobProgress],
+  props: {
+    database: {
+      type: Object,
+      required: true,
+    },
+    table: {
+      type: Object,
+      required: true,
+    },
+    disabled: {
+      type: Boolean,
+      required: false,
+      default: false,
+    },
+  },
+  data() {
+    return {
+      loading: false,
+    }
+  },
+  methods: {
+    showError(title, message) {
+      this.$store.dispatch(
+        'notification/error',
+        { title, message },
+        { root: true }
+      )
+    },
+    // eslint-disable-next-line require-await
+    async onJobFailed() {
+      this.loading = false
+      this.showError(
+        this.$t('clientHandler.notCompletedTitle'),
+        this.$t('clientHandler.notCompletedDescription')
+      )
+    },
+    // eslint-disable-next-line require-await
+    async onJobPollingError(error) {
+      this.loading = false
+      notifyIf(error, 'table')
+    },
+    async onJobDone() {
+      const database = this.database
+      const table = this.job.duplicated_table
+      await this.$store.dispatch('table/forceCreate', {
+        database,
+        data: table,
+      })
+      this.loading = false
+      this.$emit('table-duplicated', { table })
+    },
+    async duplicateTable() {
+      if (this.loading || this.disabled) {
+        return
+      }
+
+      this.loading = true
+      try {
+        const { data: job } = await TableService(this.$client).asyncDuplicate(
+          this.table.id
+        )
+        this.startJobPoller(job)
+      } catch (error) {
+        this.loading = false
+        notifyIf(error, 'table')
+      }
+    },
+  },
+}
+</script>
diff --git a/web-frontend/modules/database/services/table.js b/web-frontend/modules/database/services/table.js
index f221f8d5a..ca79e6705 100644
--- a/web-frontend/modules/database/services/table.js
+++ b/web-frontend/modules/database/services/table.js
@@ -39,6 +39,9 @@ export default (client) => {
         table_ids: order,
       })
     },
+    asyncDuplicate(tableId) {
+      return client.post(`/database/tables/${tableId}/duplicate/async/`)
+    },
     delete(tableId) {
       return client.delete(`/database/tables/${tableId}/`)
     },
diff --git a/web-frontend/modules/database/store/table.js b/web-frontend/modules/database/store/table.js
index a4d65ade9..caf7f921b 100644
--- a/web-frontend/modules/database/store/table.js
+++ b/web-frontend/modules/database/store/table.js
@@ -99,6 +99,22 @@ export const actions = {
     // The returned data is a table creation job
     return data
   },
+  /**
+   * Fetches one table for the authenticated user.
+   */
+  async fetch({ commit, dispatch }, { database, tableId }) {
+    commit('SET_LOADING', true)
+
+    try {
+      const { data } = await TableService(this.$client).get(tableId)
+      dispatch('forceCreate', { database, data })
+      commit('SET_LOADING', false)
+      return data
+    } catch (error) {
+      commit('SET_LOADING', false)
+      throw error
+    }
+  },
   /**
    * Forcefully create an item in the store without making a call to the server.
    */