mirror of
https://github.com/salesagility/SuiteCRM.git
synced 2024-12-22 12:28:31 +00:00
687ab79049
- Add normalization service - Add Synchronous execution option - Add Asynchronous execution option - Entry to administration repair page
373 lines
10 KiB
PHP
373 lines
10 KiB
PHP
<?php
|
|
|
|
/**
|
|
*
|
|
* SugarCRM Community Edition is a customer relationship management program developed by
|
|
* SugarCRM, Inc. Copyright (C) 2004-2013 SugarCRM Inc.
|
|
*
|
|
* SuiteCRM is an extension to SugarCRM Community Edition developed by SalesAgility Ltd.
|
|
* Copyright (C) 2011 - 2018 SalesAgility Ltd.
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify it under
|
|
* the terms of the GNU Affero General Public License version 3 as published by the
|
|
* Free Software Foundation with the addition of the following permission added
|
|
* to Section 15 as permitted in Section 7(a): FOR ANY PART OF THE COVERED WORK
|
|
* IN WHICH THE COPYRIGHT IS OWNED BY SUGARCRM, SUGARCRM DISCLAIMS THE WARRANTY
|
|
* OF NON INFRINGEMENT OF THIRD PARTY RIGHTS.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
* FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
|
|
* details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License along with
|
|
* this program; if not, see http://www.gnu.org/licenses or write to the Free
|
|
* Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
* 02110-1301 USA.
|
|
*
|
|
* You can contact SugarCRM, Inc. headquarters at 10050 North Wolfe Road,
|
|
* SW2-130, Cupertino, CA 95014, USA. or at email address contact@sugarcrm.com.
|
|
*
|
|
* The interactive user interfaces in modified source and object code versions
|
|
* of this program must display Appropriate Legal Notices, as required under
|
|
* Section 5 of the GNU Affero General Public License version 3.
|
|
*
|
|
* In accordance with Section 7(b) of the GNU Affero General Public License version 3,
|
|
* these Appropriate Legal Notices must retain the display of the "Powered by
|
|
* SugarCRM" logo and "Supercharged by SuiteCRM" logo. If the display of the logos is not
|
|
* reasonably feasible for technical reasons, the Appropriate Legal Notices must
|
|
* display the words "Powered by SugarCRM" and "Supercharged by SuiteCRM".
|
|
*/
|
|
|
|
if (!defined('sugarEntry') || !sugarEntry) {
|
|
die('Not A Valid Entry Point');
|
|
}
|
|
|
|
require_once __DIR__ . '/../../../modules/SchedulersJobs/SchedulersJob.php';
|
|
|
|
abstract class BatchJob implements RunnableSchedulerJob
|
|
{
|
|
/**
|
|
* @var $job SchedulersJob
|
|
*/
|
|
protected $job;
|
|
|
|
/**
|
|
* @var $batchSize int
|
|
*/
|
|
protected $batchSize = 100;
|
|
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function setJob(SchedulersJob $job)
|
|
{
|
|
$this->job = $job;
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function run($data)
|
|
{
|
|
$data = $data ?? [];
|
|
|
|
if (is_string($data)) {
|
|
$data = json_decode(html_entity_decode($data), true, 512, JSON_THROW_ON_ERROR);
|
|
}
|
|
|
|
if (!$this->shouldRun($data)) {
|
|
return false;
|
|
}
|
|
|
|
$this->init();
|
|
|
|
if (empty($data['tracking'])) {
|
|
$data['tracking'] = $this->setupTracking();
|
|
}
|
|
|
|
$batchSize = $this->getBatchSize($data);
|
|
[$type, $recordIds, $data] = $this->getNextBatch($data, $batchSize);
|
|
|
|
if (empty($recordIds)) {
|
|
$this->markComplete();
|
|
|
|
return true;
|
|
}
|
|
|
|
$data = $this->processBatch($type, $data, $recordIds);
|
|
|
|
$this->updateTrackingTable($type, $data, $recordIds);
|
|
$this->updateJobStatus($data);
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Get job key
|
|
* @return string
|
|
*/
|
|
abstract public function getJobKey(): string;
|
|
|
|
/**
|
|
* Get job key
|
|
* @return array
|
|
*/
|
|
abstract protected function setupTracking(): array;
|
|
|
|
/**
|
|
* Should run
|
|
* @return bool
|
|
*/
|
|
abstract protected function shouldRun(): bool;
|
|
|
|
/**
|
|
* Do initial setup
|
|
*/
|
|
abstract protected function init(): void;
|
|
|
|
/**
|
|
* @param string $type
|
|
* @param array $data
|
|
* @return array
|
|
*/
|
|
abstract protected function getAllIdsInGroup(string $type, array $data): array;
|
|
|
|
/**
|
|
* @param string $type
|
|
* @param array $data
|
|
* @param array $ids
|
|
* @return array
|
|
*/
|
|
abstract protected function processBatch(string $type, array $data, array $ids): array;
|
|
|
|
/**
|
|
* Get tracking table name
|
|
* @param string $type
|
|
* @return string
|
|
*/
|
|
protected function getTrackingTableName(string $type): string
|
|
{
|
|
return 'tmp_' . strtolower(str_replace('-', '_', $this->getJobKey())) . '_' . strtolower($type);
|
|
}
|
|
|
|
/**
|
|
* Create and initialize tracking table with all ids to process
|
|
* @param string $table
|
|
* @param array $ids
|
|
*/
|
|
protected function initTrackingTable(string $table, array $ids): void
|
|
{
|
|
global $db;
|
|
|
|
$fields = [
|
|
'id' => [
|
|
'name' => 'id',
|
|
'type' => 'varchar',
|
|
],
|
|
'status' => [
|
|
'name' => 'status',
|
|
'type' => 'varchar',
|
|
'isnull' => true
|
|
],
|
|
];
|
|
|
|
$db->createTableParams($table, $fields, []);
|
|
|
|
$chunkSize = 2500;
|
|
$recordIds = array_chunk($ids, $chunkSize, true);
|
|
|
|
$baseQuery = "INSERT INTO " . $table . " (id) VALUES ";
|
|
|
|
foreach ($recordIds as $chunk) {
|
|
|
|
$query = $baseQuery . '("' . implode('") , ("', $chunk) . '")';
|
|
|
|
$db->query($query);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Delete ids table
|
|
* @param string $table
|
|
*/
|
|
protected function deleteTrackingTable(string $table): void
|
|
{
|
|
global $db;
|
|
|
|
$db->dropTableName($table);
|
|
}
|
|
|
|
/**
|
|
* Update tracking table with ids that have been processed
|
|
* @param string $type
|
|
* @param array $data
|
|
* @param array $recordIds
|
|
*/
|
|
protected function updateTrackingTable(string $type, array $data, array $recordIds): void
|
|
{
|
|
global $db;
|
|
|
|
$trackingTable = $data['tracking'][$type]['table'] ?? '';
|
|
|
|
$sql = "UPDATE $trackingTable SET status = 'processed' WHERE id IN ('" . implode("' , '", $recordIds) . "') ";
|
|
|
|
$db->query($sql);
|
|
}
|
|
|
|
/**
|
|
* Get Batch of ids to process
|
|
* @param string $trackingTable
|
|
* @param array $data
|
|
* @return array
|
|
*/
|
|
protected function getIdsBatch(string $trackingTable, array $data): array
|
|
{
|
|
global $db;
|
|
|
|
$batchSize = $this->getBatchSize($data);
|
|
|
|
$query = "SELECT id FROM $trackingTable WHERE status is null";
|
|
$start = 0;
|
|
$count = $batchSize;
|
|
|
|
$result = $db->limitQuery($query, $start, $count);
|
|
|
|
$ids = [];
|
|
$row = $db->fetchByAssoc($result, false);
|
|
|
|
while (!empty($row)) {
|
|
if (!empty($row['id'])) {
|
|
$ids[] = $row['id'];
|
|
}
|
|
|
|
$row = $db->fetchByAssoc($result);
|
|
|
|
}
|
|
|
|
return $ids;
|
|
}
|
|
|
|
/**
|
|
* @param array $data
|
|
* @param string $group
|
|
* @return array
|
|
*/
|
|
protected function getGroupBatchInfo(array $data, string $group): array
|
|
{
|
|
return $data['tracking'][$group] ?? [];
|
|
}
|
|
|
|
/**
|
|
* @param array $data
|
|
* @return int
|
|
*/
|
|
protected function getBatchSize(array $data): int
|
|
{
|
|
$batchSize = $this->batchSize;
|
|
if (!empty($data['batchSize']) && is_numeric($data['batchSize'])) {
|
|
$batchSize = (int)$data['batchSize'];
|
|
}
|
|
|
|
return $batchSize;
|
|
}
|
|
|
|
/**
|
|
* @param $data
|
|
* @param int $batchSize
|
|
* @return array
|
|
*/
|
|
protected function getNextBatch($data, int $batchSize): array
|
|
{
|
|
$groups = array_keys($data['tracking']);
|
|
$type = '';
|
|
$recordIds = [];
|
|
|
|
foreach ($groups as $group) {
|
|
$groupStatus = $data['tracking'][$group]['status'] ?? 'queued';
|
|
|
|
if ($groupStatus === 'done' || $groupStatus === 'no-records') {
|
|
continue;
|
|
}
|
|
|
|
$currentDataSet = $this->getGroupBatchInfo($data, $group);
|
|
|
|
$trackingTable = $currentDataSet['table'] ?? '';
|
|
$updatedCount = $this->getUpdatedCount($currentDataSet);
|
|
|
|
if ($trackingTable === '') {
|
|
$ids = $this->getAllIdsInGroup($group, $data);
|
|
|
|
if (empty($ids)) {
|
|
$data['tracking'][$group]['status'] = 'no-records';
|
|
continue;
|
|
}
|
|
|
|
$trackingTable = $this->getTrackingTableName($group);
|
|
$this->initTrackingTable($trackingTable, $ids);
|
|
|
|
$data['tracking'][$group]['total'] = count($ids);
|
|
$data['tracking'][$group]['table'] = $trackingTable;
|
|
|
|
$idChunks = array_chunk($ids, $batchSize);
|
|
$recordIds = $idChunks[0];
|
|
}
|
|
|
|
if (empty($recordIds)) {
|
|
$recordIds = $this->getIdsBatch($trackingTable, $data);
|
|
}
|
|
|
|
if (!empty($recordIds)) {
|
|
$data['tracking'][$group]['status'] = 'in-progress';
|
|
$data['tracking'][$group]['updated_count'] = $updatedCount + count($recordIds);
|
|
$type = $group;
|
|
break;
|
|
}
|
|
|
|
// no more ids for current group, delete table
|
|
if (!empty($data['tracking'][$group]['table']) && empty($data['keepTracking'])) {
|
|
$this->deleteTrackingTable($data['tracking'][$group]['table']);
|
|
}
|
|
|
|
$data['tracking'][$group]['status'] = 'done';
|
|
}
|
|
|
|
return [$type, $recordIds, $data];
|
|
}
|
|
|
|
/**
|
|
* Get updated count
|
|
* @param $currentDataSet
|
|
* @return int
|
|
*/
|
|
protected function getUpdatedCount($currentDataSet): int
|
|
{
|
|
$updatedCount = 0;
|
|
if (!empty($currentDataSet['updated_count']) && is_numeric($currentDataSet['updated_count'])) {
|
|
$updatedCount = (int)$currentDataSet['updated_count'];
|
|
}
|
|
|
|
return $updatedCount;
|
|
}
|
|
|
|
/**
|
|
* Update job status and data
|
|
* @param $data
|
|
*/
|
|
protected function updateJobStatus($data): void
|
|
{
|
|
$this->job->data = json_encode($data, JSON_THROW_ON_ERROR);
|
|
$this->job->status = 'queued';
|
|
$this->job->postponeJob(null, 5);
|
|
}
|
|
|
|
/**
|
|
* Mark complete
|
|
*/
|
|
protected function markComplete(): void
|
|
{
|
|
$this->job->resolveJob(SchedulersJob::JOB_SUCCESS);
|
|
}
|
|
}
|