RabbitMQ的六类工作模式-案例代码梳理
目录
- 1、概述
- 2、工具类封装
- 3、简单列队模式
- 4、工作列队模式
- 5、发布/订阅模式
- 6、路由模式
- 7、主题模式
- 8、交换机说明
- 9、发布确认模式(消息确认机制)
-
- 9.1、消费者实现
- 9.2、单个确认发布(生产者实现)
- 9.3、批量确认发布(生产者实现)
- 9.4、异步确认发布(生产者实现)
- 9.5、三类确认发布区别对比
- 10、源码地址
1、概述
六模式:简单模式、工作模式、发布订阅模式、路由模式、主题模式、发布确认模式
参考代码:https://gitee.com/lhzlx/rabbit-simple-demo.git
2、工具类封装
在下面每种模式的笔记中,会进行代码演示,为了方便,进行工具类的封装,代码如下:
public class RabbitMqUtils { /** * 得到一个连接的 channel * * @return * @throws Exception */ public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); return connection.createChannel(); }}
3、简单列队模式
由生产者
通过直接将消息发送到消费者
代码包路径:
lhz.simple
public class Producer { /** * 设置队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); /* * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; /* * 发送一个消息 * 1.发送到那个交换机(可以没有) * 2.路由的 key是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); }}
消费者:
public class Consumer { /** * 设置队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; /* * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 * 3.消费者未成功消费的回调 * 3.消费者取消消费的的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("等待接收消息...."); }}
4、工作列队模式
工作队列: 用来将耗时的任务分发给多个消费者,并且一个消息只能被消费一次,默认情况下RabbitMQ 将按顺序将每条消息发送给下一个消费者==(即轮询分发)==。
主要解决问题: 处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
代码包路径:
lhz.work
public class Producer { /** * 设置队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 从控制台当中接受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息完成:" + message); } } }}
消费者:
消费者需要两个,分别为:Consumer01
、Consumer02
两者代码一致,只是名称不同,下面以Consumer01
代码为例:
// Consumer01与Consumer02代码一致public class Consumer01 { /** * 设置队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("Consumer01消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Consumer01消息消费被中断"); }; //采用自动应答 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("Consumer01等待接收消息...."); }}
结果:
为了演示工作队列的轮询分发,需要启动两个Consumer
实例,然后再启动Producer
并且在控制台多次输入内容,可以看到两个Consumer
会依次接收消息,结果如下:
5、发布/订阅模式
发布/订阅模式是:
生产者将消息发送到交换机中,由交换机发送给不同类型的消费者,做到发布一次,消费多个,如果消费者绑定的队列名称一样,将按照轮询进行消费,所以保证了:同一个队列的中的消息不会被重复消费;
比如:
它包含一个生产者、多个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。
代码包路径:
lhz.fanout
public class Producer { /** * 定义交换机和队列名称 */ private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); for (int i = 0; i < 10; i++) { String message = "消息:" + i; // 发送一个消息,1.发送到那个交换机,2.路由的 key是哪个,3.其他的参数信息,4.发送消息的消息体 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); } System.out.println("消息发送完毕"); }}
消费者:
消费者需要两个,分别为:Consumer01
、Consumer02
两者代码一致,只是名称不同,下面以Consumer01
代码为例:
/***两个消费者逻辑代码一样,只是绑定的队列不同,Consumer01:consumerFanout_sms;Consumer02:consumerFanout_email*/public class Consumer01(02) { /** * 设置队列及交换机名称 */ private static final String QUEUE_NAME = "consumerFanout_sms"; private static final String QUEUE_NAME = "consumerFanout_email"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); //消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("Consumer01(02)消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Consumer01(02)消息消费被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("Consumer01(02)等待接收消息...."); }}
结果:
先启动一次生产者,再启动两个消费者,绑定到交换机A,以及两个不同的队列、然后再重新启动生产者,绑定到交换机A;
在生产者发生消息以后,两个不同名称的消费者队列都可以接收到消息相同
的内容,这些因为不同的消费者绑定了同一个交换机
注意:
如果消费者绑定的队列名称一样,将按照轮询进行消费,所以保证了:同一个队列的中的消息不会被重复消费;**
6、路由模式
路由模式:
跟发布订阅模式类似,在订阅模式的基础上修改了exchange类型以及加上了路由键,如果消费者的路由键一样,其效果和发布/订阅模式一致,订阅模式是分发到所有绑定到交换机的所有队列,路由模式只分发到绑定在交换机上面指定路由键的队列,一个队列可以绑定多个不同的路由
注意:
消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;;
我们可以看一下下面这张图:
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black、green的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
说明: 生产者发送消息到交换机,同时定义了一个路由 routingKey,多个消费者声明多个队列,与交换机进行绑定,同时定义路由 routingKey,只有和生产者发送消息时的路由 routingKey相同的消费者才能消费数据;
注意: 如果交换机和路由绑定后,需要修改路由就要修改交换机名称
代码包路径:
lhz.route
生产者:
public class Producer { /** * 定义交换机和队列名称 */ private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 发送消息 String message = "", sendType = ""; for (int i = 0; i < 20; i++) { if (i % 2 == 0) { sendType = "info"; message = "我是 info 级别的消息类型:" + i; } else { sendType = "error"; message = "我是 error 级别的消息类型:" + i; } System.out.println("[send]:" + message + " " + sendType); // 第二个参数就是路由键 channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes()); } System.out.println("消息发送完毕"); }}
消费者:
消费者需要两个,分别为:Consumer01
、Consumer02
两者代码一致,只是名称不同,下面以Consumer01
代码为例:
/*** 两个消费者逻辑代码一样,只是绑定的队列不同和不同的路由键* Consumer01:"info"、"consumer_info";Consumer02:"error"、"consumer_error";*/public class Consumer01(02) { /** * 设置队列及交换机名称 */ private static final String ROUTING_KEY = "info"; private static final String ROUTING_KEY = "error"; private static final String QUEUE_NAME = "consumer_info"; private static final String QUEUE_NAME = "consumer_error"; private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); //消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("Consumer01(02)消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Consumer01(02)消息消费被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("Consumer01(02)等待接收消息...."); }}
结果:
先启动一次生产者,再启动两个消费者,绑定到交换机A,以及两个不同的队列和不同的路由键;最后重新启动生产者,绑定到交换机A;
7、主题模式
主题模式:
跟 routing 路由模式
类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系
注意:
消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;
要求:
要求
Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。分隔符
"*(星号)":可以代替一个单词
"#(井号)":可以替代零个或多个单词
比如
- 中间带 orange 带3个单词:
*.orange.*
- 最后一个词是 rabbit 的3 个单词:
*.*.rabbit
- 以 lazy开头的多个单词
lazy.#
图示:
注意: 如果交换机和路由绑定后,需要修改路由就要修改交换机名称
代码包路径:
lhz.toptic
生产者:
public class Producer { /** * 定义交换机和队列名称 */ private static final String EXCHANGE_NAME = "topic"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 定义路由key String routingKey = "mq.info.log"; String message = "topic_exchange_msg:" + routingKey; System.out.println("[send] = " + message); // 发送消息 // 第二个参数就是路由键 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("消息发送完毕"); }}
消费者:
public class Consumer { /** * 设置路由匹配规则 */ private static final String ROUTING_KEY = "#.log"; /** * 设置队列及交换机名称 */ private static final String QUEUE_NAME = "topic_consumer"; private static final String EXCHANGE_NAME = "topic"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); //消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("等待接收消息...."); }}
结果:
先启动消费者,绑定到交换机A,通过通配符模糊匹配路由;再启动生产者,绑定到交换机A,设置具体的路由键;
8、交换机说明
在没有交换机的情况,生产者直接往队列发送消息,消费者绑定队列消费相消息,但是同一个队列中一个消息只会被消费一次,所以无法满足一个消息同时被多个消费者使用;
交换机的作用就可以解决这个问题,一个交换机可以绑定多个不同的队列,一个队列绑定多个消费者;生产者将消息发送到交换机中,所有绑定了该交换机的队列都可以收到消息;所以生产者发送一次消息,可以被不同的队列**(消费者)**进行消费;当同一个队列中存在多个消费者时,消息不会被重复消费;
9、发布确认模式(消息确认机制)
概念: 生产者将信道设置成 确认(confirm) 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。
优点: confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到ack之后,生产者可以通过回调方法来处理该ack消息;如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息;
使用: 发布确认模式默认是没有开启的,生产者通过调用方法 confirmSelect
实现开启
9.1、消费者实现
代码包路径:
lhz.confirm
public class Consumer { //设置队列名称 private final static String QUEUE_NAME = "confirm_queue"; public static void main(String[] args) throws Exception { // 获取Channel Channel channel = RabbitMqUtils.getChannel(); // 消费队列消息的一个回调接口 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("消息消费成功,内容:"); System.out.println(message); }; // 取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; // 消费者消费消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); System.out.println("等待接收消息...."); }}
9.2、单个确认发布(生产者实现)
单个确认发布:
是一种简单的确认方式,它是一种**同步确认发布** 的方式,也就是发布的消息只有被确认发布之后,后续的消息才能继续发布,通过waitForConfirmsOrDie(long outTime)
方法,指定时间范围内(单位:毫秒)这个消息没有被确认那么它将抛出异常;通过waitForConfirms()
对broker响应的消息进行确认;
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,当然对于某 些应用程序来说这可能已经足够了。
代码包路径:
lhz.confirm
类:SingleProducer
说明: 确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》;
步骤: 启动消费者,发送消息,然后观察耗时即可;
public class SingleProducer { // 设置队列名称 private final static String QUEUE_NAME = "confirm_queue"; // 发送消息数量 private final static Integer MESSAGE_COUNT = 100; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); //开启发布确认 channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //服务端返回确认状态,如果 false或超时时间内未返回,生产者可以消息重发 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms"); } }}
9.3、批量确认发布(生产者实现)
批量确认发布:
与单个等待确认消息相比,会先发布一批消息然后一起确认 可以极大地提高吞吐量,它也是一种**同步确认发布** 的方式。
这种方式的缺点就是: 当消息发布出现问题时,不知道是哪个消息出现问题了;
代码包路径:
lhz.confirm
类:BatchProducer
**说明:**确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》;
步骤: 启动消费者,发送消息,然后观察耗时即可;
public class BatchProducer { // 设置队列名称 private final static String QUEUE_NAME = "confirm_queue"; // 发送消息数量 private final static Integer MESSAGE_COUNT = 100; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 开启发布确认 channel.confirmSelect(); // 批量确认消息大小 int batchSize = 100; // 未确认消息个数 int outstandingMessageCount = 0; long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); outstandingMessageCount++; // 达到设置的批处理大小时,进行确认 if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0; } } // 为了确保还有剩余没有确认消息,进行再次确认 if (outstandingMessageCount > 0) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms"); } }}
9.4、异步确认发布(生产者实现)
异步确认发布:
虽然编程逻辑比上两个要复杂,但是可靠性和效率更高;他是利用回调函数来保证是否投递成功。逻辑图如下:
代码包路径:
lhz.confirm
类:AsynProducer
**说明:**确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》;
步骤: 启动消费者,发送消息,然后观察耗时即可;
public class AsynProducer { /** * 设置队列名称 */ private final static String QUEUE_NAME = "confirm_queue"; /** * 发送消息数量 */ private final static Integer MESSAGE_COUNT = 500; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); //开启发布确认 channel.confirmSelect(); /* * 线程安全有序的一个哈希表,适用于高并发的情况 * 1.轻松的将序号与消息进行关联 * 2.轻松批量删除条目 只要给到序列号 * 3.支持并发访问 */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap(); /* * 确认收到消息的一个回调 * 1.消息序列号 * 2.true可以确认小于等于当前序列号的消息 * false确认当前序列号消息 */ ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { //返回的是小于等于当前序列号的未确认消息 是一个 map ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true); //清除该部分未确认消息 confirmed.clear(); } else { //只清除当前序列号的消息 outstandingConfirms.remove(sequenceNumber); } }; // 未被确认消息回调 ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = outstandingConfirms.get(sequenceNumber); System.out.println("未被确认消息:" + message + ",序列号" + sequenceNumber); }; /* * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.未收到消息的回调 */ channel.addConfirmListener(ackCallback, nackCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i; /* * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 通过序列号与消息体进行一个关联 * 全部都是未确认的消息体 */ outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms"); } }}
9.5、三类确认发布区别对比
-
单独发布消息
同步等待确认,简单,但吞吐量非常有限,较耗时。 -
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题,效率较高。 -
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些,效率高。
10、源码地址
源码地址:https://gitee.com/lhzlx/rabbit-simple-demo.git