Architettura Event-Driven in Pimcore: Implementare un Sistema di Workflow Asincrono con Symfony Messenger
Chiunque abbia gestito un’istanza Pimcore in produzione con centinaia di migliaia di DataObjects e Assets sa cosa significa vedere l’interfaccia admin bloccarsi durante un’importazione massiva. O peggio, ricevere una chiamata alle 3 di notte perché il cron di sincronizzazione con l’ERP ha saturato la memoria del server.
La soluzione non è “comprare più RAM”. La soluzione è ripensare l’architettura: spostare le operazioni pesanti fuori dal ciclo request-response e orchestrarle in modo asincrono. Pimcore, essendo costruito su Symfony, ci offre Messenger come strumento nativo per farlo. Configurarlo correttamente per un contesto PIM/DAM enterprise richiede però una comprensione profonda sia di Messenger che delle peculiarità di Pimcore.
In questo articolo costruiremo un sistema di workflow asincrono completo, partendo dalla configurazione dei transport fino all’implementazione del Saga Pattern per operazioni distribuite. Vedremo come gestire importazioni di 500.000 prodotti senza impattare gli utenti, come orchestrare la generazione di thumbnail su più worker e come implementare retry intelligenti che non trasformino un problema temporaneo in un disastro.
Prerequisiti
- Pimcore 11.x con PHP 8.2+
- Conoscenza solida di Symfony (service container, eventi, console commands)
- Redis 7+ o RabbitMQ 3.12+ installato e configurato
- Familiarità con i concetti di message queue (producer, consumer, acknowledgment)
- Docker per l’ambiente di sviluppo (opzionale ma consigliato)
Assumo che tu abbia già un’installazione Pimcore funzionante. Se stai partendo da zero, la documentazione ufficiale copre l’installazione base.
Architettura e Concetti Chiave
Prima di scrivere codice, capiamo cosa stiamo costruendo:
flowchart TD
subgraph APP["PIMCORE APPLICATION"]
HR[HTTP Request] --> ED[Event Dispatcher]
UI[Admin UI] --> PE[Pimcore Events\npostUpdate / postAdd]
CLI[CLI / Cron] --> MB
ED --> MB[Message Bus]
PE --> MB
end
MB --> T
subgraph T["MESSAGE TRANSPORTS"]
RDS["Redis Streams\n── async_high\n── asset_processing"]
RMQ["RabbitMQ\n── async_bulk\n── sync_external"]
DOC["Doctrine\n── async_low\n(fallback)"]
end
T --> W
subgraph W["WORKER POOL"]
W1[Worker 1\nhigh priority]
W2[Worker 2\nhigh priority]
W3[Worker 3\nbulk]
WN[Worker N\nasset proc]
DLQ[Dead Letter Queue\n+ Monitoring]
end
I concetti fondamentali:
- Message: un oggetto serializzabile che rappresenta un’operazione da eseguire (es.
ProcessProductImportMessage) - Handler: la classe che esegue l’operazione quando il messaggio viene consumato
- Transport: il sistema di storage/delivery dei messaggi (Redis, RabbitMQ, Doctrine)
- Worker: il processo che consuma i messaggi da un transport e li passa agli handler
- Envelope: il wrapper che contiene il messaggio più i metadata (stamps)
Il routing dei messaggi verso i transport avviene in base alla classe del messaggio:
flowchart LR
MSG[Message] --> R{Router}
R -->|HighPriority/*| RH[async_high\nRedis Stream]
R -->|Bulk/*| RB[async_bulk\nRabbitMQ]
R -->|Asset/*| RA[asset_processing\nRedis Stream]
R -->|Sync/*| RS[sync_external\nRabbitMQ Topic]
R -->|"*"| RH
RH --> W1[Worker Pool\nHigh]
RB --> W2[Worker Pool\nBulk]
RA --> W3[Worker Pool\nAsset]
RS --> W4[Worker Pool\nSync]
W1 & W2 & W3 & W4 -->|max retries| DLQ[Dead Letter\nQueue]
Implementazione Passo-Passo
Configurazione Multi-Transport per Scenari Pimcore
La configurazione di default di Messenger non è adatta a un PIM enterprise. Dobbiamo definire transport separati per tipologie di operazioni diverse, con caratteristiche di performance e resilienza specifiche.
| |
⚠️ Attenzione: non usare mai
sync://in produzione per operazioni pesanti. È utile solo per test e debugging.
💡 Suggerimento: il
claim_intervaldi Redis Streams è fondamentale. Se un worker crasha, i messaggi “pending” vengono reclamati da altri worker dopo questo intervallo. Impostalo in base al tempo massimo previsto per l’elaborazione di un messaggio.
| |
Event Subscribers per Intercettare Operazioni CRUD
Il cuore di un sistema event-driven in Pimcore sono gli Event Subscribers che reagiscono alle operazioni su DataObjects e Assets.
| |
📝 Nota: il flag
$processingBatchè statico intenzionalmente. Durante importazioni massive, vogliamo prevenire la generazione di migliaia di messaggi individuali.
Saga Pattern per Workflow Complessi
Le operazioni reali in un PIM/DAM non sono mai atomiche. Un’importazione prodotto tipica coinvolge più step interdipendenti:
sequenceDiagram
participant C as CLI Command
participant S as ProductPublishSaga
participant MB as Message Bus
participant H1 as CreateObjectHandler
participant H2 as UploadAssetHandler
participant H3 as GenerateThumbnailHandler
participant H4 as SyncEcommerceHandler
C->>S: startSaga(importData)
S->>MB: dispatch(CreateProductMessage)
MB->>H1: handle()
H1-->>S: ProductCreatedEvent
S->>MB: dispatch(UploadAssetMessage)
MB->>H2: handle()
H2-->>S: AssetUploadedEvent
S->>MB: dispatch(GenerateThumbnailMessage)
MB->>H3: handle()
H3-->>S: ThumbnailReadyEvent
S->>MB: dispatch(SyncToEcommerceMessage)
MB->>H4: handle()
H4-->>S: SyncCompletedEvent
S-->>C: sagaCompleted
note over S,H4: Se uno step fallisce → compensating transactions
| |
Configurazione per Produzione
| |
| |
Errori Comuni e Troubleshooting
⚠️ Loop infiniti: se il tuo handler modifica un DataObject, scatena un nuovo evento, che dispatcha un nuovo messaggio, che viene consumato dall’handler… hai un loop. Usa sempre
$processingBatcho un flag per identificare operazioni “interne”.
⚠️ Serializzazione di entità Pimcore: non mettere mai un
DataObjectdirettamente nel messaggio. Pimcore usa lazy loading e proxy Doctrine — la serializzazione fallirà o produrrà oggetti inconsistenti. Passa sempre solo l’ID.
💡 Monitoring: usa
messenger:statsemessenger:failed:showper monitorare code e messaggi falliti in produzione.
Performance e Scalabilità
Con questa architettura, un’importazione di 500.000 prodotti diventa:
- Prima: 8 ore, server bloccato, admin inutilizzabile
- Dopo: 45 minuti con 4 worker paralleli, admin completamente funzionante durante l’import
Il segreto è il prefetch_count: 5 su RabbitMQ — ogni worker prende solo 5 messaggi alla volta, distribuendo naturalmente il carico senza thundering herd.
Conclusioni e Next Steps
Abbiamo costruito un sistema di workflow asincrono production-ready per Pimcore che scala orizzontalmente e gestisce i fallimenti in modo resiliente. I concetti chiave da ricordare:
- Usa transport separati per priorità diverse
- Non serializzare entità Pimcore — solo ID
- Il Saga Pattern è la soluzione per workflow multi-step
enableBatchMode()prima di qualsiasi importazione massiva
Come next steps, esplora:
- Symfony Messenger docs
- Pimcore Event System
- Worker autoscaling con Kubernetes HPA basato sulla lunghezza della coda