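What is MQ (Message Queue)?
Purpose
Decouple systems from one another.
Principle
An efficient message-passing mechanism carries data between systems independently of platform, and distributed systems are integrated on top of that data communication.
Model

- Producer: continuously publishes messages to the message queue

- Consumer: continuously fetches messages from the queue

- Producing and consuming are both asynchronous. Each side only cares about sending or receiving messages, so no business logic leaks across, which makes it easy to decouple systems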
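
The model above can be sketched without any broker at all. The following is a minimal illustration, assuming an in-memory `BlockingQueue` stands in for the message queue; the class name `InMemoryQueueDemo` and the message format are made up for this example and have nothing to do with RabbitMQ's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: an in-memory queue stands in for the broker.
class InMemoryQueueDemo {
    /** Runs one producer and one consumer thread; returns what the consumer received. */
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // The producer only knows the queue, not who consumes from it.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("message-" + i);
            }
        });
        // The consumer only knows the queue, not who produced the messages.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks until a message is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Neither thread calls the other directly, which is the decoupling the list describes. A real broker adds what this sketch lacks: persistence, networking and routing.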
 
Message middleware
The model above is implemented with message middleware.

AMQP protocol

Installation

Plugins
    rabbitmq-plugins enable rabbitmq_management
 
Starting the service
Common commands
    systemctl start rabbitmq-server      # start the service
    systemctl status rabbitmq-server     # show its status
    systemctl restart rabbitmq-server    # restart it
    systemctl stop rabbitmq-server       # stop it
 
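Management commands and the management UI
Open the management port yourself (for example in the firewall) to reach the web management UI.
Without the web UI, the broker can be managed from the command line.

Default ports
- 15672 (HTTP): web management UI

- 5672 (AMQP): TCP port for client traffic (the port Java clients connect to)

- 25672 (clustering): inter-node cluster communication

AMQP protocol
AMQP (Advanced Message Queuing Protocol) is a network protocol for passing asynchronous messages between processes.

Virtual Host: comparable to a database in MySQL. In practice, each application usually gets its own virtual host.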
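
One place where the virtual host becomes visible is the connection URI, where it forms the path segment of `amqp://user:password@host:port/vhost`. The sketch below only builds such a string, assuming plain percent-encoding of the user, password and virtual host; `AmqpUri` and its methods are hypothetical helpers, not part of the RabbitMQ client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds an AMQP URI string by hand.
class AmqpUri {
    /** Percent-encodes every byte except RFC 3986 unreserved characters. */
    static String percentEncode(String s) {
        StringBuilder out = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            char c = (char) (b & 0xFF);
            boolean unreserved = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                    || (c >= '0' && c <= '9') || c == '-' || c == '.' || c == '_' || c == '~';
            if (unreserved) {
                out.append(c);
            } else {
                out.append(String.format("%%%02X", b & 0xFF));
            }
        }
        return out.toString();
    }

    /** The virtual host is the path segment, so a leading "/" in its name must be encoded. */
    static String build(String user, String password, String host, int port, String vhost) {
        return "amqp://" + percentEncode(user) + ":" + percentEncode(password)
                + "@" + host + ":" + port + "/" + percentEncode(vhost);
    }

    public static void main(String[] args) {
        System.out.println(build("guest", "guest", "localhost", 5672, "/"));
    }
}
```

A virtual host whose name starts with a slash therefore appears as `%2F...` in the URI, which is easy to trip over when moving between host/port style configuration and URI style configuration.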
Seven messaging patterns
    https://www.rabbitmq.com/getstarted.html
 
Point-to-point model

Use cases
Producer
    package com.wu.helloworld;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.wu.utils.RabbitMQUtils;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Provider {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMQUtils.getConnection();
            // Open a channel on the connection
            Channel channel = connection.createChannel();

            // Declare the queue: name, durable, exclusive, autoDelete, arguments
            channel.queueDeclare("hello", false, false, false, null);

            // Publish: exchange ("" = default exchange), routing key, properties, body
            channel.basicPublish("", "hello", null, "hello rabbit".getBytes());
            RabbitMQUtils.closeConnectionAndChanel(channel, connection);
        }
    }
 
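Declaring the queue and publishing to it
    // queueDeclare parameters:
    //   1. queue name (the queue is created if it does not exist)
    //   2. durable: whether the queue survives a broker restart
    //   3. exclusive: whether the queue is restricted to this connection
    //   4. autoDelete: whether the queue is deleted once it is no longer in use
    //   5. arguments: additional queue arguments
    channel.queueDeclare("hello", false, false, false, null);

    // basicPublish parameters:
    //   1. exchange name ("" is the default exchange)
    //   2. routing key (for the default exchange, the queue name)
    //   3. message properties
    //   4. message body
    channel.basicPublish("", "hello", null, "hello rabbit".getBytes());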
 
Consumer
    package com.wu.helloworld;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Customer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();

            // Declare the same queue as the producer (the parameters must match)
            channel.queueDeclare("hello", false, false, false, null);

            // Consume: queue name, autoAck, callback invoked for each delivery
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("new String(body) = " + new String(body));
                }
            });
            // The channel and connection stay open so the consumer keeps listening
        }
    }
 
Work queue (task) model

Round-robin consumption
Producer
    package com.wu.task;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.wu.utils.RabbitMQUtils;
    import org.junit.Test;

    import java.io.IOException;

    public class Provider {
        @Test
        public void taskProviderTest() throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("work", false, false, false, null);
            // Publish 200 tasks to the "work" queue
            for (int i = 0; i < 200; i++) {
                channel.basicPublish("", "work", null, (i + " hello task").getBytes());
            }
            RabbitMQUtils.closeConnectionAndChanel(channel, connection);
        }
    }
 
Consumer 1
    package com.wu.task;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer1 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("work", false, false, false, null);
            // autoAck = true: every delivery is acknowledged automatically
            channel.basicConsume("work", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Customer1: " + new String(body));
                }
            });
        }
    }

Consumer 2
    package com.wu.task;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer2 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("work", false, false, false, null);
            // autoAck = true: every delivery is acknowledged automatically
            channel.basicConsume("work", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Customer2: " + new String(body));
                }
            });
        }
    }
 


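Summary
By default, in the work queue model RabbitMQ sends each message to the next consumer in sequence.
If the total number of messages divides evenly by the number of consumers, every consumer receives the same number of messages. This way of distributing messages is called round-robin.
When one consumer is fast and another is slow, however, round-robin distribution is a poor fit.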
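
The round-robin rule can be shown with a few lines of plain Java. This is only a simulation of the assignment rule, assuming message `i` goes to consumer `i % consumers`; `RoundRobinDemo` is a hypothetical class and does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of round-robin dispatch; no broker involved.
class RoundRobinDemo {
    /** Returns, for each consumer, the indices of the messages it receives. */
    static List<List<Integer>> dispatch(int messages, int consumers) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        // Message i goes to the next consumer in sequence, wrapping around.
        for (int i = 0; i < messages; i++) {
            received.get(i % consumers).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = dispatch(200, 2);
        System.out.println(result.get(0).size() + " / " + result.get(1).size());
    }
}
```

The assignment depends only on the message index, never on how busy a consumer is, which is exactly why a slow consumer ends up with a backlog.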
 
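Automatic message acknowledgement
    // The second argument (autoAck = true) enables automatic acknowledgement:
    // the broker treats a message as handled as soon as it has been delivered,
    // so messages still waiting inside a consumer that crashes are lost.
    channel.basicConsume("work", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("Customer2: " + new String(body));
        }
    });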
 
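Avoiding message loss
Step 1: limit how many unacknowledged messages the channel may hold at once (for example with channel.basicQos(1))
Step 2: turn off automatic acknowledgement (autoAck = false)
    channel.basicConsume("work", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("Customer1: " + new String(body));
        }
    });

Step 3: acknowledge manually once the message has been processed
    // Inside handleDelivery: the delivery tag identifies the message,
    // false means only this message is acknowledged (not all earlier ones)
    channel.basicAck(envelope.getDeliveryTag(), false);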
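
Why manual acknowledgement prevents loss is easier to see in a toy model. The sketch below assumes a queue that keeps delivered messages in an "unacknowledged" set until the consumer confirms them, and puts them back if the consumer disappears. `ManualAckDemo` and its method names are invented for illustration and are not RabbitMQ client calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy queue that tracks unacknowledged deliveries.
class ManualAckDemo {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands the next ready message to a consumer; returns its delivery tag, or -1 if none. */
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message); // kept until the consumer acknowledges it
        return tag;
    }

    /** The consumer confirms it finished the message; only now is it dropped for good. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** The consumer is gone: everything it had not acknowledged goes back to the queue. */
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i)); // reverse iteration preserves the original order
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With automatic acknowledgement there is no "unacknowledged" stage at all, so a message handed to a crashing consumer has nowhere to come back from.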
 
Fair dispatch: the capable do more
The approach above not only prevents message loss but also lets the faster consumer take on more of the work.
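
The effect can be simulated with a virtual clock. This sketch assumes each consumer holds at most one unacknowledged message, so the next message always goes to whichever consumer becomes free first (ties go to the lower index). `FairDispatchDemo` and the per-message costs are hypothetical.

```java
// Hypothetical simulation of fair dispatch with one in-flight message per consumer.
class FairDispatchDemo {
    /**
     * costPerMessage[c] is the time consumer c needs for one message.
     * Returns how many messages each consumer ends up handling.
     */
    static int[] dispatch(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        long[] freeAt = new long[n]; // virtual time at which each consumer is idle again
        int[] handled = new int[n];
        for (int m = 0; m < messages; m++) {
            // Pick the consumer that becomes free first.
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = dispatch(200, new int[] {1, 3});
        System.out.println(handled[0] + " / " + handled[1]);
    }
}
```

With costs of 1 and 3 time units, the faster consumer handles roughly three times as many messages, whereas round-robin would have split them evenly and left the slow consumer with a backlog.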
 
Broadcast / publish-subscribe (fanout) model

Notes
Use cases
- User registration

- Submitting an order, which has to update both the order table and the inventory

- Logging

Producer
    package com.wu.fanout;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Provider {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();

            // Declare the exchange: name and type ("fanout" broadcasts to every bound queue)
            channel.exchangeDeclare("logs", "fanout");

            // Publish to the exchange; fanout exchanges ignore the routing key
            channel.basicPublish("logs", "", null, "fanout type message".getBytes());
            RabbitMQUtils.closeConnectionAndChanel(channel, connection);
        }
    }
 
Consumer
    package com.wu.fanout;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer1 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare("logs", "fanout");
            // Create a temporary, server-named queue
            String queueName = channel.queueDeclare().getQueue();
            // Bind the queue to the exchange
            channel.queueBind(queueName, "logs", "");
            // Consume messages
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Consumer 1: " + new String(body));
                }
            });
        }
    }
 


Routing (direct) model

Use cases
Producer
    package com.wu.direct;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Provider {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare a direct exchange
            channel.exchangeDeclare("logs_direct", "direct");
            // Publish with a routing key
            String routingKey = "info";
            channel.basicPublish("logs_direct", routingKey, null,
                    ("direct model message sent with routing key: [" + routingKey + "]").getBytes());
            RabbitMQUtils.closeConnectionAndChanel(channel, connection);
        }
    }
 
Consumer
    package com.wu.direct;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer1 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare("logs_direct", "direct");
            // Create a temporary queue
            String queue = channel.queueDeclare().getQueue();
            // Bind the queue with the routing key "error" only
            channel.queueBind(queue, "logs_direct", "error");
            // Consume messages
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Consumer 1: " + new String(body));
                }
            });
        }
    }

    package com.wu.direct;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer2 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare("logs_direct", "direct");
            // Create a temporary queue
            String queue = channel.queueDeclare().getQueue();
            // Bind the same queue with three routing keys
            channel.queueBind(queue, "logs_direct", "info");
            channel.queueBind(queue, "logs_direct", "error");
            channel.queueBind(queue, "logs_direct", "warning");
            // Consume messages
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Consumer 2: " + new String(body));
                }
            });
        }
    }
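
Which consumer receives what in the routing model comes down to an exact match between the message's routing key and a queue's binding keys. The following is a minimal in-memory sketch of that rule; `DirectExchangeDemo` is a hypothetical class, and the queue names are placeholders for the server-named temporary queues used above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange.
class DirectExchangeDemo {
    private final Map<String, Set<String>> bindingKeys = new LinkedHashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange under one binding key; a queue may have several. */
    void bind(String queue, String bindingKey) {
        bindingKeys.computeIfAbsent(queue, q -> new LinkedHashSet<>()).add(bindingKey);
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    /** Delivers the message to every queue that has a binding key equal to the routing key. */
    void publish(String routingKey, String message) {
        for (Map.Entry<String, Set<String>> entry : bindingKeys.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                queues.get(entry.getKey()).add(message);
            }
        }
    }

    List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        DirectExchangeDemo exchange = new DirectExchangeDemo();
        exchange.bind("customer1", "error");
        exchange.bind("customer2", "info");
        exchange.bind("customer2", "error");
        exchange.bind("customer2", "warning");
        exchange.publish("info", "an info message");
        System.out.println("customer1: " + exchange.messagesIn("customer1"));
        System.out.println("customer2: " + exchange.messagesIn("customer2"));
    }
}
```

With the bindings from the consumers above, a message published with the key `info` reaches only the second consumer's queue, while one published with `error` reaches both.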
 




Topic model

Use cases
Notes
Producer
    package com.wu.topics;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Provider {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare a topic exchange
            channel.exchangeDeclare("topics", "topic");
            // Routing keys are dot-separated words
            String routeKey = "user.save";
            channel.basicPublish("topics", routeKey, null,
                    ("topic model, routeKey: " + routeKey).getBytes());
            RabbitMQUtils.closeConnectionAndChanel(channel, connection);
        }
    }
 
Consumer
    package com.wu.topics;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer1 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("topics", "topic");
            String queue = channel.queueDeclare().getQueue();
            // "*" matches exactly one word
            channel.queueBind(queue, "topics", "user.*");
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Consumer 1: " + new String(body));
                }
            });
        }
    }

    package com.wu.topics;

    import com.rabbitmq.client.*;
    import com.wu.utils.RabbitMQUtils;

    import java.io.IOException;

    public class Customer2 {
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("topics", "topic");
            String queue = channel.queueDeclare().getQueue();
            // "#" matches zero or more words
            channel.queueBind(queue, "topics", "user.#");
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Consumer 2: " + new String(body));
                }
            });
        }
    }
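
The difference between the two binding patterns used above, `user.*` and `user.#`, is easiest to check with a small matcher. This is a sketch of the wildcard rules only (`*` stands for exactly one word, `#` for zero or more words), written from scratch; `TopicMatcher` is a hypothetical class and is not how the broker implements matching.

```java
// Hypothetical matcher for topic binding patterns.
class TopicMatcher {
    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point
            for (int skip = j; skip <= k.length; skip++) {
                if (match(p, i + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // "*" or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.save"));
        System.out.println(matches("user.*", "user.save.xx"));
        System.out.println(matches("user.#", "user.save.xx"));
    }
}
```

Under these rules a key such as `user.save` matches both patterns, while a three-word key such as `user.save.xx` matches only `user.#`.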
 


Integrating with Spring Boot
pom dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

yml configuration
    spring:
      application:
        name: springboot-rabbitmq
      rabbitmq:
        host: 180.76.136.123
        port: 13399
        username: wu
        password: 123
        virtual-host: /wu_rabbitmq_study

RabbitTemplate simplifies these operations; just inject it.
Spring Boot point-to-point model
Producer
    package com.wu.springbootrabbitmq;

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    // The test uses JUnit 5 (org.junit.jupiter), where @SpringBootTest already
    // registers the Spring extension, so JUnit 4's @RunWith(SpringRunner.class)
    // is dropped.
    @SpringBootTest(classes = RabbitmqApplication.class)
    class RabbitmqApplicationTests {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Point-to-point: send to the "hello" queue through the default exchange
        @Test
        void contextLoads() {
            rabbitTemplate.convertAndSend("hello", "hello world");
        }
    }
 
Consumer
    package com.wu.springbootrabbitmq.hello;

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Declares the "hello" queue (non-durable, auto-delete) and listens on it
    @Component
    @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true"))
    public class HelloCustomer {

        // Called for each message received from the queue
        @RabbitHandler
        public void receive(String message) {
            System.out.println("message = " + message);
        }
    }
 
Spring Boot work queue model
In the work queue model, Spring AMQP uses fair dispatch by default.

Producer
    @Test
    void workTest() {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("work", "work model");
        }
    }
 
Consumer
    package com.wu.springbootrabbitmq.work;

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Two listeners on the same "work" queue act as two competing consumers
    @Component
    public class WorkCustomer {

        @RabbitListener(queuesToDeclare = @Queue("work"))
        public void receive1(String message) {
            System.out.println("Consumer 1: message = " + message);
        }

        @RabbitListener(queuesToDeclare = @Queue("work"))
        public void receive2(String message) {
            System.out.println("Consumer 2: message = " + message);
        }
    }
 
Spring Boot broadcast (fanout) model
Producer
    @Test
    void fanoutTest() {
        rabbitTemplate.convertAndSend("logs", "", "message sent by the fanout model");
    }

Consumer
    package com.wu.springbootrabbitmq.fanout;

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class FanoutCustomer {

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue, // create a temporary queue
                        exchange = @Exchange(value = "logs", type = "fanout") // bind it to this exchange
                )
        })
        public void receive1(String message) {
            System.out.println("Consumer 1: message = " + message);
        }

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue, // create a temporary queue
                        exchange = @Exchange(value = "logs", type = "fanout") // bind it to this exchange
                )
        })
        public void receive2(String message) {
            System.out.println("Consumer 2: message = " + message);
        }
    }
 
Spring Boot routing (direct) model
Producer
    @Test
    void routingTest() {
        rabbitTemplate.convertAndSend("directs", "error", "routing message sent with the key error");
    }

Consumer
    package com.wu.springbootrabbitmq.routing;

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class RoutingCustomer {

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue, // create a temporary queue
                        exchange = @Exchange(value = "directs", type = "direct"), // bind it to this exchange
                        key = {"info", "error", "warn"}
                )
        })
        public void receive1(String message) {
            System.out.println("Consumer 1: message = " + message);
        }

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue, // create a temporary queue
                        exchange = @Exchange(value = "directs", type = "direct"), // bind it to this exchange
                        key = {"error"}
                )
        })
        public void receive2(String message) {
            System.out.println("Consumer 2: message = " + message);
        }
    }
 
Spring Boot topic model
Producer
    @Test
    void topicsTest() {
        rabbitTemplate.convertAndSend("topics", "user.save", "user.save message");
        rabbitTemplate.convertAndSend("topics", "user.save.xx", "user.save.xx message");
    }

Consumer
    package com.wu.springbootrabbitmq.topics;

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicsCustomer {

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue,
                        exchange = @Exchange(type = "topic", name = "topics"),
                        key = {"user.*"} // "*" matches exactly one word
                )
        })
        public void receive1(String message) {
            System.out.println("Consumer 1: message = " + message);
        }

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue,
                        exchange = @Exchange(type = "topic", name = "topics"),
                        key = {"user.#"} // "#" matches zero or more words
                )
        })
        public void receive2(String message) {
            System.out.println("Consumer 2: message = " + message);
        }
    }
 
MQ application scenarios
Asynchronous processing


Application decoupling

Traffic peak shaving
