Build production-grade async workflows in Pimcore 11 using Symfony Messenger. Multi-transport config, resilience patterns, and PIM/DAM orchestration.
Event-Driven Architecture in Pimcore: Building Asynchronous Workflow Systems with Symfony Messenger
Every Pimcore developer eventually hits the same wall. You’re importing 50,000 products from an ERP, generating thumbnails for 10,000 assets, or synchronizing data across five e-commerce channels—and the admin interface becomes unresponsive. Users stare at spinning loaders. Timeouts kill long-running operations. The system feels brittle.
The solution isn’t faster hardware. It’s architectural: decouple the triggering of operations from their execution. This is where event-driven architecture shines, and Pimcore’s native integration with Symfony Messenger provides the foundation to build it properly.
This article walks through building production-grade asynchronous workflows in Pimcore. We’ll cover multi-transport configuration, custom event subscribers, resilience patterns, and orchestration strategies for complex PIM/DAM operations. By the end, you’ll have a blueprint for handling massive data operations without blocking users or risking data inconsistency.
Prerequisites
- Pimcore 11.x with Symfony 6.4+
- Working knowledge of Symfony services and dependency injection
- A message broker installed (Redis or RabbitMQ—we’ll cover both)
- Supervisor or systemd for worker process management
- PHP 8.2+ with redis and amqp extensions
Architecture and Key Concepts
Before diving into code, let’s establish the architecture we’re building:
flowchart TD
subgraph APP["PIMCORE APPLICATION"]
UI[Admin UI\nAPI Call\nCLI Import] --> ES[Event Subscribers]
ES --> MB[Symfony Messenger\nMessage Bus]
end
MB --> T
subgraph T["MESSAGE TRANSPORTS"]
RD["Redis\n── async_fast\n(thumbnails, cache)"]
RMQ["RabbitMQ\n── async_reliable\n(imports, sync)"]
DOC["Doctrine\n── async_fallback\n(when brokers down)"]
end
T --> W
subgraph W["WORKER PROCESSES"]
WA[Asset Worker\nthumbnails]
WI[Import Worker\nbulk data]
WS[Sync Worker\nchannels]
end
W --> DL["PIMCORE DATA LAYER\nDataObjects · Assets · Documents"]
W -->|max retries exceeded| DLQ[Dead Letter Queue]
The routing strategy maps message classes to transports based on their requirements:
flowchart LR
MSG[Message] --> R{Router}
R -->|AssetPostProcessMessage\nSearchBackendMessage| RF[async_fast\nRedis]
R -->|ProcessImportBatchMessage\nChannelSyncMessage| RR[async_reliable\nRabbitMQ]
R -->|fallback| RD[async_fallback\nDoctrine]
RF --> WF[Asset Workers\nhigh throughput]
RR --> WR[Import + Sync Workers\nguaranteed delivery]
RD --> WD[Fallback Workers]
WF & WR & WD -->|unrecoverable| DLQ[failed\nDead Letter Queue]
The key insight: Pimcore fires events for virtually every operation. We intercept those events, dispatch lightweight message objects to queues, and dedicated worker processes handle the heavy lifting asynchronously.
Step-by-Step Implementation
Configuring Multi-Transport Messenger Infrastructure
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async_fast:
                dsn: '%env(MESSENGER_TRANSPORT_DSN_REDIS)%'
                options:
                    stream: 'pimcore_fast'
                    group: 'pimcore_workers'
                    consumer: 'fast_%env(HOSTNAME)%'
                    auto_setup: true
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 10000

            async_reliable:
                dsn: '%env(MESSENGER_TRANSPORT_DSN_AMQP)%'
                options:
                    exchange:
                        name: 'pimcore_reliable'
                        type: direct
                    queues:
                        pimcore_imports:
                            binding_keys: ['import']
                        pimcore_sync:
                            binding_keys: ['sync']
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 3
                    max_delay: 300000

            async_fallback:
                dsn: 'doctrine://default?table_name=messenger_messages'
                retry_strategy:
                    max_retries: 10
                    delay: 60000

            failed:
                dsn: 'doctrine://default?table_name=messenger_failed_messages'

        routing:
            'App\Messenger\Message\Asset\AssetPostProcessMessage': async_fast
            'App\Messenger\Message\Import\ProcessImportBatchMessage': async_reliable
            'App\Messenger\Message\Sync\ChannelSyncMessage': async_reliable
            'Pimcore\Messenger\AssetUpdateTasksMessage': async_fast
            'Pimcore\Messenger\SearchBackendMessage': async_fast
💡 Transport selection strategy: Redis excels at high-throughput ephemeral tasks (regenerating a thumbnail can always be retried). RabbitMQ provides delivery guarantees essential for operations with external side effects (sending data to an ERP that doesn’t handle duplicates gracefully).
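One detail of the async_reliable transport is worth spelling out: a direct exchange only delivers a message to a queue whose binding key matches the message's routing key. The sketch below is a hypothetical helper (`routingKeyFor` is not part of Pimcore or Symfony) that maps the two message classes routed to async_reliable onto the `import` and `sync` binding keys from the configuration. Treat it as an illustration of the mapping, not as the article's own implementation.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the AMQP routing key that matches the
 * binding_keys declared for the async_reliable queues, or null when the
 * message class is not meant for the direct exchange.
 */
function routingKeyFor(string $messageClass): ?string
{
    return match ($messageClass) {
        'App\Messenger\Message\Import\ProcessImportBatchMessage' => 'import', // -> pimcore_imports
        'App\Messenger\Message\Sync\ChannelSyncMessage' => 'sync',            // -> pimcore_sync
        default => null,
    };
}
```

When dispatching, the key would typically be attached with Symfony's `AmqpStamp` (from `Symfony\Component\Messenger\Bridge\Amqp\Transport`), for example `$bus->dispatch($message, [new AmqpStamp(routingKeyFor($message::class))])`.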
# .env
MESSENGER_TRANSPORT_DSN_REDIS=redis://localhost:6379/messages
MESSENGER_TRANSPORT_DSN_AMQP=amqp://guest:guest@localhost:5672/%2f/messages
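To get a feel for what the retry_strategy values mean in practice, the delays can be worked out by hand. The sketch below assumes the multiplier-based formula Symfony Messenger documents for its default retry strategy (delay × multiplier^retry, capped at max_delay) and ignores any random jitter that newer Symfony versions may apply. `retryDelays` is a hypothetical helper written for this illustration.

```php
<?php
declare(strict_types=1);

/**
 * Computes the wait before each retry, in milliseconds.
 * A $maxDelayMs of 0 means "no cap".
 *
 * @return list<int>
 */
function retryDelays(int $maxRetries, int $delayMs, float $multiplier = 1.0, int $maxDelayMs = 0): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        // Exponential growth: the first retry waits $delayMs, each later one $multiplier times longer.
        $delay = (int) round($delayMs * $multiplier ** $retry);
        if ($maxDelayMs > 0 && $delay > $maxDelayMs) {
            $delay = $maxDelayMs;
        }
        $delays[] = $delay;
    }

    return $delays;
}
```

With the values from the configuration, `retryDelays(3, 1000, 2, 10000)` gives 1000, 2000, and 4000 ms for async_fast, while `retryDelays(5, 5000, 3, 300000)` gives 5000, 15000, 45000, 135000, and 300000 ms for async_reliable (the last delay is capped). Under these assumptions, a message on async_reliable waits about 500 seconds in total before it lands on the failure transport.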
Building Event Subscribers for CRUD Interception
The asset event lifecycle looks like this:
sequenceDiagram
participant U as User / CLI
participant P as Pimcore Core
participant S as AssetWorkflowSubscriber
participant MB as Message Bus
participant W as Asset Worker
U->>P: Upload image (POST_ADD)
P->>S: AssetEvent fired (priority -100)
S->>S: shouldProcessAsync()?
S->>MB: dispatch(AssetPostProcessMessage)\n+ DelayStamp(500ms)
MB-->>S: Envelope accepted
S-->>P: return (< 1ms)
P-->>U: Response (fast)
note over MB,W: 500ms later...
MB->>W: deliver message
W->>W: Asset::getById()
W->>W: generateThumbnails()
W->>W: extractMetadata()
W->>W: extractColors()
W-->>MB: ack
<?php
declare(strict_types=1);

namespace App\EventSubscriber;

use App\Messenger\Message\Asset\AssetPostProcessMessage;
use Pimcore\Event\AssetEvents;
use Pimcore\Event\Model\AssetEvent;
use Pimcore\Model\Asset;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Uid\Uuid;

final class AssetWorkflowSubscriber implements EventSubscriberInterface
{
    private const EXCLUDED_TYPES = ['folder'];
    private const ASYNC_THRESHOLD_BYTES = 102400; // 100KB

    public function __construct(
        private readonly MessageBusInterface $messageBus,
        private readonly LoggerInterface $logger
    ) {}

    public static function getSubscribedEvents(): array
    {
        return [
            AssetEvents::POST_ADD => ['onAssetPostAdd', -100],
            AssetEvents::POST_UPDATE => ['onAssetPostUpdate', -100],
        ];
    }

    public function onAssetPostAdd(AssetEvent $event): void
    {
        $asset = $event->getAsset();
        if (!$this->shouldProcessAsync($asset)) {
            return;
        }

        $message = new AssetPostProcessMessage(
            assetId: $asset->getId(),
            assetType: $asset->getType(),
            operations: $this->determineOperations($asset),
            priority: $this->calculatePriority($asset),
            correlationId: Uuid::v4()->toRfc4122()
        );

        // Delay to ensure DB transaction is committed
        $this->messageBus->dispatch($message, [new DelayStamp(500)]);
    }

    public function onAssetPostUpdate(AssetEvent $event): void
    {
        $asset = $event->getAsset();
        if (!$asset->getDataChanged() || !$this->shouldProcessAsync($asset)) {
            return;
        }

        $this->messageBus->dispatch(
            new AssetPostProcessMessage(
                assetId: $asset->getId(),
                assetType: $asset->getType(),
                operations: ['thumbnails'],
                correlationId: Uuid::v4()->toRfc4122()
            ),
            [new DelayStamp(500)]
        );
    }

    private function shouldProcessAsync(Asset $asset): bool
    {
        if (in_array($asset->getType(), self::EXCLUDED_TYPES, true)) {
            return false;
        }

        $fileSize = $asset->getFileSize();

        return $fileSize === null || $fileSize >= self::ASYNC_THRESHOLD_BYTES;
    }

    private function determineOperations(Asset $asset): array
    {
        return match (true) {
            $asset instanceof Asset\Image => ['thumbnails', 'metadata', 'color_extraction'],
            $asset instanceof Asset\Video => ['thumbnails', 'metadata', 'transcode'],
            $asset instanceof Asset\Document => ['preview', 'text_extraction'],
            default => [],
        };
    }

    private function calculatePriority(Asset $asset): int
    {
        $fileSize = $asset->getFileSize() ?? 0;

        return match (true) {
            $fileSize < 1_048_576 => 10, // < 1MB
            $fileSize < 10_485_760 => 5, // < 10MB
            default => 0,
        };
    }
}
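⚠️ The 500ms DelayStamp is critical. Without it, your handler may try to load an asset before its database transaction commits. This causes intermittent “Asset not found” errors that are very hard to reproduce. The delay is a heuristic, not a guarantee, so the handler must still cope with an asset that cannot be loaded yet.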
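The subscriber dispatches an AssetPostProcessMessage, but the class itself is not shown. A minimal sketch might look like the following. The property list is inferred from the named arguments the subscriber uses and the getters the handler calls. The default `priority` of 0 and the extra getters are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

namespace App\Messenger\Message\Asset;

/**
 * Sketch of the message: scalars and arrays only, no Pimcore objects,
 * so it serializes cheaply and never carries stale element state.
 */
final class AssetPostProcessMessage
{
    /**
     * @param list<string> $operations e.g. ['thumbnails', 'metadata']
     */
    public function __construct(
        private readonly int $assetId,
        private readonly string $assetType,
        private readonly array $operations,
        private readonly string $correlationId,
        private readonly int $priority = 0, // assumed default; the update path omits it
    ) {}

    public function getAssetId(): int
    {
        return $this->assetId;
    }

    public function getAssetType(): string
    {
        return $this->assetType;
    }

    /** @return list<string> */
    public function getOperations(): array
    {
        return $this->operations;
    }

    public function getCorrelationId(): string
    {
        return $this->correlationId;
    }

    public function getPriority(): int
    {
        return $this->priority;
    }
}
```

Because the subscriber uses named arguments, the parameter order here does not have to match the call sites. Placing the optional `priority` last simply avoids a required parameter after an optional one.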
Message Handler with Resilience Patterns
<?php
declare(strict_types=1);

namespace App\Messenger\Handler\Asset;

use App\Messenger\Message\Asset\AssetPostProcessMessage;
use Doctrine\DBAL\Connection;
use Pimcore\Model\Asset;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;

#[AsMessageHandler]
final class AssetPostProcessHandler
{
    public function __construct(
        private readonly LoggerInterface $logger,
        private readonly Connection $connection,
    ) {}

    public function __invoke(AssetPostProcessMessage $message): void
    {
        // Always clear runtime cache in workers to avoid stale data
        \Pimcore\Cache\RuntimeCache::clear();

        $asset = Asset::getById($message->getAssetId());
        if ($asset === null) {
            // Caveat: a missing row looks the same whether the asset was deleted
            // or its insert has not committed yet, so this check alone cannot
            // tell the two cases apart.
            if ($this->assetWasDeleted($message->getAssetId())) {
                return; // Silently skip deleted assets
            }

            // Row exists but the asset could not be loaded: retry
            throw new RecoverableMessageHandlingException(
                sprintf('Asset %d not found, retrying', $message->getAssetId())
            );
        }

        try {
            foreach ($message->getOperations() as $operation) {
                $this->executeOperation($asset, $operation);
            }
        } catch (\ImagickException $e) {
            // Corrupt/unsupported file: no point retrying
            throw new UnrecoverableMessageHandlingException(
                sprintf('Unrecoverable image error for asset %d: %s', $asset->getId(), $e->getMessage()),
                previous: $e
            );
        } catch (\RuntimeException $e) {
            // Transient error: retry
            throw new RecoverableMessageHandlingException(
                sprintf('Transient error for asset %d, will retry: %s', $asset->getId(), $e->getMessage()),
                previous: $e
            );
        }
    }

    private function executeOperation(Asset $asset, string $operation): void
    {
        match ($operation) {
            'thumbnails' => $this->generateThumbnails($asset),
            'metadata' => $this->extractMetadata($asset),
            'color_extraction' => $this->extractColors($asset),
            'preview' => $this->generatePreview($asset),
            'text_extraction' => $this->extractText($asset),
            'transcode' => $this->transcodeVideo($asset),
            default => $this->logger->warning('Unknown operation', ['op' => $operation]),
        };
    }

    private function generateThumbnails(Asset $asset): void
    {
        if (!$asset instanceof Asset\Image) {
            return;
        }

        foreach (['product_thumb', 'product_large', 'og_image'] as $config) {
            $asset->getThumbnail($config, false);
        }
    }

    private function extractColors(Asset $asset): void
    {
        if (!$asset instanceof Asset\Image) {
            return;
        }

        $imagePath = $asset->getLocalFile();
        if (!$imagePath || !file_exists($imagePath)) {
            return;
        }

        // Reduce the image to 5 colors, then read them from the resulting palette strip
        $imagick = new \Imagick($imagePath);
        $imagick->quantizeImage(5, \Imagick::COLORSPACE_RGB, 0, false, false);
        $imagick->uniqueImageColors();

        $colors = [];
        for ($i = 0; $i < min($imagick->getImageWidth(), 5); $i++) {
            $colors[] = $imagick->getImagePixelColor($i, 0)->getColor();
        }
        $imagick->destroy();

        $settings = $asset->getCustomSettings() ?? [];
        $settings['dominantColors'] = $colors;
        $asset->setCustomSettings($settings);
        $asset->save();
    }

    private function extractMetadata(Asset $asset): void { $asset->getMetadata(); }
    private function generatePreview(Asset $asset): void { /* pdftotext / Imagick */ }
    private function extractText(Asset $asset): void { /* Smalot\PdfParser */ }
    private function transcodeVideo(Asset $asset): void { /* FFmpeg */ }

    private function assetWasDeleted(int $assetId): bool
    {
        // fetchOne() returns false when no row matches
        return $this->connection->fetchOne('SELECT id FROM assets WHERE id = ?', [$assetId]) === false;
    }
}
Production Configuration
Worker supervision with Supervisord:
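; Set directory= to the project root in each section so that bin/console resolves.
[program:messenger_fast]
command=php bin/console messenger:consume async_fast --limit=100 --memory-limit=256M --time-limit=3600
; Supervisor requires process_num in process_name when numprocs > 1
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
stderr_logfile=/var/log/supervisor/messenger_fast.err.log

[program:messenger_reliable]
command=php bin/console messenger:consume async_reliable --limit=50 --memory-limit=512M --time-limit=3600
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
stderr_logfile=/var/log/supervisor/messenger_reliable.err.log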
Common Mistakes and Troubleshooting
⚠️ Never pass Pimcore entities in messages. Pimcore elements carry lazy-loaded relations and runtime state, so serialization will either fail or produce stale objects. Always pass the entity ID and reload inside the handler.
⚠️ Always clear RuntimeCache in handlers. Workers are long-running processes. Without RuntimeCache::clear(), objects cached while handling one message can be served again, stale, to every message that follows.
💡 Monitor your queues actively. Use messenger:stats and messenger:failed:show in production. A growing failed queue is an early warning system.
With this architecture, a 500,000-product import transforms from:
- Before: 8 hours, server unresponsive, admin unusable
- After: 45 minutes with 4 parallel workers, admin fully operational throughout
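Setting prefetch_count: 5 in the RabbitMQ transport options (it is not part of the configuration shown earlier) is key: each worker reserves only a small batch of messages at a time, which spreads load across workers and avoids a thundering herd on startup.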
Conclusion and Next Steps
You now have a production-ready async workflow foundation for Pimcore that scales horizontally and handles failures gracefully. The key principles:
- Use separate transports for different reliability/throughput requirements
- Never serialize Pimcore entities — IDs only
- RecoverableMessageHandlingException vs UnrecoverableMessageHandlingException determines retry behavior
- Always RuntimeCache::clear() at the start of every handler
Next steps to explore: