RabbitMQ学习

MQ(Message Queue)是什么?

作用

实现系统间的解耦

原理

通过利用高效的消息传递机制进行平台无关的数据交流,并基于数据通信进行分布式系统的集成

模型

image-20211225134539204

  • 生成者:不断向消息队列中生产消息
  • 消费者:不断向队列中获取消息
  • 消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松地实现系统间解耦
1
因此跨系统通讯时,首选消息队列

消息中间件

如何实现,就用到了消息中间件

image-20211225135206345

AMQP协议

image-20211225140413964

安装

image-20211225144002366

image-20211225144024194

image-20211225144417498

插件

1
rabbitmq-plugins enable rabbitmq_management

启动服务

1
rabbitmq-server

常用命令

1
systemctl start rabbitmq-server
1
systemctl status rabbitmq-server
1
systemctl restart rabbitmq-server
1
systemctl stop rabbitmq-server
1
journalctl -xe 查看报错日志

管理命令和管理界面

通过自行配置开放端口号进入到WEB的管理界面

在没有WEB界面下,可以通过以下命令进行管理

image-20211225153705068

默认端口说明

  • 15672(HTTP):HTTP WEB界面端口
  • 5672(AMQP):TCP 通讯端口(Java操作时会用到的端口)
  • 25672(CLUSTERING):集群通讯

AMQP协议

Advanced Message Queuing Protocal 高级消息队列协议是一个进程间传递异步消息的网路协议

image-20211225170524287

Virtual Host 相当于MySQL中的库,操作时,常常为每一个应用建立一个虚拟主机

七种消息发布模式

1
https://www.rabbitmq.com/getstarted.html

点对点模型

1
hello world

image-20211226162905062

适用场景

  • 登录

生成者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.wu.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wu.utils.RabbitMQUtils;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Haixin Wu
* @date 2021/12/25 20:58
* @since 1.0
*/
public class Provider {
@Test
public void testSendMessage() throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
//从连接中创建一个通道对象
Channel channel = connection.createChannel();
/**
* 通道绑定对应的消息队列
* 参数1:队列名称,如果队列不存在自动创建
* 参数2:定义的队列是否持久化?持久化会将队列写入磁盘,重启后仍存在
* 参数3:是否独占队列?独占表示一个队列只允许当前连接可用
* 参数4:是否在消费完成后自动消除队列?
* 参数5:附加参数
*/
channel.queueDeclare("hello", false, false, false, null);
/**
* 发布消息
* 参数1:交换机名称
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}


绑定队列和发送队列

1
2
3
4
5
6
7
8
9
/**
* 通道绑定对应的消息队列
* 参数1:队列名称,如果队列不存在自动创建
* 参数2:定义的队列是否持久化?持久化会将队列写入磁盘,重启后仍存在,仅是队列持久化,其中的消息内容并不会持久化
* 参数3:是否独占队列?独占表示一个队列只允许当前连接可用
* 参数4:是否在消费者消费完成(断开连接)后自动消除队列?
* 参数5:附加参数
*/
channel.queueDeclare("hello", false, false, false, null);
1
2
3
4
5
6
7
8
/**
* 发布消息
* 参数1:交换机名称
* 参数2:队列名称
* 参数3:传递消息额外设置,例如可以传参MessageProperties.PERSISTENT_TEXT_PLAIN表示队列中的消息持久化
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello",null,"hello rabbit".getBytes());

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.wu.helloworld;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Haixin Wu
* @date 2021/12/25 21:20
* @since 1.0
*/
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
Connection connection = RabbitMQUtils.getConnection();
//从连接中创建一个通道对象
Channel channel = connection.createChannel();
/**
* 通道绑定对应的消息队列
* 参数1:队列名称,如果队列不存在自动创建
* 参数2:定义的队列是否持久化?持久化会将队列写入磁盘,重启后仍存在
* 参数3:是否独占队列?独占表示一个队列只允许当前连接可用
* 参数4:是否在消费者消费完成(断开连接)后自动消除队列?
* 参数5:附加参数
*/
channel.queueDeclare("hello", false, false, false, null);
/**
* 消费消息
* 参数1:队列名称
* 参数2:开始消息时的自动确认机制
* 参数3:消费时的回调接口
*/
channel.basicConsume("hello", true, new DefaultConsumer(channel){
@Override // 最后一个参数:消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
//不建议关闭,因为消费者要一直监听
}
}

任务模型

1
Work queues

image-20211226162919813

平均消费消息

生成者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.wu.task;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wu.utils.RabbitMQUtils;
import org.junit.Test;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/26 16:44
* @since 1.0
*/
public class Provider {
@Test
public void taskProviderTest() throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",false,false,false,null);
for (int i = 0; i < 200; i++) {
channel.basicPublish("","work",null,(i + " hello task").getBytes());
}
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.wu.task;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/26 16:44
* @since 1.0
*/
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",false,false,false,null);
channel.basicConsume("work",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1: " + new String(body));
}
});
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.wu.task;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/26 16:45
* @since 1.0
*/
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",false,false,false,null);
channel.basicConsume("work",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer2: " + new String(body));
}
});
}
}

image-20211226170247634image-20211226170302705

总结

1
2
3
4
5
默认情况下,RabbitMQ在任务模型下会将消息按顺序发送给下一个使用者。

如果消息总量能够整除消费者的数量,那么每个消费者都能够收到相同数量的消息。这种分发消息的方式成为循环。

但是对于存在一个消费者快,一个消费者慢的情况而言,循环分发消息的方式就不适应了。

消息自动确认机制

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 在默认的消息消费的机制(平均分配)下,代表着开始消息时的自动确认机制的第二参数,是开启的,
* 这种情况下它不关心你的业务是否处理完,消息在接收到消息时会自动地向消息队列中表示确认。
* 举例说明:如果分配给该消费者5个消息,此时消费者无论消息是否完成都会自动地向消息队列中表示确认,
* 如果进行到第3个消息时宕机了,剩下的2个消息就会被丢失了,这并不是我们希望发生的
*/
channel.basicConsume("work",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer2: " + new String(body));
}
});

避免信息丢失的方法

第一步

1
2
// 不能让消息队列将消息一次性地给消费者,让消息一个一个地来,所以就让消息通道中只有一个消息
channel.basicQos(1);

第二步

1
2
3
4
5
6
7
// 关闭消息确认机制
channel.basicConsume("work",false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1: " + new String(body));
}
});

第三步

1
2
3
4
// 手动确认消息。如果服务器突然宕机,未确认的消息就会重新回到消息队列中等待消费
// 第一个参数,标记为手动确认消息标识
// 第二个参数,是否开启多消息同时确认,false每次确认一个。因为通道中每次只有一个消息,所以此处设为false
channel.basicAck(envelope.getDeliveryTag(), false);

能者多劳消费消息

1
通过上例方法不仅能够实现避免消息丢失,而且还能实现能者多劳。
image-20211229225000769 image-20211229225016640

广播/发布/订阅模型

1
Publish/Subscribe

image-20211230202722263

注意点

1
交换机和队列的自动生成取决于消费者

适用场景

  • 注册
  • 提交订单时,既要与订单表进行交互,也要和库存进行交互
  • 生成日志

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.wu.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/30 20:34
* @since 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 将通道指明指定交换机
* 参数1.交换机名称
* 参数2.交换机类型
*/
channel.exchangeDeclare("logs", "fanout");
/**
* 发送消息
*/
channel.basicPublish("logs","",null,"faount type message".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.wu.fanout;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/30 20:42
* @since 1.0
*/
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("logs", "fanout");
// 临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName, "logs", "");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}

image-20211230205302790

image-20211230205309723

路由模型

1
Routing

image-20211231161512169

适用场景

  • 日志打印,根据日志类型分发到不同的消费者中去

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.wu.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/31 16:20
* @since 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通过通道声明交换机 参数1:交换机名称 参数2:路由模式
channel.exchangeDeclare("logs_direct", "direct");
// 发送消息
String routingKey = "info";
channel.basicPublish("logs_direct", routingKey, null, ("这是direct模型发布的基于route key:["+routingKey+"]发送的消息").getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.wu.direct;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/31 16:26
* @since 1.0
*/
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通道声明交换机和交换机的类型
channel.exchangeDeclare("logs_direct", "direct");
// 创建一个临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "logs_direct", "error");
// 获取队列中的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.wu.direct;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2021/12/31 16:26
* @since 1.0
*/
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通道声明交换机和交换机的类型
channel.exchangeDeclare("logs_direct", "direct");
// 创建一个临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "logs_direct", "info");
channel.queueBind(queue, "logs_direct", "error");
channel.queueBind(queue, "logs_direct", "warning");
// 获取队列中的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}

image-20211231164051971

image-20211231164117009

image-20211231164148933

image-20211231164155421

主题模型

image-20220101141004814

适用场景

1

注意点

1
只有主题模型才支持通配符

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.wu.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2022/1/1 14:00
* @since 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String routeKey = "user.save";
channel.basicPublish("topics",routeKey,null,("topic模型,routeKey: "+routeKey).getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.wu.topics;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2022/1/1 14:14
* @since 1.0
*/
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+new String(body));
}
});
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.wu.topics;

import com.rabbitmq.client.*;
import com.wu.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author Haixin Wu
* @date 2022/1/1 14:14
* @since 1.0
*/
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.#");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: "+new String(body));
}
});
}
}

image-20220101142446275

image-20220101142455176

与Spring boot整合

pom依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置

1
2
3
4
5
6
7
8
9
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host: 180.76.136.123
port: 13399
username: wu
password: 123
virtual-host: /wu_rabbitmq_study

RabbitTemplate 用来简化操作,注入即可

Spring boot点对点模型

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.wu.springbootrabbitmq;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqApplication.class)
@RunWith(SpringRunner.class)
class RabbitmqApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

// hello world
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("hello", "hello world");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.wu.springbootrabbitmq.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 默认是持久化、非独占、不是自动删除
* @author Haixin Wu
* @date 2022/1/1 14:47
* @since 1.0
*/
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true"))
public class HelloCustomer {

@RabbitHandler
public void receive(String message){
System.out.println("message = " + message);
}
}

Spring boot任务模型

1
任务模型中,Spring AMQP实现的默认方式是公平调度

生产者

1
2
3
4
5
6
7
// work
@Test
void workTest(){
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work","work模型");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.wu.springbootrabbitmq.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 任务模型中,Spring AMQP实现的默认方式是公平调度
* @author Haixin Wu
* @date 2022/1/1 17:15
* @since 1.0
*/
@Component
public class WorkCustomer {

@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("消费者1:message = " + message);
}

@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("消费者2:message = " + message);
}

}

Spring boot广播模型

生产者

1
2
3
4
5
// fanout
@Test
void fanoutTest(){
rabbitTemplate.convertAndSend("logs", "", "fanout模型发送消息");
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.wu.springbootrabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author Haixin Wu
* @date 2022/1/1 17:28
* @since 1.0
*/
@Component
public class FanoutCustomer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout")// 指定交换机
)
})
public void receive1(String message){
System.out.println("消费者1:message = " + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout")// 指定交换机
)
})
public void receive2(String message){
System.out.println("消费者2:message = " + message);
}
}



Spring boot路由模型

生产者

1
2
3
4
5
// routing
@Test
void routingTest(){
rabbitTemplate.convertAndSend("directs","error","发送error的key的路由信息");
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.wu.springbootrabbitmq.routing;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author Haixin Wu
* @date 2022/1/1 17:35
* @since 1.0
*/
@Component
public class RoutingCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"),// 指定交换机
key = {"info","error","warn"}
)
})
public void receive1(String message){
System.out.println("消费者1:message = " + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"),// 指定交换机
key = {"error"}
)
})
public void receive2(String message){
System.out.println("消费者2:message = " + message);
}
}

Spring boot主题模型

生产者

1
2
3
4
5
6
// topics
@Test
void topicsTest(){
rabbitTemplate.convertAndSend("topics","user.save","user.save 消息");
rabbitTemplate.convertAndSend("topics","user.save.xx","user.save.xx 消息");
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.wu.springbootrabbitmq.topics;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author Haixin Wu
* @date 2022/1/1 17:41
* @since 1.0
*/
@Component
public class TopicsCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic", name = "topics"),
key = {"user.*"}
)
})
public void receive1(String message){
System.out.println("消费者1 message = " + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic", name = "topics"),
key = {"user.#"}
)
})
public void receive2(String message){
System.out.println("消费者2 message = " + message);
}
}

MQ的应用场景

异步处理

image-20220101175143769

image-20220101175240992

应用解耦

image-20220101175525304

流量削峰

image-20220101175701881


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!