Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


Enqueue Mongodb 消息队列传输

允许使用 MongoDB 作为消息队列代理。

安装

$ composer require enqueue/mongodb

创建上下文

<?php
use Enqueue\Mongodb\MongodbConnectionFactory;

// 连接到localhost
$connectionFactory = new MongodbConnectionFactory();

// 同上
$factory = new MongodbConnectionFactory('mongodb:');

// 同上
$factory = new MongodbConnectionFactory([]);

$factory = new MongodbConnectionFactory([
    'dsn' => 'mongodb://localhost:27017/db_name',
    'dbname' => 'enqueue',
    'collection_name' => 'enqueue',
    'polling_interval' => '1000',
]);

$context = $factory->createContext();

// 如果已安装了 enqueue/enqueue 库,则可以使用工厂从DSN构建上下文。
$context = (new \Enqueue\ConnectionFactoryFactory())->create('mongodb:')->createContext();

发送消息到主题

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */

$message = $context->createMessage('Hello world!');

$context->createProducer()->send($fooTopic, $message);

发送消息到队列

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $context->createMessage('Hello world!');

$context->createProducer()->send($fooQueue, $message);

发送权重消息

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */

$fooQueue = $context->createQueue('foo');

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setPriority(5) // 优先级越高,消息越快到达消费者。
    //
    ->send($fooQueue, $message)
;

发送限期消息

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setTimeToLive(60000) // 60秒
    //
    ->send($fooQueue, $message)
;

发送延迟消息

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

// 确保运行了 "composer require enqueue/amqp-tools"

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5秒

    ->send($fooQueue, $message)
;

消费消息

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$consumer = $context->createConsumer($fooQueue);

$message = $consumer->receive();

// 处理消息

$consumer->acknowledge($message);
// $consumer->reject($message);

订阅消费者

<?php
use Interop\Queue\Message;
use Interop\Queue\Consumer;

/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */

$fooConsumer = $context->createConsumer($fooQueue);
$barConsumer = $context->createConsumer($barQueue);

$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // 处理消息

    $consumer->acknowledge($message);

    return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) {
    // 处理消息

    $consumer->acknowledge($message);

    return true;
});

$subscriptionConsumer->consume(2000); // 2秒

返回首页