<?php
/**
* Copyright Blackbit digital Commerce GmbH <info@blackbit.de>
*
* This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
namespace Blackbit\DataDirectorBundle\EventListener;
use Blackbit\DataDirectorBundle\Controller\ImportController;
use Blackbit\DataDirectorBundle\lib\Pim\Helper;
use Blackbit\DataDirectorBundle\lib\Pim\Import\CallbackFunction;
use Blackbit\DataDirectorBundle\lib\Pim\Item\ImporterInterface;
use Blackbit\DataDirectorBundle\lib\Pim\Item\ItemMoldBuilder;
use Blackbit\DataDirectorBundle\lib\Pim\RawData\Importer;
use Blackbit\DataDirectorBundle\lib\Pim\Serializer;
use Blackbit\DataDirectorBundle\model\Dataport;
use Blackbit\DataDirectorBundle\model\DataportResource;
use Blackbit\DataDirectorBundle\model\Fieldmapping;
use Blackbit\DataDirectorBundle\model\ImportStatus;
use Blackbit\DataDirectorBundle\model\PimcoreDbRepository;
use Blackbit\DataDirectorBundle\model\Queue;
use Blackbit\DataDirectorBundle\model\RawItem;
use Blackbit\DataDirectorBundle\model\RawItemData;
use Blackbit\DataDirectorBundle\Tools\Installer;
use InvalidArgumentException;
use Pimcore\Db;
use Pimcore\Event\Model\AssetEvent;
use Pimcore\Event\Model\ElementEventInterface;
use Pimcore\Logger;
use Pimcore\Model\AbstractModel;
use Pimcore\Model\Asset;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\DataObject\Concrete;
use Pimcore\Model\Element\DirtyIndicatorInterface;
use Pimcore\Model\Element\ElementInterface;
use Pimcore\Model\Element\Service;
use Pimcore\Model\User;
use Pimcore\Model\Version;
use Pimcore\Tool;
use Blackbit\DataDirectorBundle\lib\Pim\Cli;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\HttpFoundation\Request;
class AutomaticImportListener implements LoggerAwareInterface
{
use LoggerAwareTrait;
private static $processCommandRegistered = [];
private static $dataports = null;
/** @var ItemMoldBuilder */
private $itemMoldBuilder;
public function __construct(ItemMoldBuilder $itemMoldBuilder)
{
$this->itemMoldBuilder = $itemMoldBuilder;
}
private static function getDataports() {
if(self::$dataports === null) {
self::$dataports = Dataport::getInstance()->find([], 'name');
foreach(self::$dataports as &$dataport) {
$dataport['sourceconfig'] = unserialize($dataport['sourceconfig'], ['allowed_classes' => false]);
$dataport['targetconfig'] = unserialize($dataport['targetconfig'], ['allowed_classes' => false]);
}
unset($dataport);
}
return self::$dataports;
}
public function startImports(ElementEventInterface $e) {
if(\method_exists($e, 'getArgument')) {
try {
$saveVersionOnly = $e->getArgument('saveVersionOnly');
if($saveVersionOnly) {
return;
}
} catch(\InvalidArgumentException $exception) {
}
}
$object = $e->getElement();
$user = Helper::getUser();
if($user instanceof User) {
PimcoreDbRepository::getInstance()->execute('UPDATE edit_lock SET date=? WHERE cid=? AND ctype=? AND userId=?', [time(), $object->getId(), Service::getElementType($object), $user->getId()]);
}
$dataports = Dataport::getInstance();
$dataportResourceRepository = DataportResource::getInstance();
$queue = Queue::getInstance();
$queueAddItems = [];
foreach(self::getDataports() as $dataport) {
$originalLocale = \Pimcore::getContainer()->get('pimcore.locale')->getLocale() ?? Tool::getDefaultLanguage();
try {
$sourceConfig = $dataport['sourceconfig'];
$targetConfig = $dataport['targetconfig'];
if(!empty($sourceConfig['autoImport'])) {
$parser = $dataports->getParser($dataport['id']);
if(method_exists($parser, 'disableLoggingNotFoundImportResource')) {
$parser->disableLoggingNotFoundImportResource();
}
if(\method_exists($parser, 'setSourceFile') && \method_exists($parser, 'getFileConditionFromObject')) {
if (empty($targetConfig['itemClass'])) {
// Export
if (empty($sourceConfig['incrementalExport'])) {
$dataportResources = $dataportResourceRepository->find(['dataportId = ?' => $dataport['id']]);
} else {
$dataportResources = [
$dataportResourceRepository->create(
[
'dataportId' => $dataport['id'],
'resource' => \json_encode([], JSON_UNESCAPED_SLASHES)
]
)
];
}
$dataportResourceIdsStillContainingObject = [];
foreach ($dataportResources as $dataportResource) {
try {
$resourceSettings = \json_decode($dataportResource['resource'], true);
$parser->setSourceFile($resourceSettings['file'] ?? null);
if (!empty($resourceSettings['locale'])) {
\Pimcore::getContainer()->get('pimcore.locale')->setLocale($resourceSettings['locale']);
} else {
$resourceSettings['locale'] = \Pimcore::getContainer()->get('pimcore.locale')->getLocale();
if ($resourceSettings['locale'] === null) {
$resourceSettings['locale'] = Tool::getDefaultLanguage();
}
}
$source = $parser->getFileConditionFromObject($object);
if ($source !== null) {
$dataportResourceIdsStillContainingObject[$source][$resourceSettings['locale']][] = $dataportResource['id'];
} elseif (empty($sourceConfig['incrementalExport'])) {
$source = $parser->getFileConditionFromObject($object, false);
if ($source !== null) {
$tmpSourceConfig = $sourceConfig;
if (\method_exists($parser, 'setConfig')) {
if (\method_exists($parser, 'getConfig')) {
$tmpSourceConfig = $parser->getConfig();
}
$tmpSourceConfig['file'] = '';
$tmpSourceConfig['dataportId'] = $dataport['id'];
$parser->setConfig($tmpSourceConfig);
}
$parser->setSourceFile($source);
$keyFields = [];
foreach ($tmpSourceConfig['fields'] as $fieldIndex => $field) {
if (!empty($field['exportKey'])) {
$keyFields[$fieldIndex] = $field;
}
}
$hashs = [];
if ($dataport['sourcetype'] === 'pimcore') {
$locales = Tool::getValidLanguages();
if ($dataportResource !== null) {
$resource = \json_decode($dataportResource['resource'], true);
if ($resource['locale']) {
$locales = [$resource['locale']];
}
}
foreach ($locales as $language) {
\Pimcore::getContainer()->get('pimcore.locale')->setLocale($language);
foreach ($parser as $rawItemData) {
if ($rawItemData === null) {
continue;
}
$rawItemData = array_filter(
$rawItemData,
static function ($fieldId) use ($keyFields) {
return isset($keyFields[$fieldId]);
},
ARRAY_FILTER_USE_KEY
);
$hashs[] = Importer::getHash($rawItemData);
}
}
} else {
foreach ($parser as $rawItemData) {
if ($rawItemData === null) {
continue;
}
$rawItemData = array_filter(
$rawItemData,
static function ($fieldId) use ($keyFields) {
return isset($keyFields[$fieldId]);
},
ARRAY_FILTER_USE_KEY
);
$hashs[] = Importer::getHash($rawItemData);
}
}
if(count($hashs) > 0) {
$countDeletedRawItems = Db::get()->executeUpdate('DELETE FROM '.Installer::TABLE_RAWITEM.' WHERE dataport_resource_id = ? AND hash IN ("'.implode('","', $hashs).'")', [$dataportResource['id']]);
if ($countDeletedRawItems) {
$this->queueProcessRawData($dataportResource['id'], $dataport['id']);
}
}
}
}
} catch (\Throwable $e) {
if(!$e instanceof SkipTriggerAutomaticImportException) {
$error = 'Check for automatic start for dataport #'.$dataport['id'].' failed: '.(string)$e;
if($this->logger) {
$this->logger->error($error);
} else {
Logger::error($error);
}
}
}
}
foreach($dataportResourceIdsStillContainingObject as $dataportResourceQuery => $dataportResourcesWithSameLocale) {
foreach($dataportResourcesWithSameLocale as $locale => $dataportResourceIds) {
if (empty($sourceConfig['incrementalExport'])) {
if ($dataport['sourcetype'] === 'pimcore' && $this->extractIds($dataportResourceQuery, $match)) {
$ids = $match[1];
sort($ids, SORT_NUMERIC);
foreach ($ids as $id) {
$queueAddItems[] = [
'command' => 'data-director:extract '.$dataport['id'].' "'.str_replace([$match[0], '"', '$', '`'], ['='.$id, '\\"', '\\$', '\\`'], $dataportResourceQuery).'"'.(!empty($locale) ? ' --locale='.$locale : '').' --dataport-resource-id='.implode(',', $dataportResourceIds),
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
} else {
$queueAddItems[] = [
'command' => 'data-director:extract '.$dataport['id'].' "'.str_replace(['"', '$', '`'], ['\\"', '\\$', '\\`'], $dataportResourceQuery).'"'.(!empty($locale) ? ' --locale='.$locale : '').' --dataport-resource-id='.implode(',', $dataportResourceIds),
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
} elseif ($dataport['sourcetype'] === 'pimcore' && ($ids = $this->extractIds($dataportResourceQuery, $match))) {
$ids = $match[1];
sort($ids, SORT_NUMERIC);
foreach ($ids as $id) {
$queueAddItems[] = [
'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace([$match[0], '"', '$', '`'], ['='.$id, '\\"', '\\$', '\\`'], $dataportResourceQuery).'"',
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
} else {
$queueAddItems[] = [
'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace(['"', '$', '`'], ['\\"', '\\$', '\\`'], $dataportResourceQuery).'"',
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
}
}
} else {
try {
// Import
if (\method_exists($parser, 'getConfig')) {
$parser->setSourceFile($parser->getConfig()['file'] ?? null);
}
$source = $parser->getFileConditionFromObject($object);
if ($source !== null) {
if ($dataport['sourcetype'] === 'pimcore' && preg_match('/ IN \(((\d+,?)+)\)/', $source, $match)) {
$ids = explode(',', $match[1]);
sort($ids, SORT_NUMERIC);
foreach ($ids as $id) {
$queueAddItems[] = [
'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace([$match[0], '"', '$', '`'], ['='.$id, '\\"', '\\$', '\\`'], $source).'"'.(($user instanceof User) ? ' --user='.$user->getId() : ''),
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
} else {
$queueAddItems[] = [
'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace(['"', '$', '`'], ['\\"', '\\$', '\\`'], $source).'"',
'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
'worker_id' => $dataport['id']
];
}
}
} catch(SkipTriggerAutomaticImportException $e) {
}
}
}
}
} catch (\Exception $e) {
if($this->logger) {
$this->logger->error('Automatic processing of dataport #'.$dataport['id'].' "'.$dataport['name'].'" failed:'. (string)$e);
} else {
Logger::error('Automatic processing of dataport #'.$dataport['id'].' "'.$dataport['name'].'" failed:'.(string)$e);
}
} finally {
\Pimcore::getContainer()->get('pimcore.locale')->setLocale($originalLocale);
}
}
if (count($queueAddItems) > 0) {
$queue->create($queueAddItems);
}
}
public function deleteRawdata(ElementEventInterface $e)
{
if (\method_exists($e, 'getArgument')) {
try {
$saveVersionOnly = $e->getArgument('saveVersionOnly');
if ($saveVersionOnly) {
return;
}
} catch (\InvalidArgumentException $exception) {
}
}
$object = $e->getElement();
$dataports = Dataport::getInstance();
$objectType = Service::getElementType($object);
$commandPrefix = '"'.Cli::getPhpCli().'" '.realpath(PIMCORE_PROJECT_ROOT.DIRECTORY_SEPARATOR.'bin'.DIRECTORY_SEPARATOR.'console');
foreach (self::getDataports() as $dataport) {
$sourceConfig = $dataport['sourceconfig'];
$targetConfig = $dataport['targetconfig'];
if (!empty($sourceConfig['autoImport']) && empty($targetConfig['itemClass'])) {
$itemMold = $this->itemMoldBuilder->getItemMoldByClassId($sourceConfig['sourceClass'] ?? null);
if ($object instanceof $itemMold) {
Cli::exec($commandPrefix.' data-director:delete-rawdata --dataport='.$dataport['id'].' --object-id='.$object->getId().' --object-type='.$objectType);
}
}
}
}
private function queueProcessRawData($dataportResourceId, $dataportId) {
if (!isset(self::$processCommandRegistered[$dataportResourceId])) {
self::$processCommandRegistered[$dataportResourceId] = true;
$queue = Queue::getInstance();
$cmd = 'data-director:process ' . $dataportId . ' --dataport-resource-id=' . $dataportResourceId;
register_shutdown_function(
static function () use ($queue, $cmd, $dataportId) {
$queue->create([
'command' => $cmd,
'triggered_by' => 'An item got deleted from raw data -> new result document has to be generated',
'worker_id' => $dataportId
]);
}
);
}
}
private function extractIds($query, &$match) {
$startTerm = ' IN (';
$endTerm = ')';
$start = strpos($query, $startTerm);
if($start === false) {
return null;
}
$end = strpos($query, $endTerm, $start);
if($end === false) {
return null;
}
$match = [
substr($query, $start, $end - $start+strlen($endTerm)),
explode(',', substr($query, $start + strlen($startTerm), $end - $start - strlen($startTerm)))
];
return true;
}
}