0
0
mirror of https://github.com/salesagility/SuiteCRM.git synced 2024-12-22 12:28:31 +00:00
salesagility_SuiteCRM/include/Services/Batch/BatchJob.php
Clemente Raposo 687ab79049 Fix #9191 - Add utf normalization repair command
- Add normalization service
- Add Synchronous execution option
- Add Asynchronous execution option
- Entry to administration repair page
2022-01-25 14:21:23 +00:00

373 lines
10 KiB
PHP

<?php
/**
*
* SugarCRM Community Edition is a customer relationship management program developed by
* SugarCRM, Inc. Copyright (C) 2004-2013 SugarCRM Inc.
*
* SuiteCRM is an extension to SugarCRM Community Edition developed by SalesAgility Ltd.
* Copyright (C) 2011 - 2018 SalesAgility Ltd.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License version 3 as published by the
* Free Software Foundation with the addition of the following permission added
* to Section 15 as permitted in Section 7(a): FOR ANY PART OF THE COVERED WORK
* IN WHICH THE COPYRIGHT IS OWNED BY SUGARCRM, SUGARCRM DISCLAIMS THE WARRANTY
* OF NON INFRINGEMENT OF THIRD PARTY RIGHTS.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
* details.
*
* You should have received a copy of the GNU Affero General Public License along with
* this program; if not, see http://www.gnu.org/licenses or write to the Free
* Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA.
*
* You can contact SugarCRM, Inc. headquarters at 10050 North Wolfe Road,
* SW2-130, Cupertino, CA 95014, USA. or at email address contact@sugarcrm.com.
*
* The interactive user interfaces in modified source and object code versions
* of this program must display Appropriate Legal Notices, as required under
* Section 5 of the GNU Affero General Public License version 3.
*
* In accordance with Section 7(b) of the GNU Affero General Public License version 3,
* these Appropriate Legal Notices must retain the display of the "Powered by
* SugarCRM" logo and "Supercharged by SuiteCRM" logo. If the display of the logos is not
* reasonably feasible for technical reasons, the Appropriate Legal Notices must
* display the words "Powered by SugarCRM" and "Supercharged by SuiteCRM".
*/
if (!defined('sugarEntry') || !sugarEntry) {
die('Not A Valid Entry Point');
}
require_once __DIR__ . '/../../../modules/SchedulersJobs/SchedulersJob.php';
abstract class BatchJob implements RunnableSchedulerJob
{
/**
* @var $job SchedulersJob
*/
protected $job;
/**
* @var $batchSize int
*/
protected $batchSize = 100;
/**
* @inheritDoc
*/
public function setJob(SchedulersJob $job)
{
$this->job = $job;
}
/**
* @inheritDoc
*/
public function run($data)
{
$data = $data ?? [];
if (is_string($data)) {
$data = json_decode(html_entity_decode($data), true, 512, JSON_THROW_ON_ERROR);
}
if (!$this->shouldRun($data)) {
return false;
}
$this->init();
if (empty($data['tracking'])) {
$data['tracking'] = $this->setupTracking();
}
$batchSize = $this->getBatchSize($data);
[$type, $recordIds, $data] = $this->getNextBatch($data, $batchSize);
if (empty($recordIds)) {
$this->markComplete();
return true;
}
$data = $this->processBatch($type, $data, $recordIds);
$this->updateTrackingTable($type, $data, $recordIds);
$this->updateJobStatus($data);
return true;
}
/**
* Get job key
* @return string
*/
abstract public function getJobKey(): string;
/**
* Get job key
* @return array
*/
abstract protected function setupTracking(): array;
/**
* Should run
* @return bool
*/
abstract protected function shouldRun(): bool;
/**
* Do initial setup
*/
abstract protected function init(): void;
/**
* @param string $type
* @param array $data
* @return array
*/
abstract protected function getAllIdsInGroup(string $type, array $data): array;
/**
* @param string $type
* @param array $data
* @param array $ids
* @return array
*/
abstract protected function processBatch(string $type, array $data, array $ids): array;
/**
* Get tracking table name
* @param string $type
* @return string
*/
protected function getTrackingTableName(string $type): string
{
return 'tmp_' . strtolower(str_replace('-', '_', $this->getJobKey())) . '_' . strtolower($type);
}
/**
* Create and initialize tracking table with all ids to process
* @param string $table
* @param array $ids
*/
protected function initTrackingTable(string $table, array $ids): void
{
global $db;
$fields = [
'id' => [
'name' => 'id',
'type' => 'varchar',
],
'status' => [
'name' => 'status',
'type' => 'varchar',
'isnull' => true
],
];
$db->createTableParams($table, $fields, []);
$chunkSize = 2500;
$recordIds = array_chunk($ids, $chunkSize, true);
$baseQuery = "INSERT INTO " . $table . " (id) VALUES ";
foreach ($recordIds as $chunk) {
$query = $baseQuery . '("' . implode('") , ("', $chunk) . '")';
$db->query($query);
}
}
/**
* Delete ids table
* @param string $table
*/
protected function deleteTrackingTable(string $table): void
{
global $db;
$db->dropTableName($table);
}
/**
* Update tracking table with ids that have been processed
* @param string $type
* @param array $data
* @param array $recordIds
*/
protected function updateTrackingTable(string $type, array $data, array $recordIds): void
{
global $db;
$trackingTable = $data['tracking'][$type]['table'] ?? '';
$sql = "UPDATE $trackingTable SET status = 'processed' WHERE id IN ('" . implode("' , '", $recordIds) . "') ";
$db->query($sql);
}
/**
* Get Batch of ids to process
* @param string $trackingTable
* @param array $data
* @return array
*/
protected function getIdsBatch(string $trackingTable, array $data): array
{
global $db;
$batchSize = $this->getBatchSize($data);
$query = "SELECT id FROM $trackingTable WHERE status is null";
$start = 0;
$count = $batchSize;
$result = $db->limitQuery($query, $start, $count);
$ids = [];
$row = $db->fetchByAssoc($result, false);
while (!empty($row)) {
if (!empty($row['id'])) {
$ids[] = $row['id'];
}
$row = $db->fetchByAssoc($result);
}
return $ids;
}
/**
* @param array $data
* @param string $group
* @return array
*/
protected function getGroupBatchInfo(array $data, string $group): array
{
return $data['tracking'][$group] ?? [];
}
/**
* @param array $data
* @return int
*/
protected function getBatchSize(array $data): int
{
$batchSize = $this->batchSize;
if (!empty($data['batchSize']) && is_numeric($data['batchSize'])) {
$batchSize = (int)$data['batchSize'];
}
return $batchSize;
}
/**
* @param $data
* @param int $batchSize
* @return array
*/
protected function getNextBatch($data, int $batchSize): array
{
$groups = array_keys($data['tracking']);
$type = '';
$recordIds = [];
foreach ($groups as $group) {
$groupStatus = $data['tracking'][$group]['status'] ?? 'queued';
if ($groupStatus === 'done' || $groupStatus === 'no-records') {
continue;
}
$currentDataSet = $this->getGroupBatchInfo($data, $group);
$trackingTable = $currentDataSet['table'] ?? '';
$updatedCount = $this->getUpdatedCount($currentDataSet);
if ($trackingTable === '') {
$ids = $this->getAllIdsInGroup($group, $data);
if (empty($ids)) {
$data['tracking'][$group]['status'] = 'no-records';
continue;
}
$trackingTable = $this->getTrackingTableName($group);
$this->initTrackingTable($trackingTable, $ids);
$data['tracking'][$group]['total'] = count($ids);
$data['tracking'][$group]['table'] = $trackingTable;
$idChunks = array_chunk($ids, $batchSize);
$recordIds = $idChunks[0];
}
if (empty($recordIds)) {
$recordIds = $this->getIdsBatch($trackingTable, $data);
}
if (!empty($recordIds)) {
$data['tracking'][$group]['status'] = 'in-progress';
$data['tracking'][$group]['updated_count'] = $updatedCount + count($recordIds);
$type = $group;
break;
}
// no more ids for current group, delete table
if (!empty($data['tracking'][$group]['table']) && empty($data['keepTracking'])) {
$this->deleteTrackingTable($data['tracking'][$group]['table']);
}
$data['tracking'][$group]['status'] = 'done';
}
return [$type, $recordIds, $data];
}
/**
* Get updated count
* @param $currentDataSet
* @return int
*/
protected function getUpdatedCount($currentDataSet): int
{
$updatedCount = 0;
if (!empty($currentDataSet['updated_count']) && is_numeric($currentDataSet['updated_count'])) {
$updatedCount = (int)$currentDataSet['updated_count'];
}
return $updatedCount;
}
/**
* Update job status and data
* @param $data
*/
protected function updateJobStatus($data): void
{
$this->job->data = json_encode($data, JSON_THROW_ON_ERROR);
$this->job->status = 'queued';
$this->job->postponeJob(null, 5);
}
/**
* Mark complete
*/
protected function markComplete(): void
{
$this->job->resolveJob(SchedulersJob::JOB_SUCCESS);
}
}