RabbitMQ消息事务与确认机制
1  |  | 
事务机制
基于 AMQP 实现了事务机制,类似于MySQL的事务
RabbitMQ提供了三个方法对消息发送进行事务管理:
txSelect():用于将通道Channel开启事务模式,服务端会返回Tx.Select-OKtxCommit():用于提交事务txRollback():用于回滚事务
使用格式如下:
1  |  | 
原理
开启事务后,消息起初并不会到指定的队列中
而是首先发送到一个临时队列中
只有当调用了txCommit()后,刚刚存储到临时队列中的消息才会到指定的队列中去
缺点
开启-提交-回滚,三次操作,每次都相当于一次请求,降低了消息的吞吐量
因为走的通信太多,大量消息就会大量请求服务器,这样会非常耗时
注意
在消费者中,要将 autoACK 设置为 false,手动提交ack
而为true是不支持事物的,也就是说即使在收到消息之后回滚事务也是无济于事的,因为队列已经把消息移除了
确认机制
基于confirm模式实现确认机制
考虑到 AMQP 的事务机制性能消耗大
RabbitMQ提供了另一种低消耗的事务管理方式,使用 confirmSelect()方法
原理
confirm模式下的 channel发送的消息会生成一个唯一的有序 ID (从1开始)
一旦消息成功发送到相应的队列之后,RabbitMQ服务端 会发送给生产者一个确认标志,包含消息的 ID 
1  |  | 
如果消息和队列是持久化的,那么只有当 RabbitMQ服务器将消息成功写入到 磁盘之后,服务端才会发送确认标志
此外,服务端也可以设置 basic.ack 和 mutiple 域,表明是否是批量确认的消息,即该序号之前的消息都已经收到了
confirm 的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理
- 消息发送成功,会返回 
ack消息供异步处理 - 消息发送失败,会返回 
nack消息 
confirm 的事件没有明确说明,并且同一个消息只会被 confirm 一次
处理 ack 和 nack 的方式有三种
串行 confirm
1  |  | 
其中waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间
批量 confirm
每发送一批次消息就调用waitForConfirms()方法等待服务端confirm
1  |  | 
批量的方法从数量级上降低了confirm的性能消耗,提高了效率。
但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。
所以批量的confirm虽然性能提高了,但是消息的重复率也提高了。
异步 confirm
Channel 对象提供的 ConfirmListener() 回调方法只包含deliveryTag(当前Channel发出的消息序列号)
我们需要自己为每一个Channel维护一个unconfirm的序列号的集合
- 每
push一条数据,集合元素加1 - 每回调一次
handleAck方法,unconfirm集合就删掉相应的一条(multiple=false) 或者多条(multiple=true)记录 
从程序运行效率上来看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
使用监听方法,当服务端confirm了一条或多条消息后,调用回调方法
1  |  | 
每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。
SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。
JDK中waitForConfirms()方法也是使用了SortedSet集合
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!