Architettura Event-Driven Pimcore con Symfony Messenger

Post at — 2026-03-20
#pimcore #symfony-messenger #event-driven-architecture #cms #advanced-tutorial #italiano

Guida avanzata per implementare workflow asincroni in Pimcore con Symfony Messenger. Gestisci importazioni massive e operazioni distribuite senza bloccare il sistema.

Architettura Event-Driven in Pimcore: Implementare un Sistema di Workflow Asincrono con Symfony Messenger

Chiunque abbia gestito un’istanza Pimcore in produzione con centinaia di migliaia di DataObjects e Assets sa cosa significa vedere l’interfaccia admin bloccarsi durante un’importazione massiva. O peggio, ricevere una chiamata alle 3 di notte perchΓ© il cron di sincronizzazione con l’ERP ha saturato la memoria del server.

La soluzione non Γ¨ “comprare piΓΉ RAM”. La soluzione Γ¨ ripensare l’architettura: spostare le operazioni pesanti fuori dal ciclo request-response e orchestrarle in modo asincrono. Pimcore, essendo costruito su Symfony, ci offre Messenger come strumento nativo per farlo. Configurarlo correttamente per un contesto PIM/DAM enterprise richiede perΓ² una comprensione profonda sia di Messenger che delle peculiaritΓ  di Pimcore.

In questo articolo costruiremo un sistema di workflow asincrono completo, partendo dalla configurazione dei transport fino all’implementazione del Saga Pattern per operazioni distribuite. Vedremo come gestire importazioni di 500.000 prodotti senza impattare gli utenti, come orchestrare la generazione di thumbnail su piΓΉ worker e come implementare retry intelligenti che non trasformino un problema temporaneo in un disastro.

Prerequisiti

  • Pimcore 11.x con PHP 8.2+
  • Conoscenza solida di Symfony (service container, eventi, console commands)
  • Redis 7+ o RabbitMQ 3.12+ installato e configurato
  • FamiliaritΓ  con i concetti di message queue (producer, consumer, acknowledgment)
  • Docker per l’ambiente di sviluppo (opzionale ma consigliato)

Assumo che tu abbia giΓ  un’installazione Pimcore funzionante. Se stai partendo da zero, la documentazione ufficiale copre l’installazione base.

Architettura e Concetti Chiave

Prima di scrivere codice, capiamo cosa stiamo costruendo:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                              PIMCORE APPLICATION                            β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚   HTTP       β”‚     β”‚  Event           β”‚     β”‚   Message            β”‚    β”‚
β”‚  β”‚   Request    │────▢│  Dispatcher      │────▢│   Bus                β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                            β”‚                β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚                β”‚
β”‚  β”‚   Admin UI   │────▢│  Pimcore Events  │─────────────────                β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚  (postUpdate,    β”‚                β”‚                β”‚
β”‚                       β”‚   postAdd, etc)  β”‚                β”‚                β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚                β”‚
β”‚  β”‚   CLI/Cron   β”‚β”€οΏ½οΏ½β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                                          β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                       β”‚
                                       β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                            MESSAGE TRANSPORTS                               β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€οΏ½οΏ½οΏ½β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚ Redis Stream    β”‚  β”‚ RabbitMQ        β”‚  β”‚ Doctrine (fallback)        β”‚ β”‚
β”‚  β”‚ ───────────────│  β”‚ ─────────────── β”‚  β”‚ ─────────────────────────  β”‚ β”‚
β”‚  β”‚ β€’ async_high   β”‚  β”‚ β€’ async_bulk    β”‚  β”‚ β€’ async_low                β”‚ β”‚
β”‚  β”‚ β€’ asset_proc   β”‚  β”‚ β€’ sync_external β”‚  β”‚ (per ambienti senza Redis) β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                       β”‚
                                       β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€οΏ½οΏ½οΏ½β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                              WORKER POOL                                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  Worker 1   β”‚  β”‚  Worker 2   β”‚  β”‚  Worker 3   β”‚  β”‚  Worker N       β”‚    β”‚
β”‚  β”‚  (high pri) β”‚  β”‚  (high pri) β”‚  β”‚  (bulk)     β”‚  β”‚  (asset proc)   β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                                             β”‚
β”‚                         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                            β”‚
β”‚                         β”‚   Dead Letter Queue  β”‚                            β”‚
β”‚                         β”‚   + Monitoring       β”‚                            β”‚
β”‚                         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                            β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

I concetti fondamentali:

  1. Message: un oggetto serializzabile che rappresenta un’operazione da eseguire (es. ProcessProductImportMessage)
  2. Handler: la classe che esegue l’operazione quando il messaggio viene consumato
  3. Transport: il sistema di storage/delivery dei messaggi (Redis, RabbitMQ, Doctrine)
  4. Worker: il processo che consuma i messaggi da un transport e li passa agli handler
  5. Envelope: il wrapper che contiene il messaggio piΓΉ i metadata (stamps)

Implementazione Passo-Passo

Configurazione Multi-Transport per Scenari Pimcore

La configurazione di default di Messenger non Γ¨ adatta a un PIM enterprise. Dobbiamo definire transport separati per tipologie di operazioni diverse, con caratteristiche di performance e resilienza specifiche.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# config/packages/messenger.yaml
framework:
    messenger:
        # Serializzatore personalizzato per gestire entitΓ  Pimcore
        default_serializer: App\Messenger\Serializer\PimcoreMessageSerializer
        
        # Configurazione failure transport globale
        failure_transport: failed
        
        transports:
            # Transport ad alta prioritΓ  per operazioni critiche
            # Usa Redis Streams per bassa latenza
            async_high:
                dsn: '%env(REDIS_MESSENGER_DSN)%'
                options:
                    stream: pimcore_high_priority
                    group: pimcore_workers
                    consumer: '%env(HOSTNAME)%'
                    auto_setup: true
                    # Claim messaggi orfani dopo 5 minuti
                    claim_interval: 300000
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 30000
            
            # Transport per operazioni bulk (importazioni massive)
            # Usa RabbitMQ per migliore gestione della backpressure
            async_bulk:
                dsn: '%env(RABBITMQ_MESSENGER_DSN)%'
                options:
                    exchange:
                        name: pimcore_bulk
                        type: direct
                    queues:
                        pimcore_bulk_queue:
                            binding_keys: [bulk]
                    # Prefetch basso per distribuire il carico
                    prefetch_count: 5
                retry_strategy:
                    max_retries: 5
                    delay: 5000
                    multiplier: 3
                    max_delay: 3600000 # 1 ora max tra retry
            
            # Transport per elaborazione asset (thumbnail, video encoding)
            asset_processing:
                dsn: '%env(REDIS_MESSENGER_DSN)%'
                options:
                    stream: pimcore_assets
                    group: asset_workers
                    consumer: '%env(HOSTNAME)%'
                retry_strategy:
                    max_retries: 2
                    delay: 10000
                    multiplier: 2
            
            # Transport per sincronizzazione con sistemi esterni
            sync_external:
                dsn: '%env(RABBITMQ_MESSENGER_DSN)%'
                options:
                    exchange:
                        name: pimcore_sync
                        type: topic
                    queues:
                        erp_sync:
                            binding_keys: ['sync.erp.*']
                        ecommerce_sync:
                            binding_keys: ['sync.ecommerce.*']
                retry_strategy:
                    max_retries: 10
                    delay: 60000
                    multiplier: 2
                    max_delay: 7200000 # 2 ore max
            
            # Dead letter queue per messaggi falliti
            failed:
                dsn: 'doctrine://default?queue_name=failed_messages'
            
            # Transport sincrono per test
            sync:
                dsn: 'sync://'
        
        routing:
            # Routing basato sulla classe del messaggio
            'App\Message\HighPriority\*': async_high
            'App\Message\Bulk\*': async_bulk
            'App\Message\Asset\*': asset_processing
            'App\Message\Sync\*': sync_external
            # Default per messaggi non categorizzati
            '*': async_high

⚠️ Attenzione: non usare mai sync:// in produzione per operazioni pesanti. È utile solo per test e debugging.

πŸ’‘ Suggerimento: il claim_interval di Redis Streams Γ¨ fondamentale. Se un worker crasha, i messaggi “pending” vengono reclamati da altri worker dopo questo intervallo. Impostalo in base al tempo massimo previsto per l’elaborazione di un messaggio.

Configurazione delle variabili d’ambiente:

1
2
3
# .env.local
REDIS_MESSENGER_DSN=redis://localhost:6379/messages?serializer=1
RABBITMQ_MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages

Event Subscribers per Intercettare Operazioni CRUD

Il cuore di un sistema event-driven in Pimcore sono gli Event Subscribers che reagiscono alle operazioni su DataObjects e Assets. L’obiettivo Γ¨ intercettare le modifiche e dispatchare messaggi invece di eseguire operazioni sincrone.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
<?php
// src/EventSubscriber/DataObjectEventSubscriber.php

declare(strict_types=1);

namespace App\EventSubscriber;

use App\Message\Bulk\ReindexProductMessage;
use App\Message\Bulk\ResyncCategoryProductsMessage;
use App\Message\HighPriority\InvalidateCacheMessage;
use App\Message\Sync\SyncToErpMessage;
use App\Message\Sync\SyncToEcommerceMessage;
use Pimcore\Event\DataObjectEvents;
use Pimcore\Event\Model\DataObjectEvent;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\DataObject\Product;
use Pimcore\Model\DataObject\Category;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

final class DataObjectEventSubscriber implements EventSubscriberInterface
{
    // Flag per prevenire loop infiniti durante operazioni batch
    private static bool $processingBatch = false;
    
    public function __construct(
        private readonly MessageBusInterface $messageBus,
        private readonly LoggerInterface $logger,
    ) {}
    
    public static function getSubscribedEvents(): array
    {
        return [
            DataObjectEvents::POST_UPDATE => ['onPostUpdate', 0],
            DataObjectEvents::POST_ADD => ['onPostAdd', 0],
            DataObjectEvents::POST_DELETE => ['onPostDelete', 0],
            // Evento custom per fine batch
            'pimcore.dataobject.batch.complete' => ['onBatchComplete', -100],
        ];
    }
    
    public function onPostUpdate(DataObjectEvent $event): void
    {
        $object = $event->getObject();
        
        // Ignora se siamo dentro un'operazione batch o se Γ¨ una versione draft
        if (self::$processingBatch || !$object->isPublished()) {
            return;
        }
        
        // Ignora modifiche automatiche da Pimcore (es. saveVersion internals)
        if ($this->isInternalUpdate($event)) {
            return;
        }
        
        $this->dispatchForObject($object, 'update');
    }
    
    public function onPostAdd(DataObjectEvent $event): void
    {
        $object = $event->getObject();
        
        if (self::$processingBatch || !$object->isPublished()) {
            return;
        }
        
        $this->dispatchForObject($object, 'create');
    }
    
    public function onPostDelete(DataObjectEvent $event): void
    {
        $object = $event->getObject();
        
        // Per delete processiamo anche se siamo in batch,
        // ma con prioritΓ  diversa
        $this->dispatchForObject($object, 'delete', self::$processingBatch);
    }
    
    public function onBatchComplete(): void
    {
        self::$processingBatch = false;
    }
    
    private function dispatchForObject(
        AbstractObject $object, 
        string $operation,
        bool $lowPriority = false
    ): void {
        $objectId = $object->getId();
        $className = $object->getClassName();
        
        $this->logger->info('Dispatching messages for object', [
            'id' => $objectId,
            'class' => $className,
            'operation' => $operation,
        ]);
        
        // Strategia di dispatch basata sul tipo di oggetto
        match (true) {
            $object instanceof Product => $this->handleProductChange($object, $operation, $lowPriority),
            $object instanceof Category => $this->handleCategoryChange($object, $operation, $lowPriority),
            default => $this->handleGenericChange($object, $operation),
        };
    }
    
    private function handleProductChange(
        Product $product, 
        string $operation,
        bool $lowPriority
    ): void {
        $stamps = [];
        
        // In low priority (durante batch), aggiungiamo delay per non sovraccaricare
        if ($lowPriority) {
            $stamps[] = new DelayStamp(5000); // 5 secondi di delay
        }
        
        // 1. Invalida cache immediatamente (alta prioritΓ )
        $this->messageBus->dispatch(
            new InvalidateCacheMessage(
                objectId: $product->getId(),
                objectType: 'Product',
                cacheKeys: $this->buildCacheKeys($product),
            ),
            [new TransportNamesStamp(['async_high'])]
        );
        
        // 2. Reindicizza per search (puΓ² aspettare)
        $this->messageBus->dispatch(
            new ReindexProductMessage(
                productId: $product->getId(),
                operation: $operation,
                changedFields: $this->detectChangedFields($product),
            ),
            array_merge($stamps, [new TransportNamesStamp(['async_bulk'])])
        );
        
        // 3. Sincronizza con ERP (se configurato e non in delete)
        if ($operation !== 'delete' && $product->getSyncToErp()) {
            $this->messageBus->dispatch(
                new SyncToErpMessage(
                    productId: $product->getId(),
                    sku: $product->getSku(),
                    operation: $operation,
                ),
                [new TransportNamesStamp(['sync_external'])]
            );
        }
        
        // 4. Sincronizza con e-commerce
        if ($product->getPublishToWeb()) {
            $this->messageBus->dispatch(
                new SyncToEcommerceMessage(
                    productId: $product->getId(),
                    channels: $product->getSalesChannels() ?? ['default'],
                    operation: $operation,
                ),
                [
                    new TransportNamesStamp(['sync_external']),
                    // Delay per dare tempo alla cache di invalidarsi
                    new DelayStamp(2000),
                ]
            );
        }
    }
    
    private function handleCategoryChange(
        Category $category, 
        string $operation, 
        bool $lowPriority
    ): void {
        // Una modifica categoria impatta tutti i prodotti figli
        $this->messageBus->dispatch(
            new InvalidateCacheMessage(
                objectId: $category->getId(),
                objectType: 'Category',
                cacheKeys: ["category_{$category->getId()}", "category_tree"],
                propagateToChildren: true,
            )
        );
        
        // Per operazioni su categorie, schedula re-sync di tutti i prodotti
        // con delay significativo per evitare thundering herd
        if ($operation === 'update') {
            $this->messageBus->dispatch(
                new ResyncCategoryProductsMessage(
                    categoryId: $category->getId(),
                    recursive: true,
                ),
                [
                    new TransportNamesStamp(['async_bulk']),
                    new DelayStamp(30000), // 30 secondi
                ]
            );
        }
    }
    
    private function handleGenericChange(AbstractObject $object, string $operation): void
    {
        // Fallback per altri tipi di oggetto
        $this->messageBus->dispatch(
            new InvalidateCacheMessage(
                objectId: $object->getId(),
                objectType: $object->getClassName(),
                cacheKeys: [strtolower($object->getClassName()) . "_{$object->getId()}"],
            )
        );
    }
    
    /**
     * Abilita modalitΓ  batch per importazioni massive.
     * Chiamato esternamente prima di operazioni bulk.
     */
    public static function enableBatchMode(): void
    {
        self::$processingBatch = true;
    }
    
    public static function disableBatchMode(): void
    {
        self::$processingBatch = false;
    }
    
    private function isInternalUpdate(DataObjectEvent $event): bool
    {
        $arguments = $event->getArguments();
        return isset($arguments['isAutoSave']) && $arguments['isAutoSave'] === true;
    }
    
    private function buildCacheKeys(Product $product): array
    {
        $keys = [
            "product_{$product->getId()}",
            "product_sku_{$product->getSku()}",
        ];
        
        // Aggiungi chiavi per categorie associate
        foreach ($product->getCategories() ?? [] as $category) {
            $keys[] = "category_{$category->getId()}_products";
        }
        
        return $keys;
    }
    
    private function detectChangedFields(Product $product): array
    {
        // Pimcore mantiene i valori originali in memoria.
        // In produzione useresti un servizio dedicato che traccia le modifiche.
        return [];
    }
}

πŸ“ Nota: il flag $processingBatch Γ¨ statico intenzionalmente. Durante importazioni massive, vogliamo prevenire la generazione di migliaia di messaggi individuali. Alla fine del batch, genereremo un singolo messaggio di “reconciliation”.

Ecco un esempio di messaggio ben strutturato:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?php
// src/Message/Sync/SyncToErpMessage.php

declare(strict_types=1);

namespace App\Message\Sync;

use DateTimeImmutable;

/**
 * Messaggio per sincronizzazione prodotto verso ERP.
 * 
 * Immutabile e serializzabile. Contiene solo ID e dati
 * necessari per identificare l'operazione, non l'intero oggetto.
 */
final readonly class SyncToErpMessage
{
    public function __construct(
        public int $productId,
        public string $sku,
        public string $operation, // 'create', 'update', 'delete'
        public ?string $correlationId = null, // Per tracciare saga
        public int $retryCount = 0,
        public ?DateTimeImmutable $originalTimestamp = null,
    ) {}
    
    /**
     * Factory per creare messaggio di retry con contesto preservato.
     */
    public function forRetry(): self
    {
        return new self(
            productId: $this->productId,
            sku: $this->sku,
            operation: $this->operation,
            correlationId: $this->correlationId,
            retryCount: $this->retryCount + 1,
            originalTimestamp: $this->originalTimestamp ?? new DateTimeImmutable(),
        );
    }
    
    /**
     * Identifica univocamente questa operazione per deduplicazione.
     */
    public function getDeduplicationKey(): string
    {
        return sprintf('erp_sync_%s_%s', $this->sku, $this->operation);
    }
}

Saga Pattern per Workflow Complessi

Le operazioni reali in un PIM/DAM non sono mai atomiche. Un’importazione prodotto tipica coinvolge: creazione DataObject, upload asset, generazione thumbnail, sincronizzazione multi-canale, indicizzazione search. Se uno step fallisce a metΓ , dobbiamo gestire il rollback.

Il Saga Pattern ci permette di orchestrare queste operazioni come una sequenza di step con compensating transactions.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<?php
// src/Saga/ProductPublishSaga.php

declare(strict_types=1);

namespace App\Saga;

use App\Message\Asset\GenerateThumbnailsMessage;
use App\Message\Bulk\ReindexProductMessage;
use App\Message\Sync\SyncToEcommerceMessage;
use App\Saga\State\ProductPublishState;
use Psr\Log\LoggerInterface;
use Symfony