RabbitMQ消息事务与确认机制

1
参考文章:https://www.jianshu.com/p/63ed636c773d

事务机制

基于 AMQP 实现了事务机制,类似于MySQL的事务

RabbitMQ提供了三个方法对消息发送进行事务管理:

  • txSelect():用于将通道 Channel 开启事务模式,服务端会返回Tx.Select-OK
  • txCommit():用于提交事务
  • txRollback():用于回滚事务

使用格式如下:

1
2
3
4
5
6
7
8
9
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
try{
channel.txSelect();
// publish...
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}

原理

开启事务后,消息起初并不会到指定的队列中

而是首先发送到一个临时队列中

只有当调用了txCommit()后,刚刚存储到临时队列中的消息才会到指定的队列中去

缺点

开启-提交-回滚三次操作,每次都相当于一次请求,降低了消息的吞吐量

因为走的通信太多,大量消息就会大量请求服务器,这样会非常耗时

注意

在消费者中,要将 autoACK 设置为 false,手动提交ack

而为true是不支持事物的,也就是说即使在收到消息之后回滚事务也是无济于事的,因为队列已经把消息移除了

确认机制

基于confirm模式实现确认机制

考虑到 AMQP 的事务机制性能消耗大

RabbitMQ提供了另一种低消耗的事务管理方式,使用 confirmSelect()方法

原理

confirm模式下的 channel发送的消息会生成一个唯一的有序 ID (从1开始)

一旦消息成功发送到相应的队列之后,RabbitMQ服务端 会发送给生产者一个确认标志,包含消息的 ID

1
这样生产者就知道该消息已经发送成功了

如果消息和队列是持久化的,那么只有当 RabbitMQ服务器将消息成功写入到 磁盘之后,服务端才会发送确认标志

此外,服务端也可以设置 basic.ackmutiple 域,表明是否是批量确认的消息,即该序号之前的消息都已经收到了

confirm 的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理

  • 消息发送成功,会返回 ack 消息供异步处理
  • 消息发送失败,会返回 nack 消息

confirm 的事件没有明确说明,并且同一个消息只会被 confirm 一次

处理 acknack 的方式有三种

串行 confirm

1
2
3
4
5
6
7
8
9
10
11
12
//开启confirm模式
channel.confirmSelect();
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,
"",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//判断是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}

其中waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间

批量 confirm

每发送一批次消息就调用waitForConfirms()方法等待服务端confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//开启confirm模式
channel.confirmSelect();

for(int i =0;i<1000;i++){
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,
"",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
if(i%100==0){
//每发送100条判断一次是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
}
}

批量的方法从数量级上降低了confirm的性能消耗,提高了效率。

但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。

所以批量的confirm虽然性能提高了,但是消息的重复率也提高了。

异步 confirm

Channel 对象提供的 ConfirmListener() 回调方法只包含deliveryTag(当前Channel发出的消息序列号)

我们需要自己为每一个Channel维护一个unconfirm的序列号的集合

  • push一条数据,集合元素加1
  • 每回调一次handleAck方法,unconfirm集合就删掉相应的一条(multiple=false) 或者多条(multiple=true)记录

从程序运行效率上来看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

使用监听方法,当服务端confirm了一条或多条消息后,调用回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//声明一个用来记录消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//开启confirm模式
channel.confirmSelect();
//异步监听方法 处理ack与nack方法
channel.addConfirmListener(new ConfirmListener() {
//处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
//处理nack 与ack相同
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
//获取消息confirm的唯一ID
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Hello World.";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//将ID加入到有序集合中
confirmSet.add(nextSeqNo);
}

每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。

SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。

JDK中waitForConfirms()方法也是使用了SortedSet集合


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!