diff --git a/enterprise/backend/src/baserow_enterprise/api/integrations/local_baserow/serializers.py b/enterprise/backend/src/baserow_enterprise/api/integrations/local_baserow/serializers.py
index 1fb0d376a..a1919915c 100644
--- a/enterprise/backend/src/baserow_enterprise/api/integrations/local_baserow/serializers.py
+++ b/enterprise/backend/src/baserow_enterprise/api/integrations/local_baserow/serializers.py
@@ -3,6 +3,7 @@ from rest_framework import serializers
 from baserow_enterprise.integrations.local_baserow.models import (
     LocalBaserowTableServiceAggregationGroupBy,
     LocalBaserowTableServiceAggregationSeries,
+    LocalBaserowTableServiceAggregationSortBy,
 )
 
 
@@ -22,3 +23,11 @@ class LocalBaserowTableServiceAggregationGroupBySerializer(serializers.ModelSeri
     class Meta:
         model = LocalBaserowTableServiceAggregationGroupBy
         fields = ("order", "field_id")
+
+
+class LocalBaserowTableServiceAggregationSortBySerializer(serializers.ModelSerializer):
+    order = serializers.IntegerField(read_only=True)
+
+    class Meta:
+        model = LocalBaserowTableServiceAggregationSortBy
+        fields = ("order", "sort_on", "reference", "direction")
diff --git a/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/models.py b/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/models.py
index 00f9c1c83..a10e2b621 100644
--- a/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/models.py
+++ b/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/models.py
@@ -118,3 +118,35 @@ class LocalBaserowTableServiceAggregationGroupBy(models.Model):
 
     class Meta:
         ordering = ("order", "id")
+
+
+class SortOn(models.TextChoices):
+    SERIES = "SERIES", "Series"
+    GROUP_BY = "GROUP_BY", "Group by"
+    PRIMARY = "PRIMARY", "Primary"
+
+
+class SortDirection(models.TextChoices):
+    ASCENDING = "ASC", "Ascending"
+    DESCENDING = "DESC", "Descending"
+
+
+class LocalBaserowTableServiceAggregationSortBy(models.Model):
+    """
+    A sort by for aggregations applicable to a `LocalBaserowTableService`
+    integration service.
+    """
+
+    service = models.ForeignKey(
+        Service,
+        related_name="service_aggregation_sorts",
+        help_text="The service which this aggregation series belongs to.",
+        on_delete=models.CASCADE,
+    )
+    sort_on = models.CharField(max_length=255, choices=SortOn.choices)
+    reference = models.CharField(max_length=255)
+    direction = models.CharField(max_length=255, choices=SortDirection.choices)
+    order = models.PositiveIntegerField()
+
+    class Meta:
+        ordering = ("order", "id")
diff --git a/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/service_types.py b/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/service_types.py
index 061755b99..bd0046f23 100644
--- a/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/service_types.py
+++ b/enterprise/backend/src/baserow_enterprise/integrations/local_baserow/service_types.py
@@ -1,7 +1,5 @@
-from typing import TYPE_CHECKING, Type
-
 from django.conf import settings
-from django.db.models import OrderBy, QuerySet
+from django.db.models import F
 
 from rest_framework.exceptions import ValidationError as DRFValidationError
 
@@ -12,23 +10,20 @@ from baserow.contrib.integrations.local_baserow.integration_types import (
 )
 from baserow.contrib.integrations.local_baserow.mixins import (
     LocalBaserowTableServiceFilterableMixin,
-    LocalBaserowTableServiceSortableMixin,
-)
-from baserow.contrib.integrations.local_baserow.models import (
-    LocalBaserowTableServiceSort,
-    Service,
 )
+from baserow.contrib.integrations.local_baserow.models import Service
 from baserow.contrib.integrations.local_baserow.service_types import (
     LocalBaserowViewServiceType,
 )
 from baserow.core.services.dispatch_context import DispatchContext
 from baserow.core.services.exceptions import ServiceImproperlyConfigured
 from baserow.core.services.registries import DispatchTypes
-from baserow.core.services.types import DispatchResult, ServiceSortDictSubClass
+from baserow.core.services.types import DispatchResult
 from baserow.core.utils import atomic_if_not_already
 from baserow_enterprise.api.integrations.local_baserow.serializers import (
     LocalBaserowTableServiceAggregationGroupBySerializer,
     LocalBaserowTableServiceAggregationSeriesSerializer,
+    LocalBaserowTableServiceAggregationSortBySerializer,
 )
 from baserow_enterprise.integrations.local_baserow.models import (
     LocalBaserowGroupedAggregateRows,
@@ -37,20 +32,18 @@ from baserow_enterprise.integrations.registries import grouped_aggregation_regis
 from baserow_enterprise.services.types import (
     ServiceAggregationGroupByDict,
     ServiceAggregationSeriesDict,
+    ServiceAggregationSortByDict,
 )
 
 from .models import (
     LocalBaserowTableServiceAggregationGroupBy,
     LocalBaserowTableServiceAggregationSeries,
+    LocalBaserowTableServiceAggregationSortBy,
 )
 
-if TYPE_CHECKING:
-    from baserow.contrib.database.table.models import GeneratedTableModel
-
 
 class LocalBaserowGroupedAggregateRowsUserServiceType(
     LocalBaserowTableServiceFilterableMixin,
-    LocalBaserowTableServiceSortableMixin,
     LocalBaserowViewServiceType,
 ):
     """
@@ -62,10 +55,7 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
     type = "local_baserow_grouped_aggregate_rows"
     model_class = LocalBaserowGroupedAggregateRows
     dispatch_type = DispatchTypes.DISPATCH_DATA_SOURCE
-    serializer_mixins = (
-        LocalBaserowTableServiceFilterableMixin.mixin_serializer_mixins
-        + LocalBaserowTableServiceSortableMixin.mixin_serializer_mixins
-    )
+    serializer_mixins = LocalBaserowTableServiceFilterableMixin.mixin_serializer_mixins
 
     def get_schema_name(self, service: LocalBaserowGroupedAggregateRows) -> str:
         return f"GroupedAggregation{service.id}Schema"
@@ -80,7 +70,9 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
             super()
             .enhance_queryset(queryset)
             .prefetch_related(
-                "service_aggregation_series", "service_aggregation_group_bys"
+                "service_aggregation_series",
+                "service_aggregation_group_bys",
+                "service_aggregation_sorts",
             )
         )
 
@@ -96,21 +88,22 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
         return (
             super().serializer_field_names
             + LocalBaserowTableServiceFilterableMixin.mixin_serializer_field_names
-            + LocalBaserowTableServiceSortableMixin.mixin_serializer_field_names
-        ) + ["aggregation_series", "aggregation_group_bys"]
+        ) + ["aggregation_series", "aggregation_group_bys", "aggregation_sorts"]
 
     @property
     def serializer_field_overrides(self):
         return {
             **super().serializer_field_overrides,
             **LocalBaserowTableServiceFilterableMixin.mixin_serializer_field_overrides,
-            **LocalBaserowTableServiceSortableMixin.mixin_serializer_field_overrides,
             "aggregation_series": LocalBaserowTableServiceAggregationSeriesSerializer(
                 many=True, source="service_aggregation_series", required=False
             ),
             "aggregation_group_bys": LocalBaserowTableServiceAggregationGroupBySerializer(
                 many=True, source="service_aggregation_group_bys", required=False
             ),
+            "aggregation_sorts": LocalBaserowTableServiceAggregationSortBySerializer(
+                many=True, source="service_aggregation_sorts", required=False
+            ),
         }
 
     class SerializedDict(
@@ -119,6 +112,7 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
     ):
         service_aggregation_series: list[ServiceAggregationSeriesDict]
         service_aggregation_group_bys: list[ServiceAggregationGroupByDict]
+        service_aggregation_sorts: list[ServiceAggregationSortByDict]
 
     def _update_service_aggregation_series(
         self,
@@ -224,48 +218,43 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
                     ]
                 )
 
-    def _update_service_sortings(
+    def _update_service_sorts(
         self,
         service: LocalBaserowGroupedAggregateRows,
-        service_sorts: list[ServiceSortDictSubClass] | None = None,
+        service_sorts: list[ServiceAggregationSortByDict] | None = None,
     ):
         with atomic_if_not_already():
-            service.service_sorts.all().delete()
+            service.service_aggregation_sorts.all().delete()
             if service_sorts is not None:
-                table_field_ids = service.table.field_set.values_list("id", flat=True)
                 model = service.table.get_model()
 
-                allowed_sort_field_ids = [
-                    series.field_id
+                allowed_sort_references = [
+                    f"field_{series.field_id}_{series.aggregation_type}"
                     for series in service.service_aggregation_series.all()
+                    if series.aggregation_type is not None
+                    and series.field_id is not None
                 ]
 
                 if service.service_aggregation_group_bys.count() > 0:
                     group_by = service.service_aggregation_group_bys.all()[0]
-                    allowed_sort_field_ids += (
-                        [group_by.field_id]
+                    allowed_sort_references += (
+                        [f"field_{group_by.field_id}"]
                         if group_by.field_id is not None
-                        else [model.get_primary_field().id]
+                        else [f"field_{model.get_primary_field().id}"]
                     )
 
                 def validate_sort(service_sort):
-                    if service_sort["field"].id not in table_field_ids:
+                    if service_sort["reference"] not in allowed_sort_references:
                         raise DRFValidationError(
-                            detail=f"The field with ID {service_sort['field'].id} is not "
-                            "related to the given table.",
-                            code="invalid_field",
-                        )
-                    if service_sort["field"].id not in allowed_sort_field_ids:
-                        raise DRFValidationError(
-                            detail=f"The field with ID {service_sort['field'].id} cannot be used for sorting.",
-                            code="invalid_field",
+                            detail=f"The reference sort '{service_sort['reference']}' cannot be used for sorting.",
+                            code="invalid",
                         )
 
                     return True
 
-                LocalBaserowTableServiceSort.objects.bulk_create(
+                LocalBaserowTableServiceAggregationSortBy.objects.bulk_create(
                     [
-                        LocalBaserowTableServiceSort(
+                        LocalBaserowTableServiceAggregationSortBy(
                             **service_sort, service=service, order=index
                         )
                         for index, service_sort in enumerate(service_sorts)
@@ -290,8 +279,10 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
             self._update_service_aggregation_group_bys(
                 instance, values.pop("service_aggregation_group_bys")
             )
-        if "service_sorts" in values:
-            self._update_service_sortings(instance, values.pop("service_sorts"))
+        if "service_aggregation_sorts" in values:
+            self._update_service_sorts(
+                instance, values.pop("service_aggregation_sorts")
+            )
 
     def after_update(
         self,
@@ -328,10 +319,12 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
         elif from_table and to_table:
             instance.service_aggregation_group_bys.all().delete()
 
-        if "service_sorts" in values:
-            self._update_service_sortings(instance, values.pop("service_sorts"))
+        if "service_aggregation_sorts" in values:
+            self._update_service_sorts(
+                instance, values.pop("service_aggregation_sorts")
+            )
         elif from_table and to_table:
-            instance.service_sorts.all().delete()
+            instance.service_aggregation_sorts.all().delete()
 
     def export_prepared_values(self, instance: Service) -> dict[str, any]:
         values = super().export_prepared_values(instance)
@@ -382,16 +375,6 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
             **kwargs,
         )
 
-    def get_dispatch_sorts(
-        self,
-        service: LocalBaserowGroupedAggregateRows,
-        queryset: QuerySet,
-        model: Type["GeneratedTableModel"],
-    ) -> tuple[list[OrderBy], QuerySet]:
-        service_sorts = service.service_sorts.all()
-        sort_ordering = [service_sort.get_order_by() for service_sort in service_sorts]
-        return sort_ordering, queryset
-
     def dispatch_data(
         self,
         service: LocalBaserowGroupedAggregateRows,
@@ -412,24 +395,6 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
         model = self.get_table_model(service)
         queryset = self.build_queryset(service, table, dispatch_context, model=model)
 
-        allowed_sort_field_ids = [
-            series.field_id for series in service.service_aggregation_series.all()
-        ]
-
-        if service.service_aggregation_group_bys.count() > 0:
-            group_by = service.service_aggregation_group_bys.all()[0]
-            allowed_sort_field_ids += (
-                [group_by.field_id]
-                if group_by.field_id is not None
-                else [model.get_primary_field().id]
-            )
-
-        for sort_by in service.service_sorts.all():
-            if sort_by.field_id not in allowed_sort_field_ids:
-                raise ServiceImproperlyConfigured(
-                    f"The field with ID {sort_by.field.id} cannot be used for sorting."
-                )
-
         group_by_values = []
         for group_by in service.service_aggregation_group_bys.all():
             if group_by.field is None:
@@ -482,6 +447,54 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
                 queryset = queryset.annotate(**value.annotations)
                 combined_agg_dict[key] = value.aggregation
 
+        allowed_sort_references = [
+            f"field_{series.field_id}_{series.aggregation_type}"
+            for series in service.service_aggregation_series.all()
+            if series.aggregation_type is not None and series.field_id is not None
+        ]
+
+        if service.service_aggregation_group_bys.count() > 0:
+            group_by = service.service_aggregation_group_bys.all()[0]
+            allowed_sort_references += (
+                [f"field_{group_by.field_id}"]
+                if group_by.field_id is not None
+                else [f"field_{model.get_primary_field().id}"]
+            )
+
+        sorts = []
+        sort_annotations = {}
+        for sort_by in service.service_aggregation_sorts.all():
+            if sort_by.reference not in allowed_sort_references:
+                raise ServiceImproperlyConfigured(
+                    f"The sort reference '{sort_by.reference}' cannot be used for sorting."
+                )
+
+            if sort_by.sort_on == "SERIES":
+                expression = F(f"{sort_by.reference}_raw")
+                if sort_by.direction == "ASC":
+                    expression = expression.asc(nulls_first=True)
+                else:
+                    expression = expression.desc(nulls_last=True)
+                sorts.append(expression)
+            else:
+                field_obj = model.get_field_object(sort_by.reference)
+                field_type = field_obj["type"]
+                field_annotated_order_by = field_type.get_order(
+                    field=field_obj["field"],
+                    field_name=sort_by.reference,
+                    order_direction=sort_by.direction,
+                )
+                if field_annotated_order_by.annotation is not None:
+                    sort_annotations = {
+                        **sort_annotations,
+                        **field_annotated_order_by.annotation,
+                    }
+                field_order_bys = field_annotated_order_by.order_bys
+                for field_order_by in field_order_bys:
+                    sorts.append(field_order_by)
+
+        queryset = queryset.annotate(**sort_annotations)
+
         def process_individual_result(result: dict):
             for agg_series in defined_agg_series:
                 key = f"{agg_series.field.db_column}_{agg_series.aggregation_type}"
@@ -495,9 +508,12 @@ class LocalBaserowGroupedAggregateRowsUserServiceType(
             return result
 
         if len(group_by_values) > 0:
-            queryset = queryset.annotate(**combined_agg_dict)[
+            queryset = queryset.annotate(**combined_agg_dict)
+            queryset = queryset.order_by(*sorts)
+            queryset = queryset[
                 : settings.BASEROW_ENTERPRISE_GROUPED_AGGREGATE_SERVICE_MAX_AGG_BUCKETS
             ]
+
             results = [process_individual_result(result) for result in queryset]
         else:
             results = queryset.aggregate(**combined_agg_dict)
diff --git a/enterprise/backend/src/baserow_enterprise/migrations/0042_localbaserowtableserviceaggregationsortby.py b/enterprise/backend/src/baserow_enterprise/migrations/0042_localbaserowtableserviceaggregationsortby.py
new file mode 100644
index 000000000..c050e9d62
--- /dev/null
+++ b/enterprise/backend/src/baserow_enterprise/migrations/0042_localbaserowtableserviceaggregationsortby.py
@@ -0,0 +1,63 @@
+# Generated by Django 5.0.9 on 2025-03-03 02:31
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        (
+            "baserow_enterprise",
+            "0041_alter_localbaserowtableserviceaggregationseries_field",
+        ),
+        ("core", "0094_alter_importexportresource_size"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="LocalBaserowTableServiceAggregationSortBy",
+            fields=[
+                (
+                    "id",
+                    models.AutoField(
+                        auto_created=True,
+                        primary_key=True,
+                        serialize=False,
+                        verbose_name="ID",
+                    ),
+                ),
+                (
+                    "sort_on",
+                    models.CharField(
+                        choices=[
+                            ("SERIES", "Series"),
+                            ("GROUP_BY", "Group by"),
+                            ("PRIMARY", "Primary"),
+                        ],
+                        max_length=255,
+                    ),
+                ),
+                ("reference", models.CharField(max_length=255)),
+                (
+                    "direction",
+                    models.CharField(
+                        choices=[("ASC", "Ascending"), ("DESC", "Descending")],
+                        max_length=255,
+                    ),
+                ),
+                ("order", models.PositiveIntegerField()),
+                (
+                    "service",
+                    models.ForeignKey(
+                        help_text="The service which this aggregation series belongs to.",
+                        on_delete=django.db.models.deletion.CASCADE,
+                        related_name="service_aggregation_sorts",
+                        to="core.service",
+                    ),
+                ),
+            ],
+            options={
+                "ordering": ("order", "id"),
+            },
+        ),
+    ]
diff --git a/enterprise/backend/src/baserow_enterprise/services/types.py b/enterprise/backend/src/baserow_enterprise/services/types.py
index 082d70fd0..e1a97f869 100644
--- a/enterprise/backend/src/baserow_enterprise/services/types.py
+++ b/enterprise/backend/src/baserow_enterprise/services/types.py
@@ -8,3 +8,9 @@ class ServiceAggregationSeriesDict(TypedDict):
 
 class ServiceAggregationGroupByDict(TypedDict):
     field_id: int
+
+
+class ServiceAggregationSortByDict(TypedDict):
+    sort_on: str
+    reference: str
+    direction: str
diff --git a/enterprise/backend/tests/baserow_enterprise_tests/api/dashboard/test_grouped_aggregate_rows_data_source_type.py b/enterprise/backend/tests/baserow_enterprise_tests/api/dashboard/test_grouped_aggregate_rows_data_source_type.py
index 895282ef1..53011f54d 100644
--- a/enterprise/backend/tests/baserow_enterprise_tests/api/dashboard/test_grouped_aggregate_rows_data_source_type.py
+++ b/enterprise/backend/tests/baserow_enterprise_tests/api/dashboard/test_grouped_aggregate_rows_data_source_type.py
@@ -4,14 +4,12 @@ from rest_framework.status import HTTP_200_OK
 
 from baserow.contrib.database.rows.handler import RowHandler
 from baserow.contrib.database.views.models import SORT_ORDER_ASC
-from baserow.contrib.integrations.local_baserow.models import (
-    LocalBaserowTableServiceSort,
-)
 from baserow.test_utils.helpers import AnyDict, AnyInt
 from baserow_enterprise.integrations.local_baserow.models import (
     LocalBaserowGroupedAggregateRows,
     LocalBaserowTableServiceAggregationGroupBy,
     LocalBaserowTableServiceAggregationSeries,
+    LocalBaserowTableServiceAggregationSortBy,
 )
 
 
@@ -39,6 +37,13 @@ def test_grouped_aggregate_rows_get_dashboard_data_sources(
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=data_source1.service, field=field_3, order=1
     )
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=data_source1.service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field_3.id}",
+        direction="ASC",
+        order=1,
+    )
     enterprise_data_fixture.create_local_baserow_table_service_sort(
         service=data_source1.service,
         field=field_3,
@@ -74,13 +79,12 @@ def test_grouped_aggregate_rows_get_dashboard_data_sources(
         "dashboard_id": dashboard.id,
         "filter_type": "AND",
         "filters": [],
-        "sortings": [
+        "aggregation_sorts": [
             {
-                "field": field_3.id,
-                "id": AnyInt(),
-                "trashed": False,
-                "order": 2,
-                "order_by": "ASC",
+                "sort_on": "GROUP_BY",
+                "reference": f"field_{field_3.id}",
+                "direction": "ASC",
+                "order": 1,
             }
         ],
         "id": data_source1.id,
@@ -138,7 +142,13 @@ def test_grouped_aggregate_rows_update_data_source(api_client, enterprise_data_f
                 {"field_id": field_2.id, "aggregation_type": "sum"},
             ],
             "aggregation_group_bys": [{"field_id": field_3.id}],
-            "sortings": [{"field": field.id}],
+            "aggregation_sorts": [
+                {
+                    "sort_on": "SERIES",
+                    "reference": f"field_{field.id}_sum",
+                    "direction": "ASC",
+                }
+            ],
         },
         format="json",
         HTTP_AUTHORIZATION=f"JWT {token}",
@@ -164,13 +174,12 @@ def test_grouped_aggregate_rows_update_data_source(api_client, enterprise_data_f
     assert response_json["aggregation_group_bys"] == [
         {"field_id": field_3.id, "order": 0}
     ]
-    assert response_json["sortings"] == [
+    assert response_json["aggregation_sorts"] == [
         {
-            "id": AnyInt(),
-            "field": field.id,
-            "trashed": False,
+            "sort_on": "SERIES",
+            "reference": f"field_{field.id}_sum",
+            "direction": "ASC",
             "order": 0,
-            "order_by": "ASC",
         }
     ]
 
@@ -212,11 +221,19 @@ def test_grouped_aggregate_rows_dispatch_dashboard_data_source(
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_3, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_3.id}_sum",
+        order=1,
+        direction="ASC",
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=2, order_by="DESC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_2.id}_sum",
+        order=2,
+        direction="DESC",
     )
 
     RowHandler().create_rows(
diff --git a/enterprise/backend/tests/baserow_enterprise_tests/integrations/local_baserow/service_types/test_grouped_aggregate_rows_service_type.py b/enterprise/backend/tests/baserow_enterprise_tests/integrations/local_baserow/service_types/test_grouped_aggregate_rows_service_type.py
index 8fd9ea0ce..e1481ecff 100644
--- a/enterprise/backend/tests/baserow_enterprise_tests/integrations/local_baserow/service_types/test_grouped_aggregate_rows_service_type.py
+++ b/enterprise/backend/tests/baserow_enterprise_tests/integrations/local_baserow/service_types/test_grouped_aggregate_rows_service_type.py
@@ -18,6 +18,7 @@ from baserow_enterprise.integrations.local_baserow.models import (
     LocalBaserowGroupedAggregateRows,
     LocalBaserowTableServiceAggregationGroupBy,
     LocalBaserowTableServiceAggregationSeries,
+    LocalBaserowTableServiceAggregationSortBy,
 )
 
 
@@ -317,8 +318,12 @@ def test_create_grouped_aggregate_rows_service_sort_by_field_outside_of_series_g
                 {"field_id": field.id, "aggregation_type": "sum"},
             ],
             "service_aggregation_group_bys": [{"field_id": field.id}],
-            "service_sorts": [
-                {"field": field_2},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "SERIES",
+                    "reference": f"field_{field_2.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -326,7 +331,7 @@ def test_create_grouped_aggregate_rows_service_sort_by_field_outside_of_series_g
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field_2.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field_2.id}' cannot be used for sorting.",
     ):
         ServiceHandler().create_service(service_type, **values)
 
@@ -354,8 +359,12 @@ def test_create_grouped_aggregate_rows_service_sort_by_primary_field_no_group_by
                 {"field_id": field.id, "aggregation_type": "sum"},
             ],
             "service_aggregation_group_bys": [],
-            "service_sorts": [
-                {"field": field_2},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "PRIMARY",
+                    "reference": f"field_{field_2.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -363,7 +372,7 @@ def test_create_grouped_aggregate_rows_service_sort_by_primary_field_no_group_by
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field_2.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field_2.id}' cannot be used for sorting.",
     ):
         ServiceHandler().create_service(service_type, **values)
 
@@ -389,8 +398,12 @@ def test_create_grouped_aggregate_rows_service_sort_by_primary_field_with_group_
             "integration_id": integration.id,
             "service_aggregation_series": [],
             "service_aggregation_group_bys": [{"field_id": field_2.id}],
-            "service_sorts": [
-                {"field": field},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "PRIMARY",
+                    "reference": f"field_{field.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -398,7 +411,7 @@ def test_create_grouped_aggregate_rows_service_sort_by_primary_field_with_group_
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field.id}' cannot be used for sorting.",
     ):
         ServiceHandler().create_service(service_type, **values)
 
@@ -765,8 +778,12 @@ def test_update_grouped_aggregate_rows_service_sort_by_field_outside_of_series_g
                 {"field_id": field.id, "aggregation_type": "sum"},
             ],
             "service_aggregation_group_bys": [{"field_id": field.id}],
-            "service_sorts": [
-                {"field": field_2},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "GROUP_BY",
+                    "reference": f"field_{field_2.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -774,7 +791,7 @@ def test_update_grouped_aggregate_rows_service_sort_by_field_outside_of_series_g
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field_2.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field_2.id}' cannot be used for sorting.",
     ):
         ServiceHandler().update_service(service_type, service=service, **values)
 
@@ -808,8 +825,12 @@ def test_update_grouped_aggregate_rows_service_sort_by_primary_field_no_group_by
                 {"field_id": field.id, "aggregation_type": "sum"},
             ],
             "service_aggregation_group_bys": [],
-            "service_sorts": [
-                {"field": field_2},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "PRIMARY",
+                    "reference": f"field_{field_2.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -817,7 +838,7 @@ def test_update_grouped_aggregate_rows_service_sort_by_primary_field_no_group_by
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field_2.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field_2.id}' cannot be used for sorting.",
     ):
         ServiceHandler().update_service(service_type, service=service, **values)
 
@@ -849,8 +870,12 @@ def test_update_grouped_aggregate_rows_service_sort_by_primary_field_with_group_
             "integration_id": integration.id,
             "service_aggregation_series": [],
             "service_aggregation_group_bys": [{"field_id": field_2.id}],
-            "service_sorts": [
-                {"field": field},
+            "service_aggregation_sorts": [
+                {
+                    "sort_on": "PRIMARY",
+                    "reference": f"field_{field.id}",
+                    "direction": "ASC",
+                },
             ],
         },
         user,
@@ -858,7 +883,7 @@ def test_update_grouped_aggregate_rows_service_sort_by_primary_field_with_group_
 
     with pytest.raises(
         ValidationError,
-        match=f"The field with ID {field.id} cannot be used for sorting.",
+        match=f"The reference sort 'field_{field.id}' cannot be used for sorting.",
     ):
         ServiceHandler().update_service(service_type, service=service, **values)
 
@@ -887,8 +912,12 @@ def test_update_grouped_aggregate_rows_service_reset_after_table_change(data_fix
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=2, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field.id}_sum",
+        order=2,
+        direction="ASC",
     )
 
     values = service_type.prepare_values(
@@ -911,7 +940,7 @@ def test_update_grouped_aggregate_rows_service_reset_after_table_change(data_fix
     assert service.view is None
     assert service.service_aggregation_series.all().count() == 0
     assert service.service_aggregation_group_bys.all().count() == 0
-    assert service.service_sorts.all().count() == 0
+    assert service.service_aggregation_sorts.all().count() == 0
 
 
 @pytest.mark.django_db
@@ -1548,11 +1577,19 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_series_with_group_by(
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_3, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_3.id}_sum",
+        order=1,
+        direction="ASC",
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=2, order_by="DESC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_2.id}_sum",
+        order=2,
+        direction="DESC",
     )
 
     RowHandler().create_rows(
@@ -1682,11 +1719,19 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_series_with_group_by_ro
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=None, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_3, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_3.id}_sum",
+        order=1,
+        direction="ASC",
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=2, order_by="DESC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field_2.id}_sum",
+        order=2,
+        direction="DESC",
     )
 
     RowHandler().create_rows(
@@ -1875,8 +1920,12 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_group_by_field(data_fix
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field.id}",
+        order=1,
+        direction="ASC",
     )
 
     RowHandler().create_rows(
@@ -1997,8 +2046,12 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_group_by_row_id(data_fi
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=None, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field.id}",
+        order=1,
+        direction="ASC",
     )
 
     RowHandler().create_rows(
@@ -2095,15 +2148,19 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_field_outside_series_or
     LocalBaserowTableServiceAggregationSeries.objects.create(
         service=service, field=field, aggregation_type="sum", order=2
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field_2.id}",
+        order=1,
+        direction="ASC",
     )
 
     dispatch_context = FakeDispatchContext()
 
     with pytest.raises(
         ServiceImproperlyConfigured,
-        match=f"The field with ID {field_2.id} cannot be used for sorting.",
+        match=f"The sort reference 'field_{field_2.id}' cannot be used for sorting.",
     ):
         ServiceHandler().dispatch_service(service, dispatch_context)
 
@@ -2130,15 +2187,19 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_primary_field_no_group_
     LocalBaserowTableServiceAggregationSeries.objects.create(
         service=service, field=field_2, aggregation_type="sum", order=2
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=2, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="PRIMARY",
+        reference=f"field_{field.id}",
+        order=2,
+        direction="ASC",
     )
 
     dispatch_context = FakeDispatchContext()
 
     with pytest.raises(
         ServiceImproperlyConfigured,
-        match=f"The field with ID {field.id} cannot be used for sorting.",
+        match=f"The sort reference 'field_{field.id}' cannot be used for sorting.",
     ):
         ServiceHandler().dispatch_service(service, dispatch_context)
 
@@ -2168,15 +2229,19 @@ def test_grouped_aggregate_rows_service_dispatch_sort_by_primary_field_group_by_
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field_2, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=2, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="PRIMARY",
+        reference=f"field_{field.id}",
+        order=2,
+        direction="ASC",
     )
 
     dispatch_context = FakeDispatchContext()
 
     with pytest.raises(
         ServiceImproperlyConfigured,
-        match=f"The field with ID {field.id} cannot be used for sorting.",
+        match=f"The sort reference 'field_{field.id}' cannot be used for sorting.",
     ):
         ServiceHandler().dispatch_service(service, dispatch_context)
 
@@ -2338,8 +2403,12 @@ def test_grouped_aggregate_rows_service_dispatch_max_buckets_sort_on_group_by_fi
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field_2, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field_2.id}",
+        order=1,
+        direction="ASC",
     )
 
     RowHandler().create_rows(
@@ -2425,8 +2494,12 @@ def test_grouped_aggregate_rows_service_dispatch_max_buckets_sort_on_series(
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=field_2, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="SERIES",
+        reference=f"field_{field.id}_sum",
+        order=1,
+        direction="ASC",
     )
 
     RowHandler().create_rows(
@@ -2512,8 +2585,12 @@ def test_grouped_aggregate_rows_service_dispatch_max_buckets_sort_on_primary_fie
     LocalBaserowTableServiceAggregationGroupBy.objects.create(
         service=service, field=None, order=1
     )
-    LocalBaserowTableServiceSort.objects.create(
-        service=service, field=field_2, order=1, order_by="ASC"
+    LocalBaserowTableServiceAggregationSortBy.objects.create(
+        service=service,
+        sort_on="GROUP_BY",
+        reference=f"field_{field_2.id}",
+        order=1,
+        direction="ASC",
     )
 
     rows = RowHandler().create_rows(
diff --git a/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/AggregationSortByForm.vue b/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/AggregationSortByForm.vue
index c2825a20c..1462e3811 100644
--- a/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/AggregationSortByForm.vue
+++ b/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/AggregationSortByForm.vue
@@ -4,30 +4,30 @@
     class="margin-bottom-2"
   >
     <Dropdown
-      :value="sortByField"
+      :value="sortReference"
       :show-search="true"
       fixed-items
       class="margin-bottom-1"
-      :error="v$.sortByField?.$error || false"
-      @change="sortByFieldChangedByUser($event)"
+      :error="v$.sortReference?.$error || false"
+      @change="sortReferenceChangedByUser($event)"
     >
       <DropdownItem
         :name="$t('aggregationSortByForm.none')"
         :value="null"
       ></DropdownItem>
       <DropdownItem
-        v-for="field in allowedSortFields"
-        :key="field.id"
-        :name="field.name"
-        :value="field.id"
-        :icon="fieldIconClass(field)"
+        v-for="allowedSortReference in allowedSortReferences"
+        :key="allowedSortReference.reference"
+        :name="allowedSortReference.name"
+        :value="allowedSortReference.reference"
+        :icon="fieldIconClass(allowedSortReference.field)"
       >
       </DropdownItem>
     </Dropdown>
     <SegmentControl
-      :active-index="orderByIndex"
-      :segments="orderByOptions"
-      :initial-active-index="orderByIndex"
+      :active-index="orderDirectionIndex"
+      :segments="orderDirectionOptions"
+      :initial-active-index="orderDirectionIndex"
       @update:activeIndex="orderByChangedByUser"
     ></SegmentControl>
   </FormSection>
@@ -46,7 +46,7 @@ const includesIfSet = (array) => (value) => {
 export default {
   name: 'AggregationSortByForm',
   props: {
-    allowedSortFields: {
+    allowedSortReferences: {
       type: Array,
       required: true,
     },
@@ -60,12 +60,12 @@ export default {
   },
   data() {
     return {
-      sortByField: null,
-      orderByIndex: 0,
+      sortReference: null,
+      orderDirectionIndex: 0,
     }
   },
   computed: {
-    orderByOptions() {
+    orderDirectionOptions() {
       return [
         { label: this.$t('aggregationSortByForm.ascending'), value: 'ASC' },
         { label: this.$t('aggregationSortByForm.descending'), value: 'DESC' },
@@ -76,13 +76,13 @@ export default {
     aggregationSorts: {
       handler(aggregationSorts) {
         if (aggregationSorts.length !== 0) {
-          this.sortByField = aggregationSorts[0].field
-          this.orderByIndex = this.orderByOptions.findIndex(
-            (item) => item.value === aggregationSorts[0].order_by
+          this.sortReference = aggregationSorts[0].reference
+          this.orderDirectionIndex = this.orderDirectionOptions.findIndex(
+            (item) => item.value === aggregationSorts[0].direction
           )
         } else {
-          this.sortByField = null
-          this.orderByIndex = 0
+          this.sortReference = null
+          this.orderDirectionIndex = 0
         }
       },
       immediate: true,
@@ -94,27 +94,38 @@ export default {
   validations() {
     const self = this
     return {
-      sortByField: {
-        isValidSortFieldId: (value) => {
-          const ids = self.allowedSortFields.map((item) => item.id)
-          return includesIfSet(ids)(value)
+      sortReference: {
+        isValidSortReference: (value) => {
+          const sortReferences = self.allowedSortReferences.map(
+            (item) => item.reference
+          )
+          return includesIfSet(sortReferences)(value)
         },
       },
     }
   },
   methods: {
-    sortByFieldChangedByUser(value) {
-      this.sortByField = value
-      this.$emit('value-changed', {
-        field: value,
-        order_by: this.orderByOptions[this.orderByIndex].value,
-      })
+    sortReferenceChangedByUser(value) {
+      this.sortReference = value
+      this.emitValue()
     },
     orderByChangedByUser(index) {
-      this.orderByIndex = index
+      this.orderDirectionIndex = index
+      this.emitValue()
+    },
+    emitValue() {
+      if (this.sortReference === null) {
+        this.$emit('value-changed', null)
+        return
+      }
+
+      const chosenReference = this.allowedSortReferences.find(
+        (item) => item.reference === this.sortReference
+      )
       this.$emit('value-changed', {
-        field: this.sortByField,
-        order_by: this.orderByOptions[index].value,
+        sort_on: chosenReference.sort_on,
+        reference: chosenReference.reference,
+        direction: this.orderDirectionOptions[this.orderDirectionIndex].value,
       })
     },
     fieldIconClass(field) {
diff --git a/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/GroupedAggregateRowsDataSourceForm.vue b/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/GroupedAggregateRowsDataSourceForm.vue
index 49eeb73fa..0c7d37cf1 100644
--- a/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/GroupedAggregateRowsDataSourceForm.vue
+++ b/enterprise/web-frontend/modules/baserow_enterprise/dashboard/components/data_source/GroupedAggregateRowsDataSourceForm.vue
@@ -101,8 +101,8 @@
     </AggregationGroupByForm>
     <AggregationSortByForm
       v-if="values.table_id && !fieldHasErrors('table_id')"
-      :aggregation-sorts="values.sortings"
-      :allowed-sort-fields="allowedSortFields"
+      :aggregation-sorts="values.aggregation_sorts"
+      :allowed-sort-references="allowedSortReferences"
       @value-changed="onSortByUpdated($event)"
     >
     </AggregationSortByForm>
@@ -161,14 +161,14 @@ export default {
         'view_id',
         'aggregation_series',
         'aggregation_group_bys',
-        'sortings',
+        'aggregation_sorts',
       ],
       values: {
         table_id: null,
         view_id: null,
         aggregation_series: [],
         aggregation_group_bys: [],
-        sortings: [],
+        aggregation_sorts: [],
       },
       tableLoading: false,
       databaseSelectedId: null,
@@ -201,6 +201,9 @@ export default {
     tableFields() {
       return this.tableSelected?.fields || []
     },
+    primaryTableField() {
+      return this.tableFields.find((item) => item.primary === true)
+    },
     tableFieldIds() {
       return this.tableFields.map((field) => field.id)
     },
@@ -214,17 +217,35 @@ export default {
     tableViewIds() {
       return this.tableViews.map((view) => view.id)
     },
-    allowedSortFields() {
-      const seriesFieldIds = this.values.aggregation_series.map(
-        (item) => item.field_id
+    allowedSortReferences() {
+      const seriesSortReferences = this.values.aggregation_series
+        .filter((item) => item.field_id && item.aggregation_type)
+        .map((item) => {
+          const field = this.getTableFieldById(item.field_id)
+          return {
+            sort_on: 'SERIES',
+            reference: `field_${item.field_id}_${item.aggregation_type}`,
+            field,
+            name: `${field.name} (${this.getAggregationName(
+              item.aggregation_type
+            )})`,
+          }
+        })
+      const groupBySortReferences = this.values.aggregation_group_bys.map(
+        (item) => {
+          const field =
+            item.field_id === null
+              ? this.primaryTableField
+              : this.getTableFieldById(item.field_id)
+          return {
+            sort_on: 'GROUP_BY',
+            reference: `field_${field.id}`,
+            field,
+            name: field.name,
+          }
+        }
       )
-      const groupByFieldIds = this.values.aggregation_group_bys.map(
-        (item) => item.field_id
-      )
-      const allowedFieldIds = seriesFieldIds.concat(groupByFieldIds)
-      return this.tableFields.filter((item) => {
-        return allowedFieldIds.includes(item.id)
-      })
+      return seriesSortReferences.concat(groupBySortReferences)
     },
   },
   watch: {
@@ -280,22 +301,44 @@ export default {
     }
   },
   methods: {
+    getTableFieldById(fieldId) {
+      return this.tableFields.find((tableField) => {
+        return tableField.id === fieldId
+      })
+    },
+    getAggregationName(aggregationType) {
+      const aggType = this.$registry.get('groupedAggregation', aggregationType)
+      return aggType.getName()
+    },
     changeTableId(tableId) {
       this.values.table_id = tableId
       this.values.view_id = null
       this.values.aggregation_series = []
       this.values.aggregation_group_bys = []
-      this.values.sortings = []
+      this.values.aggregation_sorts = []
       this.v$.values.table_id.$touch()
     },
-    addSeries() {
+    async addSeries() {
+      this.setEmitValues(false)
       this.values.aggregation_series.push({
         field_id: null,
         aggregation_type: '',
       })
+      this.$emit('values-changed', {
+        aggregation_series: this.values.aggregation_series,
+      })
+      await this.$nextTick()
+      this.setEmitValues(true)
     },
-    deleteSeries(index) {
-      this.values.aggregation_series.splice(index, 1)
+    async deleteSeries(index) {
+      this.setEmitValues(false)
+      const updatedAggregationSeries = this.values.aggregation_series
+      updatedAggregationSeries.splice(index, 1)
+      this.$emit('values-changed', {
+        aggregation_series: updatedAggregationSeries,
+      })
+      await this.$nextTick()
+      this.setEmitValues(true)
     },
     onAggregationSeriesUpdated(index, aggregationSeriesValues) {
       const updatedAggregationSeries = this.values.aggregation_series
@@ -315,10 +358,10 @@ export default {
         aggregation_group_bys: aggregationGroupBys,
       })
     },
-    onSortByUpdated(sortBy) {
-      const aggregationSorts = sortBy.field !== null ? [sortBy] : []
+    onSortByUpdated(sort) {
+      const aggregationSorts = sort !== null ? [sort] : []
       this.$emit('values-changed', {
-        sortings: aggregationSorts,
+        aggregation_sorts: aggregationSorts,
       })
     },
   },