支持Enqueue
Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:
异步事件调度器 (Symfony)
本文档展示了如何在原生 PHP 中设置异步事件调度。如果您想了解如何在 Symfony 应用中使用它,请阅读这篇文章。
安装
您需要通过 Composer 安装(require)异步事件调度器库,以及任意一种受支持的传输(transport):
$ composer require enqueue/async-event-dispatcher enqueue/fs
配置
<?php
// config.php
use Enqueue\AsyncEventDispatcher\AsyncListener;
use Enqueue\AsyncEventDispatcher\AsyncProcessor;
use Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer;
use Enqueue\AsyncEventDispatcher\AsyncEventDispatcher;
use Enqueue\AsyncEventDispatcher\SimpleRegistry;
use Enqueue\Fs\FsConnectionFactory;
use Symfony\Component\EventDispatcher\EventDispatcher;
require_once __DIR__.'/vendor/autoload.php';

// Transport setup. The filesystem transport is used here, but it could be
// any other queue-interop/queue-interop compatible context.
$connectionFactory = new FsConnectionFactory('file://'.__DIR__.'/queues');
$context = $connectionFactory->createContext();
$eventQueue = $context->createQueue('symfony_events');

// 'the_event' is mapped to the name 'default', and 'default' is mapped to a
// PhpSerializerEventTransformer instance.
$transformerNameByEvent = ['the_event' => 'default'];
$transformersByName = ['default' => new PhpSerializerEventTransformer($context)];
$registry = new SimpleRegistry($transformerNameByEvent, $transformersByName);

$dispatcher = new EventDispatcher();
$asyncListener = new AsyncListener($context, $registry, $eventQueue);

// The async listener sends the event through the MQ as a message.
$dispatcher->addListener('the_event', $asyncListener);

$asyncDispatcher = new AsyncEventDispatcher($dispatcher, $asyncListener);

// Listeners added to the async dispatcher are executed on the consumer side.
$asyncDispatcher->addListener('the_event', function () {
});

$asyncProcessor = new AsyncProcessor($registry, $asyncDispatcher);
调度事件
<?php
// send.php
use Symfony\Component\EventDispatcher\GenericEvent;
require_once __DIR__.'/vendor/autoload.php';
include __DIR__.'/config.php';

// Dispatch 'the_event' on the $dispatcher variable that config.php defines.
$event = new GenericEvent('theSubject');
$dispatcher->dispatch('the_event', $event);
处理异步事件
<?php
// consume.php
use Interop\Queue\Processor;
require_once __DIR__.'/vendor/autoload.php';
include __DIR__.'/config.php';

// $context, $eventQueue and $asyncProcessor come from config.php.
$consumer = $context->createConsumer($eventQueue);

// Poll the queue forever; each receive() call is given a 5000 timeout.
for (;;) {
    $message = $consumer->receive(5000);
    if (!$message) {
        // Nothing was received this round; poll again.
        continue;
    }

    // The processor result is cast to string before being compared with
    // the Processor status constants.
    $status = (string) $asyncProcessor->process($message, $context);

    if (Processor::ACK === $status) {
        $consumer->acknowledge($message);
    } elseif (Processor::REJECT === $status) {
        $consumer->reject($message);
    } elseif (Processor::REQUEUE === $status) {
        // Second argument "true" asks for the message to be requeued.
        $consumer->reject($message, true);
    } else {
        throw new \LogicException('Result is not supported');
    }
}