关键概念
如果您不熟悉队列系统,则需要了解一些关键概念以充分利用本库。
本库由几个组件组成。这些组件可以独立使用或作为整体的部分使用。
组件
传输
传输是特定于供应商的底层库,提供队列功能:一种编程化创建、发送和读取消息的方式。 它基于 queue interop 接口。如果需要完全控制或访问特定于供应商的功能,请直接使用传输。
最著名的传输是 RabbitMQ、Amazon SQS、Redis、Filesystem。
- 连接工厂 (connection factory)使用特定于供应商的配置来创建连到供应商服务的连接
- 上下文(context)提供生产者、消费者并帮助创建消息。它是最常用的对象,也是抽象工厂模式的一种实现。
- 目的地 (destination)是消息可以发送到的地点的概念。选择队列或主题。目的地代表代理状态,因此希望在代理端看到相同的名称。
- 队列(queue)是一个已命名目的地,消息可以发送到该目的地。消息在队列上累积,直到服务于这些队列的程序(称为消费者)检索到消息为止。
- 主题(topic)实现发布和订阅语义。当您发布消息时,它会被发送给所有感兴趣的订阅者 - 因此零到多个订阅者将收到该消息的副本。一些代理不支持 Pub\Sub。
- 消息(message)用于描述发送到(或接收自)目的地的数据。它由正文、标头和属性组成。
- 生产者(producer)向目的地发送消息。生产者实现特定于供应商的逻辑,并负责在Enqueue和特定于供应商的消息格式之间转换消息。
- 消费者(consumer)从目的地获取消息。消费者实现特定于供应商的逻辑,并负责在特定于供应商的消息格式和Enqueue之间转换消息。
- 订阅消费者(subscription consumer)提供了一种同时消费来自多个目的地的消息的方法。某些代理不支持此功能。
- 处理器(processor)是用于共享消息处理逻辑的可选概念。独立于供应商。实现您自己的业务逻辑。
我们可能会参考的其他术语:
- 接收以及删除投递(receive and delete delivery):当消费者获取到消息时,队列将删除该消息。如果处理失败,则消息将丢失,不会再次处理。这称为仅一次处理。
- 窥视和锁定投递(peek and lock delivery):当消息被消费者获取时,队列会在短时间内锁定该消息,使其对其他消费者不可见,以防止重复处理以及消息丢失。如果在锁定超时之前没有确认,则假定失败,然后该消息在队列中再次可见,以供消费者再次尝试。这称为至少一次处理。
- 显式确认(an explicit acknowledgement):队列在消费者获取消息时锁定消息,使其对其他消费者不可见,以防止重复处理和消息丢失。如果在连接关闭之前没有收到明确的确认,则假定失败,然后消息在队列中再次可见,以供再次尝试。这称为至少一次处理。
- 消息投递延迟(message delivery delay): 消息被发送到队列,但消费者无法立即看到以获取它们。您可能需要它在特定时间执行一项动作。
- (消息限期)message expiration:消息可以在一段时间内从队列中删除而不进行处理。您可能需要它以便不处理过时的消息。某些传输不支持此功能。
- 消息权重:消息可以以更高的优先级发送,因此被消费得更快。它违反了先进先出的概念,应谨慎使用。某些传输不支持该功能。
- 先进先出:消息的处理顺序与它们进入队列的顺序相同。
生命周期
队列系统分为两个主要部分:生产和消费。 传输部分的快速指南展示了两个部分的一些代码示例。
生产部分
- 应用创建一个带有连接工厂的上下文
- 上下文帮助应用创建消息
- 应用从上下文中获取生产者
- 应用使用生产者将消息发送到队列
消费部分
- 应用从上下文中获取消费者
- 消费者从队列接收消息
- 消费者调用处理器来处理消息
- 处理器向消费者返回状态(如
Interop\Queue\Processor::ACK
) - 消费者根据处理器返回的状态将消息重新入队或从队列中删除
消费
消费组件基于传输之上的。 最重要的类是 QueueConsumer。 可以与任何兼容queue interop的传输一起使用。 它提供了可以临时进入进程流的扩展点。您可以注册现有扩展,或编写自定义扩展。
客户端
Enqueue Client 旨在提供尽可能简单的开发体验。 它提供了高级别的、非常优雅的 API。 它在内部管理所有传输的差异,甚至会模拟那些缺失的功能(如发布-订阅)。 请注意:客户端有自己的命名传输目的地的逻辑。其期望有一个与客户端主题、命令名称不同的传输队列\主题名称。可以禁用前缀行为。
- 主题(Topic):当您想通知多个订阅器发生了某些事情时,请向主题发送消息。它无法获得订阅器结果(result)。在内部使用路由器来投递消息。
- 命令(Command):保证只有一个命令处理器\订阅器。然后,您可以从中获得结果。如果没有命令订阅器,则抛出异常。
- 路由(Router):复制要发送到主题的消息,并为每个订阅器生成该消息的副本,然后发送它。
- 驱动(Driver):包含特定于供应商的逻辑。
- 生产者(Producer):负责向主题或命令发送消息。它与传输的生产者无关。
- 消息(Message):包含要发送的数据。请注意,在消费者端,您必须处理传输的消息。
- 消费(Consumption):依赖于消费组件。
如何使用Enqueue?
有多种使用 Enqueue 的方法:这两种方法都可以减少开始使用 Enqueue 功能时必须编写的样板代码。
- 作为客户端:依赖 DSN 进行连接。
- 作为Symfony Bundle:如果您使用的是 Symfony 框架,则推荐使用它。