RabbitMQ高级特性和集群 如何保障消息的可靠性投递 如何保障消息的百分百投递成功 什么是生产端的可靠性投递 1、保障把消息成功发送出去
2、保障MQ服务成功接收到消息
3、消息发送者收到MQ服务的确认应答
4、完善的消息补偿推送机制
方案一:同步落消息表 + 定时补偿 + 人工补偿
在消息生产者端(也就是订单服务)
正常链路流程
第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调
用物流模块的数据消息,把消息插入到消息表中,初始状态为0
第二步: 把物流消息投递到消息队列中,
第三步;消息队列返回一个确认消息,并且由订单服务来监控mq server的确认消息
第四步:根据收到的确认消息来更新数据库中的消息记录的状态
异常链路流程
第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调
用物流模块的数据消息,把消息插入到消息表中,初始状态为0
第二步: 把物流消息投递到消息队列中,
第三步:由于网络闪断,导致消费端监控mq服务访问的确认消息没有收到,那么在msg_db中的那条消息的
状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿
补偿机制:
启动一个分布式的定时任务,不定时的去扫描msg_db的这个表,状态为0的消息记录,在这里我们可以根据
业务来设置扫描重发规则规则1:插入msg_db 表中5Min后状态还是为0的记录,进行消息重试
规则2:若重试的次数超过五次状态还是为0的话,我们就把消息状态改为2,此时我们需要人工的去确认状态
为2的消息是什么原因导致没有成功的
消息入库打标的缺点:
在第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在高并发的环
境下,这个环境就会造成性能瓶颈
代码实现
1、消息状态
2、消息结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MessageContent { private String msgId; private long orderNo; private Date createTime; private Date updateTime; private Integer msgStatus; private String exchange; private String routingKey; private String errCause; private Integer maxRetry; private Integer currentRetry=0 ; private Integer productNo; }
设置响应和回退监听处理
ConfirmCallback
ReturnCallback
方案二:延时消息确认 + 回调检查消息状态补偿
优点:性能较好
缺点:流程复杂,且没有重试上限机制,缺少人工干预补偿
幂等性以及消息的幂等性 什么是接口的幂等性?
简而言之,就是对接口发起的一次调用和多次调用,所产生的结果都是 一致的。
接口不幂等的后果
案例一:比如订单提交的过程中,用户点了一次提交,但是由于网络等原因,导致后端处理延时,客户就连续点击了多次,在没有幂等性的条件下,那么就会造成订单的重复提交。
解决方案:在保存订单的时候,根据生成的系统全局唯一ID(这里订单号+业务类型),并且把该唯一ID 调用 redis 的setnx命令保存起来,在第一次保存的时候,由于redis中没有该key,那么就会 把全局唯一ID 进行设置上,此时订单就会保存成功,。这个时候若出现前端重复点击按钮, 由于第一步已经 setnx上了 就会阻止后面的保存。
如何保证接口幂等
MQ 是如何解决幂等性的 MQ发送消息的流程
第一步:消息生产者向Mq服务端发送消息 第二步:mq 服务端把消息进行落地 第三步:消息服务端向消息生产者发送ack 第四步;消息消费者消费消息 第五步:消费者发送ack 第六步: mq服务将落地消息删除
消息重复发送的原因
为了保障消息的百分之百的投递,我们使用了消息重发,确认机制,使得消息可能被重复发送,由上图可知 道,由于网络原因,第三步的上半场ack丢失还是第五步的下半场ack丢失 都会导致消息重复发送
1 2 3 4 5 6 7 8 1、生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确 定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重 连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息 2、消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网 络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再 次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消 息;
消息重复发送的导致后果
上半场消息生产者是用户支付模块,专门是用来给用户扣费的,而下半场的消息消费者服务是会员卡服务, 是通过接受扣费服务发送的消息来进行发卡的, 由于第三步或者是第五步ack丢失,那么就会导致上游服务重复发送消息就会导致扣一次款,发多次卡
服务端是如何保证幂等性的
消息队列的服务中,对每一条消息都会生成一个全局唯一的与业务无关的ID(inner_msg_id),当mq_server 接受到消息的时候,先根据inner_msg_id 是否需要重复发送,再决定消息是否落DB ,这样保证每条消息都 只会落一次DB
消费端如何来做到幂等性的
目前主流做法是 直接避免代码重复消费 和 代码支持重复消费,业务上幂等
1、唯一业务ID
代码接受重复消息,但还是业务上结果是幂等的:如果有插入数据库的业务,根据唯一业务id获取分布式锁,进行插入或者更新(重复消费时) 2、(唯一业务ID+(辅助标志位(UUID、时间戳等是业务场景而定))or 唯一消息id
消费时redis setnx成功则继续消费,失败则视为重复消费,跳过消费返回ack
消息的confirm机制 mq 的confirm机制 1:消息的确认:指的是生产者将消息投递后,如何mq-server接受到消息,就会给生产者一个应答.
2:生产者接受到应答,来确保该条消息是否成功发送到了mq-server
3:confirm机制是消息可靠性投递的核心保障
mq的confirm机制的核心流程图
confirm机制的现实步骤 第一步:在channel 上开启确认模式 channel.confirmSelect();
第二步:在channel上添加监听,用来监听mq-server返回的应答
代码演示 1 2 3 4 5 6 7 8 9 10 11 12 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck (long deliveryTag, boolean multiple) throws IOException { System.out.println("消息deliveryTag" +deliveryTag+"被正常签收" ); } @Override public void handleNack (long deliveryTag, boolean multiple) throws IOException { System.out.println("消息deliveryTag" +deliveryTag+"没被签收" ); } });
消息return机制 Return Listener是用来处理一些不可路由的消息 生产者发送消息到broker时
情况一:broker中根本没有对应的exchange交换机来接受该消息
情况二:消息能够投递到broker的交换机上,但是交换机根据routingKey 路由不到某一个队列上
处理一;若在消息生产端 的mandatory设置为true 那么就会调用生产端ReturnListener 来处理
处理二;若消息生产端的mandatory设置为false(默认值也是false) 那么mq-broker就会自动删除消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("记录不可达消息........................" ); System.out.println("replaycode=" +replyCode); System.out.println("replyText=" +replyText); System.out.println("exchange=" +exchange); System.out.println("routingKey=" +routingKey); System.out.println("properties=" +properties); System.out.println("body=" +new String(body)); } }); for (int i = 0 ; i < 5 ; i++) { String message = "hello--" + i; channel.basicPublish("directchange" , "directchange.key" ,true , basicProperties, message.getBytes()); }
消费端如何做限流量 什么是消费端的限流 场景:首先,我们迎来了订单的高峰期,在mq的broker上堆积了成千上万条消息没有处理,这个时候,我们随便打开了 消费者,就会出现下面请 如此多的消息瞬间推送给消费者,我们的消费者不能处理这么多消息 就会导致消费者出现巨大压力,甚至服务器崩溃
解决方案 rabbitmq 提供一个钟qos(服务质量保证),也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数没有被确认(手动确认),那么就不会推送 消息过来. 限流的级别(consumer级别或者是channel级别)
实现的方式 void BasicQos(uint prefetchSize,ushort prefetchCount ,bool global) uint prefetchSize :指定的是设定消息的大小(rabbitmq还没有该功能,所以一般是填写0表示不限制) ushort perfetchCount :表示设置消息的阈值,每次过来几条消息(一般是填写1 一条 一条的处理消息) bool global:表示是channel级别的还是 consumer的限制(channel的限制rabbitmq 还没有该功能)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 消费端 channel.basicQos(0 ,1 ,false ); channel.basicConsume(queueName,false ,new AngleCustomConsumer(channel));public class AngleCustomConsumer extends DefaultConsumer { private Channel channel; public AngleCustomConsumer (Channel channel) { super (channel); this .channel = channel; } @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("自定义的消息消费端" ); System.out.println("consumerTag=" +consumerTag); System.out.println("envelope=" +envelope); System.out.println("properties=" +properties); System.out.println("body=" +new String(body)); } }
消费端的ack 消费端的ack模式 消费端的ack类型:自动ack 和手动ack 做消息限流的时候,我们需要关闭自动ack 然后进行手动ack的确认,若我们业务出现了问题,我们就可以进行nack
手动ack
1 channel.basicAck(envelope.getDeliveryTag(),false );
重回队列 当消费端进行了nack的操作的时候,我们可以通过设置来进行对消息的重回队列的操作(但是一般我们不会设置重回队列的操作,而是通过告警系统然后人工补偿等手段补偿。因为一般而言,第一次消费报错如果放回队列,还是会报错的,放回队列会导致死循环)
手动nack
1 channel.basicNack(envelope.getDeliveryTag(),false ,false );
死信队列&死信交换机 死信队列DLX(Dead-leater-exchange) 什么是死信 就是在队列中的消息如果没有消费者消费,那么该消息就成为一个死信,那这个消息被重新发送到另外一个exchange上的话, 那么后面这个exhcange就是死信队列
消息变成死信的几种情况 消息被拒绝 :(basic.reject/basic.nack)并且requeue(重回队列)的属性设置为 false 表示不需要重回队列,那么该消息就是一个死信消息
消息TTL过期 :消息本身设置了过期时间,或者队列设置了消息过期时间x-message-ttl
队列达到最大长度 :比如队列最大长度是3000 ,那么3001消息就会被送到死信队列上.
如何设置死信队列 绑定正常队列时
1、声明正常交换机、正常队列、死信队列交换、死信队列
2、声明正常队列时,设置argument:x-dead-letter-exchange属性值=死信队列交换机
3、正常交换机绑定正常队列、死信交换机绑定死信队列
示例代码 消费者 1、设置队列最大长度=5
1 2 3 4 5 Map<String,Object> argurments= new HashMap<>() ; argurments.put("x-max-length" , 5 ); channel.queueDeclare(normalQueueName,true ,false ,false ,argurments);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class DLX_CustomConsumer extends DefaultConsumer { private Channel channel; public DLX_CustomConsumer (Channel channel) { super (channel); this .channel = channel; } @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("为了测试死信队列,我们进行nack" ); channel.basicNack(envelope.getDeliveryTag(),false ,false ); } }public class DLX_Consumer { public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String normalExchangeName = "normal_exchange" ; String normalKey = "normaldlx.key" ; String normalQueueName = "normal_queue" ; String exchangeType = "topic" ; String dlxExchangeName = "dlx_exchange" ; String dlxQueueName = "dlx_queue" ; channel.exchangeDeclare(normalExchangeName,exchangeType,true ,false ,null ); Map<String,Object> argurments= new HashMap<>() ; argurments.put("x-dead-letter-exchange" , dlxExchangeName); argurments.put("x-max-length" , 5 ); channel.queueDeclare(normalQueueName,true ,false ,false ,argurments); channel.queueBind(normalQueueName,normalExchangeName,normalKey); channel.exchangeDeclare(dlxExchangeName,exchangeType,true ,false ,false ,null ); channel.queueDeclare(dlxQueueName,true ,false ,false ,null ); channel.queueBind(dlxQueueName,dlxExchangeName,"#" ); channel.basicConsume(normalQueueName,false ,new DLX_CustomConsumer(channel)); } }
生产者 1、设置消息过期时间
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration(“10000”)//消息过期 10s
2、一次性发送10条消息(队列限制5条,其余5条被发至死信队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class RabbitmqMessageProducter { public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { String normalExchangeName = "normal_exchange" ; String normalKey = "normaldlx.key" ; Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .contentEncoding("utf-8" ) .correlationId(UUID.randomUUID().toString()) .build(); channel.basicPublish(normalExchangeName, normalKey,basicProperties, "测试消息过期转为死信队列" .getBytes()); for (int i = 0 ; i < 10 ; i++) { channel.basicPublish(normalExchangeName, normalKey,basicProperties, "测试消息积压转为死信队列" .getBytes()); } } }
集群搭建 参考如下
https://blog.csdn.net/u012702547/article/details/121890003