MQ(Message Queue)是什么?
作用
实现系统间的解耦
原理
通过利用高效的消息传递机制进行平台无关的数据交流,并基于数据通信进行分布式系统的集成
模型

- 生成者:不断向消息队列中生产消息
- 消费者:不断向队列中获取消息
- 消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松地实现系统间解耦
消息中间件
如何实现,就用到了消息中间件

AMQP协议

安装



插件
1
| rabbitmq-plugins enable rabbitmq_management
|
启动服务
常用命令
1
| systemctl start rabbitmq-server
|
1
| systemctl status rabbitmq-server
|
1
| systemctl restart rabbitmq-server
|
1
| systemctl stop rabbitmq-server
|
管理命令和管理界面
通过自行配置开放端口号进入到WEB的管理界面
在没有WEB界面下,可以通过以下命令进行管理

默认端口说明
- 15672(HTTP):HTTP WEB界面端口
- 5672(AMQP):TCP 通讯端口(Java操作时会用到的端口)
- 25672(CLUSTERING):集群通讯
AMQP协议
Advanced Message Queuing Protocal 高级消息队列协议是一个进程间传递异步消息的网路协议

Virtual Host 相当于MySQL中的库,操作时,常常为每一个应用建立一个虚拟主机
七种消息发布模式
1
| https://www.rabbitmq.com/getstarted.html
|
点对点模型

适用场景
生成者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.wu.helloworld;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wu.utils.RabbitMQUtils; import org.junit.Test;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Provider { @Test public void testSendMessage() throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("","hello",null,"hello rabbit".getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
|
绑定队列和发送队列
1 2 3 4 5 6 7 8 9
|
channel.queueDeclare("hello", false, false, false, null);
|
1 2 3 4 5 6 7 8
|
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.wu.helloworld;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Customer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("new String(body) = " + new String(body)); } }); } }
|
任务模型

平均消费消息
生成者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.wu.task;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wu.utils.RabbitMQUtils; import org.junit.Test;
import java.io.IOException;
public class Provider { @Test public void taskProviderTest() throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",false,false,false,null); for (int i = 0; i < 200; i++) { channel.basicPublish("","work",null,(i + " hello task").getBytes()); } RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.wu.task;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",false,false,false,null); channel.basicConsume("work",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Customer1: " + new String(body)); } }); } }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.wu.task;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",false,false,false,null); channel.basicConsume("work",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Customer2: " + new String(body)); } }); } }
|


总结
1 2 3 4 5
| 默认情况下,RabbitMQ在任务模型下会将消息按顺序发送给下一个使用者。
如果消息总量能够整除消费者的数量,那么每个消费者都能够收到相同数量的消息。这种分发消息的方式成为循环。
但是对于存在一个消费者快,一个消费者慢的情况而言,循环分发消息的方式就不适应了。
|
消息自动确认机制
1 2 3 4 5 6 7 8 9 10 11 12
|
channel.basicConsume("work",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Customer2: " + new String(body)); } });
|
避免信息丢失的方法
第一步
第二步
1 2 3 4 5 6 7
| channel.basicConsume("work",false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Customer1: " + new String(body)); } });
|
第三步
1 2 3 4
|
channel.basicAck(envelope.getDeliveryTag(), false);
|
能者多劳消费消息
1
| 通过上例方法不仅能够实现避免消息丢失,而且还能实现能者多劳。
|
广播/发布/订阅模型

注意点
适用场景
- 注册
- 提交订单时,既要与订单表进行交互,也要和库存进行交互
- 生成日志
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.wu.fanout;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Provider { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish("logs","",null,"faount type message".getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.wu.fanout;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
|


路由模型

适用场景
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.wu.direct;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Provider { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct", "direct"); String routingKey = "info"; channel.basicPublish("logs_direct", routingKey, null, ("这是direct模型发布的基于route key:["+routingKey+"]发送的消息").getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.wu.direct;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct", "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "logs_direct", "error"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.wu.direct;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct", "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "logs_direct", "info"); channel.queueBind(queue, "logs_direct", "error"); channel.queueBind(queue, "logs_direct", "warning"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }
|




主题模型

适用场景
注意点
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.wu.topics;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Provider { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics","topic"); String routeKey = "user.save"; channel.basicPublish("topics",routeKey,null,("topic模型,routeKey: "+routeKey).getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.wu.topics;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics","topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"topics","user.*"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1: "+new String(body)); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.wu.topics;
import com.rabbitmq.client.*; import com.wu.utils.RabbitMQUtils;
import java.io.IOException;
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics","topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"topics","user.#"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2: "+new String(body)); } }); } }
|


与Spring boot整合
pom依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
yml配置
1 2 3 4 5 6 7 8 9
| spring: application: name: springboot-rabbitmq rabbitmq: host: 180.76.136.123 port: 13399 username: wu password: 123 virtual-host: /wu_rabbitmq_study
|
RabbitTemplate
用来简化操作,注入即可
Spring boot点对点模型
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.wu.springbootrabbitmq;
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqApplication.class) @RunWith(SpringRunner.class) class RabbitmqApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test void contextLoads() { rabbitTemplate.convertAndSend("hello", "hello world"); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.wu.springbootrabbitmq.hello;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true")) public class HelloCustomer {
@RabbitHandler public void receive(String message){ System.out.println("message = " + message); } }
|
Spring boot任务模型
1
| 任务模型中,Spring AMQP实现的默认方式是公平调度
|
生产者
1 2 3 4 5 6 7
| @Test void workTest(){ for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("work","work模型"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.wu.springbootrabbitmq.work;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message){ System.out.println("消费者1:message = " + message); }
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message){ System.out.println("消费者2:message = " + message); }
}
|
Spring boot广播模型
生产者
1 2 3 4 5
| @Test void fanoutTest(){ rabbitTemplate.convertAndSend("logs", "", "fanout模型发送消息"); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.wu.springbootrabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class FanoutCustomer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs", type = "fanout")// 指定交换机 ) }) public void receive1(String message){ System.out.println("消费者1:message = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs", type = "fanout")// 指定交换机 ) }) public void receive2(String message){ System.out.println("消费者2:message = " + message); } }
|
Spring boot路由模型
生产者
1 2 3 4 5
| @Test void routingTest(){ rabbitTemplate.convertAndSend("directs","error","发送error的key的路由信息"); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.wu.springbootrabbitmq.routing;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class RoutingCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue,// 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"),// 指定交换机 key = {"info","error","warn"} ) }) public void receive1(String message){ System.out.println("消费者1:message = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue,// 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"),// 指定交换机 key = {"error"} ) }) public void receive2(String message){ System.out.println("消费者2:message = " + message); } }
|
Spring boot主题模型
生产者
1 2 3 4 5 6
| @Test void topicsTest(){ rabbitTemplate.convertAndSend("topics","user.save","user.save 消息"); rabbitTemplate.convertAndSend("topics","user.save.xx","user.save.xx 消息"); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.wu.springbootrabbitmq.topics;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class TopicsCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "topics"), key = {"user.*"} ) }) public void receive1(String message){ System.out.println("消费者1 message = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "topics"), key = {"user.#"} ) }) public void receive2(String message){ System.out.println("消费者2 message = " + message); } }
|
MQ的应用场景
异步处理


应用解耦

流量削峰
