Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


快速指南

传输

传输层或 PSR(Enqueue消息服务)是一个面向消息的中间件,用于在两个或多个客户端之间发送消息。 它是一个消息组件,允许应用创建、发送、接收和读取消息。它允许分布式应用的不同组件之间的通信松散耦合、可靠和异步。

PSR 的灵感来自 JMS(Java消息服务)。我们试图尽可能接近 JSR 914 规范。 目前它支持 AMQPSTOMP 消息队列协议。 您可以连接到许多现代代理,例如 RabbitMQActiveMQ 等。

生产消息:

<?php
use Interop\Queue\ConnectionFactory;

/** @var ConnectionFactory $connectionFactory **/
$context = $connectionFactory->createContext();

$destination = $context->createQueue('foo');
//$destination = $context->createTopic('foo');

$message = $context->createMessage('Hello world!');

$context->createProducer()->send($destination, $message);

消费消息:

<?php
use Interop\Queue\ConnectionFactory;

/** @var ConnectionFactory $connectionFactory **/
$context = $connectionFactory->createContext();

$destination = $context->createQueue('foo');
//$destination = $context->createTopic('foo');

$consumer = $context->createConsumer($destination);

$message = $consumer->receive();

// 处理消息

$consumer->acknowledge($message);
// $consumer->reject($message);

消费

消费是建立在传输功能之上的层。 该组件的目标是简单地消费消息。 QueueConsumer 是组件的主要部分,它允许将消息处理器(或回调)绑定到队列。 consume 方法则开启一个只要不被中断就持续消费的进程。

<?php
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Interop\Queue\Context;
use Enqueue\Consumption\QueueConsumer;

/** @var Context $context */

$queueConsumer = new QueueConsumer($context);

$queueConsumer->bindCallback('foo_queue', function(Message $message) {
    // 处理消息

    return Processor::ACK;
});
$queueConsumer->bindCallback('bar_queue', function(Message $message) {
    // 处理消息

    return Processor::ACK;
});

$queueConsumer->consume();

有很多扩展可用。 这里是如何添加它们的示例。 SignalExtension 提供进程信号的支持,例如,无论何时发送 SIGTERM,它都将得到正确管理。 LimitConsumptionTimeExtension 将在给定时间后中断消费。

<?php
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\SignalExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;

/** @var \Interop\Queue\Context $context */

$queueConsumer = new QueueConsumer($context, new ChainExtension([
    new SignalExtension(),
    new LimitConsumptionTimeExtension(new \DateTime('now + 60 sec')),
]));

远程过程调用(RPC)

这里有一个 RPC 组件可以让您轻松地通过 MQ 发送 RPC 请求。 这是您发送 RPC 消息并等待回复消息的方法。

<?php
use Enqueue\Rpc\RpcClient;

/** @var \Interop\Queue\Context $context */

$queue = $context->createQueue('foo');
$message = $context->createMessage('Hi there!');

$rpcClient = new RpcClient($context);

$promise = $rpcClient->callAsync($queue, $message, 1);
$replyMessage = $promise->receive();

消费组件也有扩展。 它简化了 RPC 的服务器端。

<?php
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

/** @var \Interop\Queue\Context $context */

$queueConsumer = new QueueConsumer($context, new ChainExtension([
    new ReplyExtension()
]));

$queueConsumer->bindCallback('foo', function(Message $message, Context $context) {
    $replyMessage = $context->createMessage('Hello');

    return Result::reply($replyMessage);
});

$queueConsumer->consume();

客户端

它提供了一个易于使用的高级抽象。 该组件的目标是隐藏尽可能多的底层细节,以便您可以专注于真正重要的事情。 例如,它通过创建队列、交换和绑定来为您配置代理。 它提供易于使用的服务来生成和处理消息。 它支持使用统一格式来设置消息过期、延迟、时间戳、关联id。 它支持消息总线,因此不同的应用可以相互通信。

下面是如何发送和使用事件消息的示例:

<?php
use Enqueue\SimpleClient\SimpleClient;
use Interop\Queue\Message;

// composer require enqueue/amqp-ext
$client = new SimpleClient('amqp:');

// composer require enqueue/fs
$client = new SimpleClient('file://foo/bar');
$client->bindTopic('a_foo_topic', function(Message $message) {
    echo $message->getBody().PHP_EOL;

    // 您的事件处理器逻辑
});

$client->setupBroker();

$client->sendEvent('a_foo_topic', 'message');

// 这是一个阻塞调用,它将消费消息直到消息被中断
$client->consume();

以及命令消息

<?php
use Enqueue\SimpleClient\SimpleClient;
use Interop\Queue\Message;
use Interop\Queue\Context;
use Enqueue\Client\Config;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

// composer require enqueue/amqp-ext # 或 enqueue/amqp-bunny 或 enqueue/amqp-lib
$client = new SimpleClient('amqp:');

// composer require enqueue/fs
//$client = new SimpleClient('file://foo/bar');

$client->bindCommand('bar_command', function(Message $message) {
    // 您的命令处理器逻辑
});

$client->bindCommand('baz_reply_command', function(Message $message, Context $context) {
    // 您的baz回复命令处理器逻辑

    return Result::reply($context->createMessage('theReplyBody'));
});

$client->setupBroker();

// 它被发送给一个消费者
$client->sendCommand('bar_command', 'aMessageData');

// 有可能得到答复
$promise = $client->sendCommand('bar_command', 'aMessageData', true);

// 只有在开始收到回复后,才能发送多个命令。

$replyMessage = $promise->receive(2000); // 2秒

// 这是一个阻塞调用,它将消费消息直到消息被中断
$client->consume([new ReplyExtension()]);

在此处阅读有关事件和命令的更多信息。

CLI命令

该库提供了开箱即用的方便的命令。 它建立在 Symfony Console 组件之上。 最有用的是一个消费命令。其中有两种,一种来自消费组件,另一种来自客户端。

让我们看看如何使用消费一:

#!/usr/bin/env php
<?php
// app.php

use Symfony\Component\Console\Application;
use Interop\Queue\Message;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Symfony\Consumption\SimpleConsumeCommand;

/** @var QueueConsumer $queueConsumer */

$queueConsumer->bindCallback('a_queue', function(Message $message) {
    // 处理消息
});

$consumeCommand = new SimpleConsumeCommand($queueConsumer);
$consumeCommand->setName('consume');

$app = new Application();
$app->add($consumeCommand);
$app->run();

并从控制台开始消费:

$ app.php consume

监控

有一个工具可以跟踪已发送/已消费的消息以及消费器性能。在这里阅读更多。

返回首页

Symfony

在此处阅读有关使用 Enqueue 作为 Symfony Bundle 的更多信息。