pulsar事务消息
最近在重构业务里面的一个消息推送系统,重新考虑了一下mq的选型,最终选定了pulsar这个对云原生支持最好的mq,为了做到best practice,还细细研究了一下其中的事务消息特性,本文做一些简单的归纳和总结。
一、Pulsar事务定义
在Pulsar中,对于事务语义是这样定义的:允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作,即生产者或消费者能够处理跨多个主题和分区的消息,并确保这些消息作为一个单元被处理。
二、事务中的几个概念
2.1 事务协调器(Transaction Coordinator)
为了实现消息的事务性,Pulsar引入一个名为事务协调器(又名TC )的服务器端模块。TC管理生产者发送的消息,消费者接收消息以及签收消息这一整个事务过程,并把此过程作为一个整体进行提交或中止操作。
2.2 事务缓冲区(Transaction Buffer)
事务中产生的消息将存储在事务缓冲区(又称TB )中。除非消费者提交事务,否则不会将TB中的消息持久化(可见)。当事务中止时,TB中的消息将被丢弃。
2.3 事务确认(Pending Ack)
事件流应用程序(例如Pulsar Functions)可能包括消费者和生产者,其中应用程序消费来自输入topic的消息,并产生新消息以输出到输出topic。为了实现精确的一次(exactly one),我们需要确保输入消息上的确认作为事务的一部分发生,以便实现原子性。否则,如果确认输入topic和生产输出topic的消息之间出现故障,将根据两个操作的顺序发生数据重复或数据丢失,试想:如果生产者首先提交了消息,然后发生了故障,则输入消息将在客户端恢复时重新投递,因为它们未被确认,因此数据重复;如果首先确认输入消息,则提交失败的输出消息将不会重新生成,因为输入消息已被确认,因此会丢失数据。
因此,我们需要在事务中包含确认来保证原子性。为了实现这一点,我们必须改变事务中确认的行为。因为目前pulsar中的所有确认都只是best-effort的操作。ack可能在网络断开或broker崩溃期间丢失,这将导致数据重复。
2.4 物化机制
对于附加到TB的消息,事务实现还应该提供物化机制来物化未提交的消息,以使它们在事务提交时对消费者可见。
2.5 事务流
所有事务实现都可以使用上述描述的这些关键组件/概念来构造。

在上图中,事务流如下:
- 灰色方框代表不同的broker
- 灰色圆形框表示可以在broker内部运行或作为独立服务运行的逻辑组件(例如,像作为broker的一部分的function worker)
- 所有蓝色方框代表日志。日志可以是pulsar主题、bookkeeper ledger或managed ledger。
- 每个箭头代表请求流或消息流。这些操作按每个箭头旁边的数字指示的顺序进行。
- 下面的部分编号与图表中显示的操作相匹配。
三、事务步骤
3.1 开始事务
在事务开始时,pulsar客户端会找到一个事务协调器(Transaction Coordinator,简称TC),TC将为事务分配一个事务id(又名TxnID)。事务将在事务日志中记录其事务id和打开状态(表示事务是打开的)(如步骤1a所示),这确保了无论TC崩溃,事务状态都保持不变,事务状态被记录到日志后,TC将事务id回复给pulsar客户端。
3.2 事务循环
在这个阶段,pulsar客户端将进入一个事务循环,重复consume-transform-produce由事务组成的消息的动作。这是一个漫长的阶段,可能包含多个生成和确认请求。
3.3 增加分区到事务
在pulsar客户端向新的主题分区生成消息之前,客户端向TC发送一个请求,将该分区添加到事务中。TC将事务的分区更改记录到其事务日志中,以确保持久性(如2.1a所示)。这一步确保TC知道事务接触的所有分区,因此TC可以在分区结束阶段提交或中止每个分区上的更改。
3.4 发送消息到分区
pulsar客户端开始向分区产生消息。该生产流程与正常消息生产流程相同,唯一的区别是由事务产生的一批消息将包含事务id。接收该批消息的broker检查该批消息是否属于事务。如果它不属于事务,broker将批处理直接写入分区的managed ledger(这是正常的生产流程)。如果它属于一个事务,broker将把它们写入事务的事务缓冲区(TB)中。
3.5 增加订阅到事务
pulsar客户端在新的订阅首次被确认为事务的一部分时向TC发送请求。TC在步骤2.3a中记录事务的订阅添加。该步骤确保TC知道事务覆盖的所有订阅,因此TC可以在结束事务阶段提交或中止对每个订阅的更改。
3.6 响应消息到订阅
pulsar客户端开始确认订阅消息,该事务确认流程与正常确认流程相同,然而,确认请求将携带一个事务id。接收确认请求的broker检查确认是否属于该事务。如果它属于一个事务,broker将把消息标记为PENDING_ACK状态。PENDING_ACK状态意味着在确认被提交或中止之前,消息不能被其他消费者确认或否认。这使得如果一条消息上有两个事务试图确认,只有一个会成功,另一个会中止。
3.7 结束事务
在事务结束时,应用程序将决定提交或中止事务。当在确认消息上检测到冲突时,事务也可以中止。
3.8 结束事务请求
当pulsar客户端完成一个事务时,它可以向TC发出一个结束事务请求,其中一个字段指示事务是提交还是中止。
收到该请求后,TC将:
①将提交或中止消息写入其事务日志(如3.1a所示)。
②开始向该事务中涉及的所有分区提交或中止消息或确认的过程。它在3.2中展示并进行相应的描述。
③在成功提交或中止该事务中涉及的所有分区后,TC将提交或中止消息写入其事务日志。它在3.3中展示并进行相应的描述。
3.8 最终确定过程
在此阶段,TC将通过提交确认或者终止确认所有分区上的消息来完成事务。
提交生产的消息是将消息进行物化,并使它们对消费者可见(如图3.2a所示)。由于故障(例如恢复后的重试、网络断开等),提交操作可能会发生多次,TB实现须确保在提交过程中不会引入重复。
中止生成的消息将丢弃TB中的消息,如果事务中止,TB必须确保清理这些消息并回收空间。
提交确认将消息从PENDING_ACK移动到ACK。中止确认将不会确认消息,因此该消息将被重新传递给其他消费者。
四、事务实现细节
to be continued

