Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


消息处理器

基础

消息处理器是一个实际处理消息的对象,必须返回一个结果状态。下面是例子:

<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;

class SendMailProcessor implements Processor
{
    public function process(Message $message, Context $context)
    {
        $this->mailer->send('foo@example.com', $message->getBody());

        return self::ACK;
    }
}

通过返回self::ACK,处理器将告诉代理该消息已被正确处理。

还有其他状态:

  • self::ACK - 当消息被成功处理并且消息可以从队列中移除时使用这个常量。
  • self::REJECT - 当消息无效或无法处理时使用此常量。消息将从队列中删除。
  • self::REQUEUE - 当消息无效或无法立即处理,但我们需要稍后再试时,使用此常量。

查看下一个示例,该示例展示了发送邮件之前的消息验证。如果该消息无效,则处理器将拒绝它。

<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Util\JSON;

class SendMailProcessor implements Processor
{
    public function process(Message $message, Context $context)
    {
        $data = JSON::decode($message->getBody());
        if ($user  = $this->userRepository->find($data['userId'])) {
            return self::REJECT;
        }

        $this->mailer->send($user->getEmail(), $data['text']);

        return self::ACK;
    }
}

有一个 isRedelivered 方法可以查明消息先前是否失败。 如果它返回 true 则尝试处理该消息。

<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;

class SendMailProcessor implements Processor
{
    public function process(Message $message, Context $context)
    {
        if ($message->isRedelivered()) {
            return self::REQUEUE;
        }

        $this->mailer->send('foo@example.com', $message->getBody());

        return self::ACK;
    }
}

第二个参数是你的上下文。您可以使用它向其他队列\主题发送消息。

<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;

class SendMailProcessor implements Processor
{
    public function process(Message $message, Context $context)
    {
        $this->mailer->send('foo@example.com', $message->getBody());

        $queue = $context->createQueue('anotherQueue');
        $message = $context->createMessage('Message has been sent');
        $context->createProducer()->send($queue, $message);

        return self::ACK;
    }
}

答复结果

消费组件提供了一些有用的扩展,例如有一个使处理 RPC 更简单的扩展。 生产者可能会等待消费者的回复,并且为了发送它,处理器必须返回答复结果。 不要忘记添加 ReplyExtension

<?php
use Interop\Queue\Processor;
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

class SendMailProcessor implements Processor
{
    public function process(Message $message, Context $context)
    {
        $this->mailer->send('foo@example.com', $message->getBody());

        $replyMessage = $context->createMessage('Message has been sent');

        return Result::reply($replyMessage);
    }
}

/** @var \Interop\Queue\Context $context */

$queueConsumer = new QueueConsumer($context, new ChainExtension([
    new ReplyExtension()
]));

$queueConsumer->bind('foo', new SendMailProcessor());

$queueConsumer->consume();

关于异常

建议不要捕获异常并快速失败。 还可以考虑使用 Supervisord 或类似的进程管理器来重新启动已退出的消费者。

尽管建议失败,但在某些情况下您可能希望捕获异常。

  • 消息验证器对无效消息抛出异常。最好抓住它并返回REJECT
  • 某些传输(Doctrine DBALFilesystemRedis)确实注意到一个错误,因此将无法重新传递消息。该消息已完全丢失。您可能希望捕获异常以正确重新传递\重新入队该消息

示例

随意贡献你自己的示例。

返回首页