分布式-MQ-02RabbitMQ高级特性和集群

RabbitMQ高级特性和集群

如何保障消息的可靠性投递

如何保障消息的百分百投递成功

什么是生产端的可靠性投递

1、保障把消息成功发送出去

2、保障MQ服务成功接收到消息

3、消息发送者收到MQ服务的确认应答

4、完善的消息补偿推送机制

方案一:同步落消息表 + 定时补偿 + 人工补偿

image-20220518091304978

在消息生产者端(也就是订单服务)

正常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调

用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步: 把物流消息投递到消息队列中,

第三步;消息队列返回一个确认消息,并且由订单服务来监控mq server的确认消息

第四步:根据收到的确认消息来更新数据库中的消息记录的状态

异常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调

用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步: 把物流消息投递到消息队列中,

第三步:由于网络闪断,导致消费端监控mq服务访问的确认消息没有收到,那么在msg_db中的那条消息的

状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿

补偿机制:

​ 启动一个分布式的定时任务,不定时的去扫描msg_db的这个表,状态为0的消息记录,在这里我们可以根据

业务来设置扫描重发规则规则1:插入msg_db 表中5Min后状态还是为0的记录,进行消息重试

规则2:若重试的次数超过五次状态还是为0的话,我们就把消息状态改为2,此时我们需要人工的去确认状态

为2的消息是什么原因导致没有成功的

消息入库打标的缺点:

在第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在高并发的环

境下,这个环境就会造成性能瓶颈

代码实现

image-20220519222745273

1、消息状态

image-20220519222352598

2、消息结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MessageContent {

private String msgId;

private long orderNo;

private Date createTime;

private Date updateTime;

private Integer msgStatus;

private String exchange;

private String routingKey;

private String errCause;

private Integer maxRetry;

private Integer currentRetry=0;

private Integer productNo;
}

设置响应和回退监听处理

image-20220519232158671

ConfirmCallback

image-20220519232411268

ReturnCallback

image-20220519232559024

方案二:延时消息确认 + 回调检查消息状态补偿

image-20220518091431630

优点:性能较好

缺点:流程复杂,且没有重试上限机制,缺少人工干预补偿

幂等性以及消息的幂等性

什么是接口的幂等性?

​ 简而言之,就是对接口发起的一次调用和多次调用,所产生的结果都是 一致的。

接口不幂等的后果

​ 案例一:比如订单提交的过程中,用户点了一次提交,但是由于网络等原因,导致后端处理延时,客户就连续点击了多次,在没有幂等性的条件下,那么就会造成订单的重复提交。

​ 解决方案:在保存订单的时候,根据生成的系统全局唯一ID(这里订单号+业务类型),并且把该唯一ID 调用 redis 的setnx命令保存起来,在第一次保存的时候,由于redis中没有该key,那么就会 把全局唯一ID 进行设置上,此时订单就会保存成功,。这个时候若出现前端重复点击按钮, 由于第一步已经 setnx上了 就会阻止后面的保存。

如何保证接口幂等

MQ 是如何解决幂等性的

MQ发送消息的流程

image-20220520092020713

第一步:消息生产者向Mq服务端发送消息
第二步:mq 服务端把消息进行落地
第三步:消息服务端向消息生产者发送ack
第四步;消息消费者消费消息
第五步:消费者发送ack
第六步: mq服务将落地消息删除

消息重复发送的原因

​ 为了保障消息的百分之百的投递,我们使用了消息重发,确认机制,使得消息可能被重复发送,由上图可知 道,由于网络原因,第三步的上半场ack丢失还是第五步的下半场ack丢失 都会导致消息重复发送

1
2
3
4
5
6
7
8
1、生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确
定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重
连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息

2、消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网
络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再
次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消
息;

消息重复发送的导致后果

​ 上半场消息生产者是用户支付模块,专门是用来给用户扣费的,而下半场的消息消费者服务是会员卡服务, 是通过接受扣费服务发送的消息来进行发卡的, 由于第三步或者是第五步ack丢失,那么就会导致上游服务重复发送消息就会导致扣一次款,发多次卡

服务端是如何保证幂等性的

​ 消息队列的服务中,对每一条消息都会生成一个全局唯一的与业务无关的ID(inner_msg_id),当mq_server 接受到消息的时候,先根据inner_msg_id 是否需要重复发送,再决定消息是否落DB ,这样保证每条消息都 只会落一次DB

消费端如何来做到幂等性的

目前主流做法是 直接避免代码重复消费代码支持重复消费,业务上幂等

1、唯一业务ID

​ 代码接受重复消息,但还是业务上结果是幂等的:如果有插入数据库的业务,根据唯一业务id获取分布式锁,进行插入或者更新(重复消费时)
2、(唯一业务ID+(辅助标志位(UUID、时间戳等是业务场景而定))or 唯一消息id

​ 消费时redis setnx成功则继续消费,失败则视为重复消费,跳过消费返回ack

消息的confirm机制

mq 的confirm机制

1:消息的确认:指的是生产者将消息投递后,如何mq-server接受到消息,就会给生产者一个应答.

2:生产者接受到应答,来确保该条消息是否成功发送到了mq-server

3:confirm机制是消息可靠性投递的核心保障

mq的confirm机制的核心流程图

image-20220521145848720

confirm机制的现实步骤

第一步:在channel 上开启确认模式 channel.confirmSelect();

第二步:在channel上添加监听,用来监听mq-server返回的应答

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息deliveryTag"+deliveryTag+"被正常签收");
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息deliveryTag"+deliveryTag+"没被签收");
}
});

消息return机制

Return Listener是用来处理一些不可路由的消息

生产者发送消息到broker时

情况一:broker中根本没有对应的exchange交换机来接受该消息

情况二:消息能够投递到broker的交换机上,但是交换机根据routingKey 路由不到某一个队列上

处理一;若在消息生产端 的mandatory设置为true 那么就会调用生产端ReturnListener 来处理

处理二;若消息生产端的mandatory设置为false(默认值也是false) 那么mq-broker就会自动删除消息

image-20220521151501989

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("记录不可达消息........................");
System.out.println("replaycode="+replyCode);
System.out.println("replyText="+replyText);
System.out.println("exchange="+exchange);
System.out.println("routingKey="+routingKey);
System.out.println("properties="+properties);
System.out.println("body="+new String(body));
//处理消息表消息状态=发送失败
}
});

//5:通过channel发送消息
for (int i = 0; i < 5; i++) {
String message = "hello--" + i;
//第三个参数是 mandatory
channel.basicPublish("directchange", "directchange.key",true, basicProperties, message.getBytes());
}

消费端如何做限流量

什么是消费端的限流

场景:首先,我们迎来了订单的高峰期,在mq的broker上堆积了成千上万条消息没有处理,这个时候,我们随便打开了 消费者,就会出现下面请 如此多的消息瞬间推送给消费者,我们的消费者不能处理这么多消息 就会导致消费者出现巨大压力,甚至服务器崩溃

解决方案

rabbitmq 提供一个钟qos(服务质量保证),也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数没有被确认(手动确认),那么就不会推送 消息过来. 限流的级别(consumer级别或者是channel级别)

实现的方式 void BasicQos(uint prefetchSize,ushort prefetchCount ,bool global) uint prefetchSize :指定的是设定消息的大小(rabbitmq还没有该功能,所以一般是填写0表示不限制) ushort perfetchCount :表示设置消息的阈值,每次过来几条消息(一般是填写1 一条 一条的处理消息) bool global:表示是channel级别的还是 consumer的限制(channel的限制rabbitmq 还没有该功能)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
消费端
//gloabl设置为ture 那么就是channel级别的限流,若为false 就是consumer级别的限制流量
channel.basicQos(0,1,false);
/**
* 关闭自动签收
*/
channel.basicConsume(queueName,false,new AngleCustomConsumer(channel));

public class AngleCustomConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
private Channel channel;
public AngleCustomConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("自定义的消息消费端");
System.out.println("consumerTag="+consumerTag);
System.out.println("envelope="+envelope);
System.out.println("properties="+properties);
System.out.println("body="+new String(body));
//消费端的手动签收,假如关闭手动签收,也关闭自动签收,那么消费端只会接收到一条消息
//channel.basicAck(envelope.getDeliveryTag(),false);
}
}

消费端的ack

消费端的ack模式

消费端的ack类型:自动ack手动ack 做消息限流的时候,我们需要关闭自动ack 然后进行手动ack的确认,若我们业务出现了问题,我们就可以进行nack

手动ack

1
channel.basicAck(envelope.getDeliveryTag(),false);

重回队列

当消费端进行了nack的操作的时候,我们可以通过设置来进行对消息的重回队列的操作(但是一般我们不会设置重回队列的操作,而是通过告警系统然后人工补偿等手段补偿。因为一般而言,第一次消费报错如果放回队列,还是会报错的,放回队列会导致死循环)

手动nack

1
channel.basicNack(envelope.getDeliveryTag(),false,false);

死信队列&死信交换机

死信队列DLX(Dead-leater-exchange)

什么是死信

就是在队列中的消息如果没有消费者消费,那么该消息就成为一个死信,那这个消息被重新发送到另外一个exchange上的话, 那么后面这个exhcange就是死信队列

消息变成死信的几种情况

消息被拒绝:(basic.reject/basic.nack)并且requeue(重回队列)的属性设置为 false 表示不需要重回队列,那么该消息就是一个死信消息

消息TTL过期:消息本身设置了过期时间,或者队列设置了消息过期时间x-message-ttl

队列达到最大长度:比如队列最大长度是3000 ,那么3001消息就会被送到死信队列上.

如何设置死信队列

绑定正常队列时

1、声明正常交换机、正常队列、死信队列交换、死信队列

2、声明正常队列时,设置argument:x-dead-letter-exchange属性值=死信队列交换机

3、正常交换机绑定正常队列、死信交换机绑定死信队列

示例代码

消费者

1、设置队列最大长度=5

1
2
3
4
5
Map<String,Object> argurments= new HashMap<>() ;
//设置正常队列中的死信发往哪个队列
//限制队列存储长度
argurments.put("x-max-length", 5);
channel.queueDeclare(normalQueueName,true,false,false,argurments);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class DLX_CustomConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
private Channel channel;
public DLX_CustomConsumer(Channel channel) {
super(channel);
this.channel = channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("为了测试死信队列,我们进行nack");
//把消息变为死信 通过nack 且requeue不进行重新发送
channel.basicNack(envelope.getDeliveryTag(),false,false);
}
}

public class DLX_Consumer {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//创建一个channel
Channel channel = connection.createChannel();
String normalExchangeName = "normal_exchange";
String normalKey = "normaldlx.key";
String normalQueueName = "normal_queue";
String exchangeType = "topic";

String dlxExchangeName = "dlx_exchange";
String dlxQueueName = "dlx_queue";

//声明一个正常的业务队列
channel.exchangeDeclare(normalExchangeName,exchangeType,true,false,null);
Map<String,Object> argurments= new HashMap<>() ;
//设置正常队列中的死信发往哪个队列
argurments.put("x-dead-letter-exchange", dlxExchangeName);
//限制队列存储长度
argurments.put("x-max-length", 5);
channel.queueDeclare(normalQueueName,true,false,false,argurments);
channel.queueBind(normalQueueName,normalExchangeName,normalKey);

//声明死信队列
channel.exchangeDeclare(dlxExchangeName,exchangeType,true,false,false,null);
channel.queueDeclare(dlxQueueName,true,false,false,null);
channel.queueBind(dlxQueueName,dlxExchangeName,"#");
channel.basicConsume(normalQueueName,false,new DLX_CustomConsumer(channel));
}

}

生产者

1、设置消息过期时间

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration(“10000”)//消息过期 10s

2、一次性发送10条消息(队列限制5条,其余5条被发至死信队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RabbitmqMessageProducter {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

String normalExchangeName = "normal_exchange";
String normalKey = "normaldlx.key";
//3:通过连接工厂创建连接对象
Connection connection = ConnectionUtils.getConnection();
//4:通过连接创建channel
Channel channel = connection.createChannel();

//5:通过channel发送消息
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
//.expiration("10000")//消息过期 10s
.contentEncoding("utf-8")
.correlationId(UUID.randomUUID().toString())
.build();
//channel.basicPublish(normalExchangeName, normalKey,basicProperties, "测试消息被拒绝转为死信队列".getBytes());
channel.basicPublish(normalExchangeName, normalKey,basicProperties, "测试消息过期转为死信队列".getBytes());
for (int i = 0; i < 10; i++) {
channel.basicPublish(normalExchangeName, normalKey,basicProperties, "测试消息积压转为死信队列".getBytes());
}

}
}

集群搭建

参考如下

https://blog.csdn.net/u012702547/article/details/121890003