支持Enqueue
Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:
监控
Enqueue 提供了一个监控消息队列的工具。 有了它,您可以控制发送的消息数量、成功或失败的处理数量。 有多少消费者正在工作、他们的正常运行时间、处理的消息统计信息、内存使用情况和系统负载。 该工具几乎可以与任何分析和监控平台集成。 有几种集成:
- Datadog StatsD
- InfluxDB 和 Grafana
- WAMP (Web Application Messaging Protocol) 我们正在开发基于 JS\WAMP 的实时 UI 工具,有关更多信息,请联系我们。
如果需要 Grafana 模板,例如图片上的,请联系我们。
- 安装
- 追踪发送的消息
- 追踪消费的消息
- 追踪消费者指标
- 消费扩展
- Enqueue客户端扩展
- InfluxDB存储
- Datadog存储
- WAMP(Web Socket Messaging Protocol)存储
- Symfony应用
安装
$ composer req enqueue/monitoring:0.9.x-dev
追踪发送的消息
<?php
use Enqueue\Monitoring\SentMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000), // 时间戳
'queue_name', // 队列
'aMessageId',
'aCorrelationId',
[], // 标头
[] // 属性
));
或者,如果您使用 Queue Interop 传输,这里是您如何跟踪发送的消息的示例:
<?php
use Interop\Queue\Context;
use Enqueue\Monitoring\SentMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;
/** @var Context $context */
$queue = $context->createQueue('foo');
$message = $context->createMessage('body');
$context->createProducer()->send($queue, $message);
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000),
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
$message->getHeaders()[],
$message->getProperties()
));
追踪消费的消息
<?php
use Enqueue\Monitoring\ConsumedMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;
$receivedAt = (int) (microtime(true) * 1000);
// 繁重的逻辑处理
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
'consumerId',
(int) (microtime(true) * 1000), // 此时
$receivedAt,
'aQueue',
'aMessageId',
'aCorrelationId',
[], // 标头
[], // 属性
false, // 是否已接收到
ConsumedMessageStats::STATUS_ACK
));
或者,如果您使用 Queue Interop 传输,这里是您如何跟踪发送的消息的示例:
<?php
use Interop\Queue\Context;
use Enqueue\Monitoring\ConsumedMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;
/** @var Context $context */
$queue = $context->createQueue('foo');
$consumer = $context->createConsumer($queue);
$consumerId = uniqid('consumer-id', true); // 我们建议在这里使用UUID
if ($message = $consumer->receiveNoWait()) {
$receivedAt = (int) (microtime(true) * 1000);
// 繁重的逻辑处理
$consumer->acknowledge($message);
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
$consumerId,
(int) (microtime(true) * 1000), // 此时
$receivedAt,
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
$message->getHeaders(),
$message->getProperties(),
$message->isRedelivered(),
ConsumedMessageStats::STATUS_ACK
));
}
追踪消费者指标
消费者是长时间运行的进程。了解其中有多少正在运行、它们如何执行、它们使用了多少内存等至关重要。 此示例显示了如何发送此类指标。 在处理消息之间不时的调用此代码。
<?php
use Enqueue\Monitoring\ConsumerStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;
$startedAt = (int) (microtime(true) * 1000);
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumerStats(new ConsumerStats(
'consumerId',
(int) (microtime(true) * 1000), // 此时
$startedAt,
null, // 完成时间
true, // 已开始?
false, // 已完成?
false, // 已失败?
['foo'], // 消费的队列
123, // 已接收的消息数量
120, // 已认可(acknowledged)的消息数量
1, // 已拒绝的消息数量
1, // 重新入队的消息数量
memory_get_usage(true),
sys_getloadavg()[0]
));
消费扩展
Enqueue QueueConsumer 有一个 ConsumerMonitoringExtension
扩展。 它可以为您收集消费的消息和消费者统计信息。
<?php
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Monitoring\ConsumerMonitoringExtension;
use Enqueue\Monitoring\GenericStatsStorageFactory;
use Interop\Queue\Context;
/** @var Context $context */
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$queueConsumer = new QueueConsumer($context, new ChainExtension([
new ConsumerMonitoringExtension($statsStorage)
]));
// 绑定...
// 消费...
Enqueue客户端扩展
Enqueue 客户端 也有一个 ClientMonitoringExtension 扩展。它可以为您收集已发送的消息统计信息。
InfluxDB存储
安装额外的软件包:
$ composer req influxdb/influxdb-php:^1.14
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
一些可用的选项:
* 'host' => '127.0.0.1',
* 'port' => '8086',
* 'user' => '',
* 'password' => '',
* 'db' => 'enqueue',
* 'measurementSentMessages' => 'sent-messages',
* 'measurementConsumedMessages' => 'consumed-messages',
* 'measurementConsumers' => 'consumers',
* 'client' => null,
* 'retentionPolicy' => null,
您可以在 client
选项中传递 InfluxDB\Client
实例。否则,它将在第一次使用时根据其他选项创建。
如果您的 InfluxDB\Client
使用实现了 InfluxDB\Driver\QueryDriverInterface
接口路的驱动,那么如果数据库不存在,则会自动为您创建它。默认的 InfluxDB\Client
也会这样做。
Datadog存储
安装额外的软件包:
$ composer req datadog/php-datadogstatsd:^1.3
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;
$statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1:8125');
为获得最佳体验,请在公制摘要中调整单位和类型。
示例仪表板:
一些可用的选项(以及所有可用的指标):
* 'host' => '127.0.0.1',
* 'port' => '8125',
* 'batched' => true, // 性能提升
* 'global_tags' => '', // 需要包含键和值
* 'metric.messages.sent' => 'enqueue.messages.sent',
* 'metric.messages.consumed' => 'enqueue.messages.consumed',
* 'metric.messages.redelivered' => 'enqueue.messages.redelivered',
* 'metric.messages.failed' => 'enqueue.messages.failed',
* 'metric.consumers.started' => 'enqueue.consumers.started',
* 'metric.consumers.finished' => 'enqueue.consumers.finished',
* 'metric.consumers.failed' => 'enqueue.consumers.failed',
* 'metric.consumers.received' => 'enqueue.consumers.received',
* 'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
* 'metric.consumers.rejected' => 'enqueue.consumers.rejected',
* 'metric.consumers.requeued' => 'enqueue.consumers.requeued',
* 'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
WAMP(Web Socket Messaging Protocol)存储
安装额外的软件包:
$ composer req thruway/pawl-transport:^0.5.0 thruway/client:^0.5.0
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;
$statsStorage = (new GenericStatsStorageFactory())->create('wamp://127.0.0.1:9090?topic=stats');
一些可用的选项:
* 'host' => '127.0.0.1',
* 'port' => '9090',
* 'topic' => 'stats',
* 'max_retries' => 15,
* 'initial_retry_delay' => 1.5,
* 'max_retry_delay' => 300,
* 'retry_delay_growth' => 1.5,
Symfony应用
您必须注册一些服务才能将监控工具合并到您的 Symfony 应用中。
# config/packages/enqueue.yaml
enqueue:
default:
transport: 'amqp://guest:guest@bar:5672/%2f'
monitoring: 'influxdb://127.0.0.1:8086?db=foo'
another:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'wamp://127.0.0.1:9090?topic=stats'
client: ~
datadog:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'datadog://127.0.0.1:8125?batched=false'
client: ~