Pulsar消息发送、消费架构概述
大家好,我是威哥,《RocketMQ技术内幕》、《RocketMQ实战》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。
1、订阅与发布
Pulsar基于发布-订阅模式,消息发送者向主题发送消息,而消费消费者订阅主题,消息从Pulsar Broker中获取消息,处理成功后需要向Pulsar发送ACK,表示消息处理成功。与RocketMQ/Kafka不同的是,**Pulsar只有当消费者确认消息都成功被处理后才能去删除消息。**如果Consumer在处理一批消息失败后,可以再次请求Broker重新下发该批消息,以便进行重试。
2、消息(Messages)
消息(Message)是Pulsar中最基本的抽象单位,一条Pulsar消息中的属性如下所示:
组件名称 | 描述 |
---|---|
Value / data payload | 消息体,字节数组 |
Key | 消息键或者分区键,主题的压缩机制将依赖消息的key |
Properties | 消息属性,类比RocketMQ的消息属性、Kafka中的Header,通常用于存放消息的扩展属性 |
Producer name | 消息的生产者 |
Topic name | 消息所属的主题 |
Schema version | 消息生产者的Schema版本号 |
Sequence ID | 消息在主题中所有消息的序号,默认由发送者产生,也可以由用户自定义,可以用于在一次消息发送调用API中消息去重。 如果服务端brokerDeduplicationEnabled设置为true,则服务端会进行唯一性校验 |
Message ID | 消息ID,消息持久化到服务端后生成,里面包含了消息的特定存储位置,并且在集群内是全局唯一的 |
Publish time | 消息的发送时间,消息发送者自动生成 |
Event time | 事件时间,由应用程序附加到消息的可选时间时间戳,通常是业务发生的时间,例如订单的下单时间等 |
在Pulsar中,消息的默认最大消息大小为5M,我们可以有如下两种方式进行更改:
- 在broker.conf中maxMessageSize=5242880
- 在bookkeeper.conf中nettyMaxFrameSizeBytes=5253120
3、生产者(Producers)
Producer生产者,消息发送客户端。
3.1发送模式
Produder有两种消息发送模式:
- 同步发送
- 异步发送
3.2 访问模式
在Pulsar中,生产者访问Broker中主题提供了多种访问模式,详细如下表所示:
组件名称 | 描述 |
---|---|
Shared | 多个生产者都可以向一个主题发送消息,默认模式 |
Exclusive | 独占模式,一个主题只能被一个生产者连接,如果另外一个生产者试图连接,则会立即收到一个错误,但如果老的生产者宕机,会选举产生一个新的生产者 |
ExclusiveWithFencing | 只允许一个生产者往该主题发送消息,相比Exclusive,没有备选机制。 |
WaitForExclusive | 支持多个生产者通过选举机制成为Leader后发送消息,Leader宕机后,重新竞争选举出新的Leader,只有Leader可以发送消息 |
3.3 压缩算法
Pulsar目前支持LZ4、ZLIB、ZSTD、SNAPP四种压缩机制,可以通过如下代码指定压缩算法:
client.newProducer()
.topic("topic-name")
.compressionType(CompressionType.LZ4)
.create();
3.4 批量发送
Pulsar支持批量消息发送。如果开启批量发送,消息发送者会将多条消息累积到一个批次中进行一次发送。一个批次的消息大小由最大的消息条数+最大的发送延迟两个参数共同决定(参考kafka中的batch.size、linger.ms),如果开启了批处理,backlogSize表示的一个批次中消息的条数。
批处理示例图如下:
Pulsar会把一个批次作为一个整体存储到Broker中,消费者接到一个批次后再解绑成一条一条的消息。但即使开启批处理,但调度类消息(设置了deliverAt或者deliverAfter)会单独一条消息进行发送。
默认情况下如果一个批次中的消息出现部分消费失败,消费端在消费重试时会再次收到这个批次中所有的消息,为了避免这种情况,Pulsar在2.6.0版本中引入了批量索引确认机制。一个批次中所有消息被确认后会删除。那pulsar是如何支持消息回溯的呢?[答案在介绍Consumer的时候会介绍]
默认情况下,批索引确认机制是关闭的。如果要开启,需要在broker端配置acknowledgmentAtBatchIndexLevelEnabled=true。同样在消费端也需要设置acknowledgmentAtBatchIndexLevelEnabled=true。
消费端开启批索引确认示例代码:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();
3.5 消息分块
Pulsar能够在消息发送端将一个大的消息体分割成多个小块的消息,并且在消息消费端聚合成一条完整消息再消费。
如果开启分块机制,当需要发送的消息超过允许的最大消息大小(maxMessageSize)时,其工作流程如下:
- 生产者将原始消息拆分成分块消息,并将他们与分块元数据按顺序发送给Broker。
- Broker将分块消息当成普通消息进行存储,并且使用chunkedMessageRate来记录分块消息的速率。
- 消息消费端首先将缓存分块消息,直到一个消息所有分库都被接收,然后整合成一条完整的消息,传输到接收Queue中,客户端从接收队列中获取一条完整的消息。
限制:
- 分块消息只适用于持久化主题
- 分块只对 exclusive与failover subscription types 两种订阅模式生效
- 分块不能与批处理同时开启
关于分块的工作机制,官方文档如下所示:
两个生产者发送多条消息的分块,在服务端,一条消息的多个分块会被顺序存储,但一条消息的多个分块并不是连续存储的,然后消费者在接收时,会利用缓存对分块信息进行聚合。
**注意:**一旦开启了消息分块,消费时需要在消费端聚合成一条完整的消息,必须为每一条大消息创建独立的缓冲区,会对消费端的内存带来压力,有内存溢出的风险。
故为了保护消费端,消费端采取了两个措施:
- 引入了一个maxPendingChunkedMessage参数,设置可以缓存的最大chunk数,当缓存的chunk数量达到这个值后,pulsar会drop掉部分chunk,先保证一条消息顺利合并,其他丢弃的消息再在合适的时候重新从Broker拉取。
- 引入了expireTimeOfIncompleteChunkedMessage参数,如果一个消息的所有块在指定时间内没有全部到达,这些分块将在消费端全部被移除,默认的过期时间为1min。
要开启消息分块的一个前提条件是需要关闭批量发送,具体做法是将生产者的enableBatching设置为false。
默认情况下消息分块是禁用的,如果需要开启,需要将生产者的chunkingEnabled设置为true。
4、消费组(Consumers)
消费者,通过订阅主题,从而从Broker端接受消息,消息发送,消息消费的核心示意图如下:
消费端会使用一个队列来接受Broker端的消息,这个缓存可以通过receiverQueueSize来配置,默认为1000。
4.1 消息接受模式
消费端接收消息支持同步接收与异步接收两种模式:
- Sync receive:同步接收模式,如果Broker没有需要消费的消息,接受线程将阻塞
- An async receive:异步接收模式,将立马返回,使用了Feture模式,消息真正到达后可用。
4.2 消费监听器(Listeners)
消息消费监听器,当从Broker中收到消息后,将调用消费监听器,从而触发业务代码的执行。
4.3 消费确认机制(Acknowledgment)
消费者在成功消费完一条消息后需要告知Broker已成功消费,俗称ACK确认信息。然后这条消息会被持久化存储,并且在所有订阅组都成功确认后才会删除这条消息。如果希望Broker继续存储已被所有订阅确认的消息,则需要设置消息的持久策略(本文后面会详细介绍)。
如果发送端启用了批处理,则Pulsar可以引入批索引确认机制,避免一个批次的消息重复下发给消费者。
在Pulsar中,确认一条消息有如下两种方式:
- 单条消息独立确认。消费者每一条消息都会发送ACK给Broker,消费端通过调用consumer.acknowledge(msg)对单条消息进行确认。
- 累积确认,消费者只确认接收到的最后一条消息。消费端通过调用consumer.acknowledgeCumulative(msg)进行累积消息确认。
4.3.1 Negative acknowledgment(取消确认)
消费者可以通过发送neagative ack请求到broker,告知broker并未成功消费该条消息,broker收到该请求后,会触发broker将这条消息重新下发给消费者进行消费。
如果消费者订阅模式为Exclusive或者Failover subscription类型时,消费者只能否认收到的最后一条消息。
如果消费者订阅模式为Shared或者Key_Shared类型时,消费者可以否认单独一条消息。
值得注意的是,Negative acknowledgment机制将对顺序性语义带来破坏,在顺序消费场景,请慎重考虑。
如果要对消息使用否定确认,请确保在消息确认超时之前进行发起。
我们可以使用如下API来进行否定确认,代码如下:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
//设置当客户端调用negativeAcknowledge方法后,服务端进行再投递的延迟时间
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
.subscribe();
Message<byte[]> message = consumer.receive();
// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);
message = consumer.receive();
consumer.acknowledge(message);
当客户端调用negativeAcknowledge后,但服务端如果一直未收到这条消息的再次ACK,会在服务端进行重推,并且可以设置阶梯延迟投递,启用类似阶梯投递机制的代码如下所示:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
// 启用阶梯延迟重推
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000) // 最小时间,机第一次推送的延迟时间,然后按multiplier倍数增长,
.maxDelayMs(60 * 1000) // 最大延迟推送等待时间
.multiplier(2) // 一次延迟推送的倍数,这里设置为,则重试时间如下:1s 2s 4s 8s 16s 32s 60s 60s
.build())
.subscribe();
温馨提示:如果发送端启用了批处理,Broker是按批的维度重推这一批消息。
4.3.2 (确认超时)Acknowledgment timeout
默认情况下,并不会开启ACK超时确认,也就是意味着Broker将一条消息传递给消费者后并不会再次投递,除非消费者崩溃退出。
确认超时机制允许您设置一个时间范围,在此期间客户端跟踪未确认的消息。在设置的超时(ackTimeout)时间过期后,客户端可以向Broker发送redeliver unacknowledged messages 请求,然后Broker会将未确认消息再次投递给消费者。
客户端在ackTimeout超时后,有两种机制向服务端发送redeliver unacknowledged messages:
-
第一种是以固定频率定时发送,主要是通过设置消费者的ackTimeoutTickTime参数,示例如下:
PulsarClient pulsarClient = PulsarClient.builder().build(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .ackTimeout(10, TimeUnit.SECONDS) // 开启超时确认机制 .ackTimeoutTickTime(1000, TimeUnit.SECONDS) // 设置定时发送频率 // 省略其他属性 .subscribe();
-
第二种是延迟梯度的方式进行发送,具体代码如下:
consumer.ackTimeout(10, TimeUnit.SECOND) .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder() .minDelayMs(1000) // 最小延迟时间 .maxDelayMs(60 * 1000) // 最大延迟时间 .multiplier(2) // 递增倍数 .build());
温馨提示:
- 如果启用了批处理,确认超时后,Broker会将一个批次作为一个整体重推,而不是重推这个批次中的部分消息。
- negative acknowledgment确认比超时确认拥有更高的优先级。
4.4 重试主题(Retry letter topic)
Pulsar支持消息消费重试,消费者在消费消息的过程中如果处理失败,可以将这些消息存储在消费者对应的重试主题中,以便后续再次重新消费,消费者会自动订阅重试主题。达到最大消费重试次数后如果还是失败,则会将消息存储在死信队列,死信队列中的消息需要人工手动去处理。
重试主题的工作机制如下图所示:
**消息消费失败重试机制默认是禁用的,**可以通过设置enableRetry为true开启消费消费失败重试,可以通过maxRedeliverCount设置最大重试次数,开启消息消费重试机制的示例代码如下:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("dw_test_consumer_022000")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true) // 开启消费重试
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount) // 最大重试次数
.retryLetterTopic("my-retry-letter-topic-name")// 可以自定义重试主题
.build())
.subscribe();
重试主题的默认命名规则:topicName-subscriptionname-RETRY
重试主题中的消息包含一些特殊的属性:
- REAL_TOPIC 消息原始主题
- ORIGIN_MESSAGE_ID 消息原始ID
- RECONSUMETIMES 当前重试次数
- DELAY_TIME 消息重试间隔(毫秒)
如果使用消息重试,客户端需要调用如下API将消息持久化到消息队列中:
consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);
//并且该方法还有一个重载方法,支持自定义消息属性
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);
温馨提示:
- 目前,只在Shared 订阅模式中启用了消息消费重试机制
- 与否定确认(Negative acknowledgment)相比,消息消费重试机制更加适合类似需要大量重试并且重试间隔可配置的场景,因为消息重试主题是持久到BookKeeper中,而否定确认是缓存在客户端。
4.5 死信队列(Dead letter topic)
如果消费重试次数达到指定的最大值后还是未成功消费,Pulsar会将消息发送到消费者对应的死信队列,一旦消息进入到死信队列,Pulsar不会主动对这些消息进行任何处理,需要要消费者自己决定如何处理这些消息。
死信队列默认的主题名称为:topicname-subscriptionname-DLQ。
我们也可以通过如下代码自定义死信队列的名称:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.build())
.subscribe();
需要特别注意的是消费者默认并不会订阅死信队列,也就是意外着如果有消息进入到了死信队列,说明有部分消息没有被成功消费。
如果需要自动为DLQ创建订阅,可以通过initialSubscriptionName来设置订阅组,但如果服务端将allowAutoSubscriptionCreation设置为false,则无法成功创建DLQ producer。
文章首发:https://www.codingw.net/article?id=786
见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技术内幕》、《RocketMQ实战》两本技术书籍,在CSDN中记录了我的成长历程,欢迎大家关注,私信,一起交流进步。