支持Enqueue
Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:
消息处理器
基础
消息处理器是一个实际处理消息的对象,必须返回一个结果状态。下面是例子:
<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());
return self::ACK;
}
}
通过返回self::ACK
,处理器将告诉代理该消息已被正确处理。
还有其他状态:
self::ACK
- 当消息被成功处理并且消息可以从队列中移除时使用这个常量。self::REJECT
- 当消息无效或无法处理时使用此常量。消息将从队列中删除。self::REQUEUE
- 当消息无效或无法立即处理,但我们需要稍后再试时,使用此常量。
查看下一个示例,该示例展示了发送邮件之前的消息验证。如果该消息无效,则处理器将拒绝它。
<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Util\JSON;
class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$data = JSON::decode($message->getBody());
if ($user = $this->userRepository->find($data['userId'])) {
return self::REJECT;
}
$this->mailer->send($user->getEmail(), $data['text']);
return self::ACK;
}
}
有一个 isRedelivered
方法可以查明消息先前是否失败。 如果它返回 true
则尝试处理该消息。
<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
if ($message->isRedelivered()) {
return self::REQUEUE;
}
$this->mailer->send('foo@example.com', $message->getBody());
return self::ACK;
}
}
第二个参数是你的上下文。您可以使用它向其他队列\主题发送消息。
<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());
$queue = $context->createQueue('anotherQueue');
$message = $context->createMessage('Message has been sent');
$context->createProducer()->send($queue, $message);
return self::ACK;
}
}
答复结果
消费组件提供了一些有用的扩展,例如有一个使处理 RPC 更简单的扩展。 生产者可能会等待消费者的回复,并且为了发送它,处理器必须返回答复结果。 不要忘记添加 ReplyExtension
。
<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;
class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());
$replyMessage = $context->createMessage('Message has been sent');
return Result::reply($replyMessage);
}
}
/** @var \Interop\Queue\Context $context */
$queueConsumer = new QueueConsumer($context, new ChainExtension([
new ReplyExtension()
]));
$queueConsumer->bind('foo', new SendMailProcessor());
$queueConsumer->consume();
关于异常
建议不要捕获异常并快速失败。 还可以考虑使用 Supervisord 或类似的进程管理器来重新启动已退出的消费者。
尽管建议失败,但在某些情况下您可能希望捕获异常。
- 消息验证器对无效消息抛出异常。最好抓住它并返回
REJECT
。 - 某些传输(Doctrine DBAL、Filesystem、Redis)确实注意到一个错误,因此将无法重新传递消息。该消息已完全丢失。您可能希望捕获异常以正确重新传递\重新入队该消息
示例
随意贡献你自己的示例。