Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


Web Application Messaging Protocol (WAMP) 传输

应用于Web应用消息传递协议的传输。 WAMP 是一个开放标准的 WebSocket 子协议。 它在内部使用 Thruway 的 thruway/client PHP库。

安装

$ composer require enqueue/wamp

启动WAMP路由

您可以通过Thruway获得 WAMP 路由器:

$ composer require voryx/thruway
$ php vendor/voryx/thruway/Examples/SimpleWsRouter.php

Thruway 现在在 127.0.0.1:9090 上运行

创建上下文

<?php
use Enqueue\Wamp\WampConnectionFactory;

$connectionFactory = new WampConnectionFactory();

// 同上
$connectionFactory = new WampConnectionFactory('wamp:');
$connectionFactory = new WampConnectionFactory('ws:');
$connectionFactory = new WampConnectionFactory('wamp://127.0.0.1:9090');

$context = $connectionFactory->createContext();

消费消息

在向主题发送消息之前启动消息消费者

<?php
/** @var \Enqueue\Wamp\WampContext $context */

$fooTopic = $context->createTopic('foo');

$consumer = $context->createConsumer($fooQueue);

while (true) {
    if ($message = $consumer->receive()) {
        // 处理消息
    }
}

订阅消费者

<?php
use Interop\Queue\Message;
use Interop\Queue\Consumer;

/** @var \Enqueue\Wamp\WampContext $context */
/** @var \Enqueue\Wamp\WampDestination $fooQueue */
/** @var \Enqueue\Wamp\WampDestination $barQueue */

$fooConsumer = $context->createConsumer($fooQueue);
$barConsumer = $context->createConsumer($barQueue);

$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // 处理消息

    return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) {
    // 处理消息

    return true;
});

$subscriptionConsumer->consume(2000); // 2秒

发送消息到主题

<?php
/** @var \Enqueue\Wamp\WampContext $context */

$fooTopic = $context->createTopic('foo');
$message = $context->createMessage('Hello world!');

$context->createProducer()->send($fooTopic, $message);

返回首页