Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


快速指南

EnqueueBundle 集成了队列库。 它添加了易于使用的配置层、注册服务以及方便的cli 命令

安装

$ composer require enqueue/enqueue-bundle enqueue/fs

注意: 您可以使用其他 传输

注意: 如果您正在寻找一种从 php-amqplib/rabbitmq-bundle 迁移的方式,请阅读本文

启用Bundle

然后,将 new Enqueue\Bundle\EnqueueBundle() 添加到您项目的 app/AppKernel.php 文件的 registerBundles 方法中的bundle数组中:

<?php
// src/Kernel.php
namespace App;

use Symfony\Component\HttpKernel\Kernel as BaseKernel;

class Kernel extends BaseKernel
{
    public function registerBundles()
    {
        $bundles = array(
            // ...

            new Enqueue\Bundle\EnqueueBundle(),
        );

        // ...
    }

    // ...
}

用例

首先,您必须配置一个传输层。 如果需要,您可以选择配置多个传输。 基于以下内容,其中一个将自动成为默认传输:

  1. 如果有一个名为 default 的传输,那么它将成为默认值。
  2. 未特别指定时的第一个传输。

默认传输的服务将在它们各自的类接口下的常用 Symfony 容器中提供给您(见下文)

# app/config/config.yml

enqueue:
    default:
        transport: "amqp:"
        client: ~
    some_other_transport:
        transport: "amqp:"
        client: ~

配置完所有内容后,您就可以开始生产消息了。 如前所述,默认传输服务已在容器中可用。在这里,我们使用 ProducerInterfacedefault 传输生产消息。

<?php
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;

/** @var ProducerInterface $producer **/
$producer = $container->get(ProducerInterface::class);

// 如果您想要一个有别于默认的生产者(例如上面示例中指定的另一个生产者),则使用
// $producer = $container->get('enqueue.client.some_other_transport.producer');

// 给若干消费者发送事件
$producer->sendEvent('aFooTopic', 'Something has happened');
// 如果需要更大的灵活性,还可以给第二个参数传递一个Enqueue\Client\Message的实例。
$properties = [];
$headers = [];
$message = new Message('Message body', $properties, $headers);
$producer->sendEvent('aBarTopic', $message);

// 给一个消费者发送命令
$producer->sendCommand('aProcessorName', 'Something has happened');

要消费消息,您必须首先创建一个消息处理器。

下面的示例展示了如何创建一个处理器来接收来自 aFooTopic 主题(并且只有这个主题)的消息。 它假定您使用默认的 Symfony 服务配置,并且此类已自动装配。否则,您将不得不手动标记它。 如果您使用多种传输,则尤其如此:如果保持自动装配,处理器将仅连接到默认传输。

注意:Enqueue的主题和某些传输(例如 Kafka)上的主题是两件不同的事情。

<?php
use Interop\Queue\Message;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use Enqueue\Client\TopicSubscriberInterface;

class FooProcessor implements Processor, TopicSubscriberInterface
{
    public function process(Message $message, Context $session)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // 当消息已经损坏时
        // return self::REQUEUE; // 消息正常,但您希望推迟处理
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}

将其注册为容器服务。如果您不使用自动装配,请将其订阅到该主题。

foo_message_processor:
    class: 'FooProcessor'
    tags:
        - { name: 'enqueue.topic_subscriber' }
        # 使用下面的变体连接到特定客户端
        # 另外请注意,若不禁用自动配置,则上述标记将自动应用于默认客户端
        # - { name: 'enqueue.topic_subsciber', client: 'some_other_transport' }

现在你可以开始消费消息了:

$ ./bin/console enqueue:consume --setup-broker -vvv

您可以选择特定的客户端进行消费:

$ ./bin/console enqueue:consume --setup-broker --client="some_other_transport" -vvv

注意: 添加 -vvv 以了解在您消费消息时发生了什么,那里有很多有价值的调试信息。

返回首页