0
0
Fork 0
mirror of https://github.com/nextcloud/server.git synced 2025-01-16 08:09:00 +00:00
nextcloud_server/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php
provokateurin 9836e9b164
chore(deps): Update nextcloud/coding-standard to v1.3.1
Signed-off-by: provokateurin <kate@provokateurin.de>
2024-09-19 14:21:20 +02:00

162 lines
5 KiB
PHP

<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\DB\QueryBuilder\Sharded;
use OCP\DB\QueryBuilder\IQueryBuilder;
use OCP\IDBConnection;
/**
* Utility methods for implementing logic that moves data across shards
*/
class CrossShardMoveHelper {
/**
* @param ShardConnectionManager $connectionManager used to obtain the database connection for a given shard
*/
public function __construct(
private ShardConnectionManager $connectionManager,
) {
}
/**
 * Get the database connection of the shard that the given shard key maps to
 *
 * @param ShardDefinition $shardDefinition
 * @param int $shardKey
 * @return IDBConnection
 */
public function getConnection(ShardDefinition $shardDefinition, int $shardKey): IDBConnection {
	// resolve the key to its shard first, then ask the manager for that shard's connection
	$shard = $shardDefinition->getShardForKey($shardKey);

	return $this->connectionManager->getConnection($shardDefinition, $shard);
}
/**
 * Update the shard key of a set of rows, moving them to a different shard if needed
 *
 * When both shard keys map to the same shard the rows are updated in place.
 * Otherwise the rows are loaded from the source shard, inserted into the target
 * shard with the new shard key and deleted from the source shard, with a
 * transaction open on each of the two connections.
 *
 * Note that the two commits are separate operations: if the source commit fails
 * after the target commit went through, the rows exist on both shards.
 *
 * @param ShardDefinition $shardDefinition
 * @param string $table
 * @param string $shardColumn column that holds the shard key
 * @param int $sourceShardKey
 * @param int $targetShardKey
 * @param string $primaryColumn
 * @param int[] $primaryKeys
 * @return void
 * @throws \Throwable any error raised while copying or deleting the rows, rethrown after rolling back
 */
public function moveCrossShards(ShardDefinition $shardDefinition, string $table, string $shardColumn, int $sourceShardKey, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
	$sourceShard = $shardDefinition->getShardForKey($sourceShardKey);
	$targetShard = $shardDefinition->getShardForKey($targetShardKey);
	$sourceConnection = $this->connectionManager->getConnection($shardDefinition, $sourceShard);

	if ($sourceShard === $targetShard) {
		// same shard: only the shard key column needs to change
		$this->updateItems($sourceConnection, $table, $shardColumn, $targetShardKey, $primaryColumn, $primaryKeys);
		return;
	}

	$targetConnection = $this->connectionManager->getConnection($shardDefinition, $targetShard);

	$sourceItems = $this->loadItems($sourceConnection, $table, $primaryColumn, $primaryKeys);
	if ($sourceItems === []) {
		return;
	}

	// assign by key instead of by reference, so no dangling reference is left inside the array
	foreach (array_keys($sourceItems) as $index) {
		$sourceItems[$index][$shardColumn] = $targetShardKey;
	}

	$sourceConnection->beginTransaction();
	$targetConnection->beginTransaction();
	try {
		$this->saveItems($targetConnection, $table, $sourceItems);
		$this->deleteItems($sourceConnection, $table, $primaryColumn, $primaryKeys);

		$targetConnection->commit();
		$sourceConnection->commit();
	} catch (\Throwable $e) {
		// also covers \Error, so neither transaction is left open on e.g. a TypeError
		try {
			$sourceConnection->rollBack();
		} finally {
			// still attempt the target rollback when the source rollback itself fails
			$targetConnection->rollBack();
		}
		throw $e;
	}
}
/**
 * Load rows from a table to move
 *
 * The primary keys are queried in chunks of 1000 (presumably to stay within
 * database limits on the size of an IN list) and the rows of all chunks are
 * returned as a single list.
 *
 * @param IDBConnection $connection
 * @param string $table
 * @param string $primaryColumn
 * @param int[] $primaryKeys
 * @return array[] all columns of every matching row; empty when no keys are given
 */
public function loadItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): array {
	$query = $connection->getQueryBuilder();
	$query->select('*')
		->from($table)
		->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));

	// collect the rows per chunk and merge once, instead of re-copying the
	// accumulated result on every iteration
	$rowsPerChunk = [];
	foreach (array_chunk($primaryKeys, 1000) as $chunk) {
		$query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
		// executeQuery() rather than the deprecated execute(), matching the other methods of this class
		$rowsPerChunk[] = $query->executeQuery()->fetchAll();
	}

	return array_merge([], ...$rowsPerChunk);
}
/**
 * Save modified rows
 *
 * Inserts every item as a new row into the table. The column list of the
 * insert is taken from the first item, so all items are expected to have
 * the same set of columns.
 *
 * @param IDBConnection $connection
 * @param string $table
 * @param array[] $items rows as column => value maps
 * @return void
 */
public function saveItems(IDBConnection $connection, string $table, array $items): void {
	if ($items === []) {
		return;
	}

	$query = $connection->getQueryBuilder();
	$query->insert($table);

	// look the first row up by its actual key so a non zero-indexed array works as well
	$firstItem = $items[array_key_first($items)];
	foreach (array_keys($firstItem) as $column) {
		$query->setValue($column, $query->createParameter($column));
	}

	foreach ($items as $item) {
		foreach ($item as $column => $value) {
			// the type of the *value* decides the binding type; the column name is never an int
			if (is_int($value)) {
				$query->setParameter($column, $value, IQueryBuilder::PARAM_INT);
			} else {
				$query->setParameter($column, $value);
			}
		}
		$query->executeStatement();
	}
}
/**
 * Set the shard key of a set of rows without moving them
 *
 * The primary keys are processed in chunks of 1000, the same way as when
 * loading and deleting rows, with one UPDATE statement per chunk. Nothing is
 * executed when no primary keys are given.
 *
 * @param IDBConnection $connection
 * @param string $table
 * @param string $shardColumn column that holds the shard key
 * @param int $targetShardKey value to store in the shard column
 * @param string $primaryColumn
 * @param int[] $primaryKeys
 * @return void
 */
public function updateItems(IDBConnection $connection, string $table, string $shardColumn, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
	$query = $connection->getQueryBuilder();
	$query->update($table)
		->set($shardColumn, $query->createNamedParameter($targetShardKey, IQueryBuilder::PARAM_INT))
		->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));

	foreach (array_chunk($primaryKeys, 1000) as $chunk) {
		$query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
		// an UPDATE has no result set to fetch, so run it as a statement
		$query->executeStatement();
	}
}
/**
 * Delete the rows with the given primary keys from a table
 *
 * The keys are handled in batches of 1000, with one DELETE statement per batch.
 *
 * @param IDBConnection $connection
 * @param string $table
 * @param string $primaryColumn
 * @param int[] $primaryKeys
 * @return void
 */
public function deleteItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): void {
	$builder = $connection->getQueryBuilder();
	$builder->delete($table);

	// the statement is built once; only the 'keys' parameter changes per batch
	$keyMatch = $builder->expr()->in($primaryColumn, $builder->createParameter('keys'));
	$builder->where($keyMatch);

	foreach (array_chunk($primaryKeys, 1000) as $batch) {
		$builder->setParameter('keys', $batch, IQueryBuilder::PARAM_INT_ARRAY);
		$builder->executeStatement();
	}
}
}