RabbitMQ消息事务与确认机制
1 |
|
事务机制
基于 AMQP
实现了事务机制,类似于MySQL
的事务
RabbitMQ
提供了三个方法对消息发送进行事务管理:
txSelect()
:用于将通道Channel
开启事务模式,服务端会返回Tx.Select-OK
txCommit()
:用于提交事务txRollback()
:用于回滚事务
使用格式如下:
1 |
|
原理
开启事务后,消息起初并不会到指定的队列中
而是首先发送到一个临时队列中
只有当调用了txCommit()
后,刚刚存储到临时队列中的消息才会到指定的队列中去
缺点
开启-提交-回滚,三次操作,每次都相当于一次请求,降低了消息的吞吐量
因为走的通信太多,大量消息就会大量请求服务器,这样会非常耗时
注意
在消费者中,要将 autoACK
设置为 false
,手动提交ack
而为true
是不支持事物的,也就是说即使在收到消息之后回滚事务也是无济于事的,因为队列已经把消息移除了
确认机制
基于confirm
模式实现确认机制
考虑到 AMQP
的事务机制性能消耗大
RabbitMQ
提供了另一种低消耗的事务管理方式,使用 confirmSelect()
方法
原理
confirm
模式下的 channel
发送的消息会生成一个唯一的有序 ID
(从1开始)
一旦消息成功发送到相应的队列之后,RabbitMQ服务端
会发送给生产者一个确认标志,包含消息的 ID
1 |
|
如果消息和队列是持久化的,那么只有当 RabbitMQ服务器
将消息成功写入到 磁盘之后,服务端才会发送确认标志
此外,服务端也可以设置 basic.ack
和 mutiple
域,表明是否是批量确认的消息,即该序号之前的消息都已经收到了
confirm
的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理
- 消息发送成功,会返回
ack
消息供异步处理 - 消息发送失败,会返回
nack
消息
confirm
的事件没有明确说明,并且同一个消息只会被 confirm
一次
处理 ack
和 nack
的方式有三种
串行 confirm
1 |
|
其中waitForConfirms
可以换成带有时间参数的方法waitForConfirms(Long mills)
指定等待响应时间
批量 confirm
每发送一批次消息就调用waitForConfirms()
方法等待服务端confirm
1 |
|
批量的方法从数量级上降低了confirm
的性能消耗,提高了效率。
但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。
所以批量的confirm
虽然性能提高了,但是消息的重复率也提高了。
异步 confirm
Channel
对象提供的 ConfirmListener()
回调方法只包含deliveryTag
(当前Channel
发出的消息序列号)
我们需要自己为每一个Channel
维护一个unconfirm
的序列号的集合
- 每
push
一条数据,集合元素加1
- 每回调一次
handleAck
方法,unconfirm
集合就删掉相应的一条(multiple=false
) 或者多条(multiple=true
)记录
从程序运行效率上来看,这个unconfirm
集合最好采用有序集合SortedSet
存储结构。
使用监听方法,当服务端confirm
了一条或多条消息后,调用回调方法
1 |
|
每一个comfirm
的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack
或者nack
的数据,集合删除一条。
SortedSet
是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。
JDK中waitForConfirms()
方法也是使用了SortedSet
集合
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!