Link Search Menu Expand Document

支持Enqueue

Enqueue 是一个 MIT 许可的开源项目,它的持续开发完全得益于社区和我们客户的支持。如果您想加入他们,请考虑:


Kafka 传输

本传输使用 Kafka 流平台作为 MQ 代理。

安装

$ composer require enqueue/rdkafka

创建上下文

<?php
use Enqueue\RdKafka\RdKafkaConnectionFactory;

// 连接到localhost:9092
$connectionFactory = new RdKafkaConnectionFactory();

// 同上
$connectionFactory = new RdKafkaConnectionFactory('kafka:');

// 同上
$connectionFactory = new RdKafkaConnectionFactory([]);

// 添加自定义选项,连接到example.com:1000上的Kafka代理
$connectionFactory = new RdKafkaConnectionFactory([
    'global' => [
        'group.id' => uniqid('', true),
        'metadata.broker.list' => 'example.com:1000',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'beginning',
    ],
]);

$context = $connectionFactory->createContext();

// 如果已安装了 enqueue/enqueue 库,则可以使用工厂从DSN构建上下文。
$context = (new \Enqueue\ConnectionFactoryFactory())->create('kafka:')->createContext();

发送消息到主题

<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$message = $context->createMessage('Hello world!');

$fooTopic = $context->createTopic('foo');

$context->createProducer()->send($fooTopic, $message);

发送消息到队列

<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$message = $context->createMessage('Hello world!');

$fooQueue = $context->createQueue('foo');

$context->createProducer()->send($fooQueue, $message);

消费消息

<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$fooQueue = $context->createQueue('foo');

$consumer = $context->createConsumer($fooQueue);

// 启用异步提交以获得更好的性能(自版本0.9.9起默认为true)。
//$consumer->setCommitAsync(true);

$message = $consumer->receive();

// 处理消息

$consumer->acknowledge($message);
// $consumer->reject($message);

序列化消息

默认情况下,传输将消息序列化为 json 格式,但您可能希望使用另一种格式,例如Apache Avro。 为此,您必须实现 Serializer 接口并将其设置为上下文、生产者或消费者。 如果序列化器设置为上下文,它将被注入到由上下文创建的所有消费者和生产者。

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

修改偏移量

默认情况下,消费者从主题的开头开始,并在您处理消息时更新偏移量。 这里有方法可以更改当前的偏移量。

<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$fooQueue = $context->createQueue('foo');

$consumer = $context->createConsumer($fooQueue);
$consumer->setOffset(123);

$message = $consumer->receive(2000);

与Symfony bundle一起使用

设置您的 enqueue 以使用 rdkafka 作为您的传输

# app/config/config.yml

enqueue:
    default:
        transport: "rdkafka:"
        client: ~

如果您不想通过 DSN 字符串来传递它们或需要传递特定选项,您还可以扩展配置以传递其他选项。 由于 rdkafka 使用 librdkafka(基本上是它的封装器),大多数配置选项与 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 中的配置选项相同。

# app/config/config.yml

enqueue:
    default:
        transport:
            dsn: "rdkafka://"
            global:
                ### 确保这对于每个应用/消费者组都是唯一的,并且不会更改
                ### 否则,Kafka将无法跟踪您的上一个偏移量,并将始终根据 “auto.offset.reset” 设置来启动。
                ### 如果您想了解更多信息,请参阅关于`group.id`属性的Kafka文档
                group.id: 'foo-app'
                metadata.broker.list: 'example.com:1000'
            topic:
                auto.offset.reset: beginning
            ### 自版本0.9.9起,默认情况下异步提交为true。
            ### 建议在早期版本中将其设置为true,否则消费者会变得非常缓慢,
            ### 它会等待偏移量存储在Kafka上,然后再继续。
            commit_async: true
        client: ~

返回首页