分布式-MQ-03RabbitMQ整合SpringBoot&集群搭建

RabbitMQ整合SpringBoot&集群搭建

RabbitMQ整合SpringBoot

Producer

1、配置文件 application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring.rabbitmq.host=192.168.136.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#开启消息确认模式
spring.rabbitmq.publisher-confirms=true
#开启消息可达监听
spring.rabbitmq.publisher-returns=true
#开启不可达消息不会被broker给删除
spring.rabbitmq.template.mandatory=true
#设置连接超时
spring.rabbitmq.connection-timeout=10000

2、设置RabbitMQ的交换机、队列、绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class RabbitMqConfig {
@Bean
public DirectExchange testBootDirectExchange() {
DirectExchange directExchange = new DirectExchange("springboot.direct.exchange",true,false);
return directExchange;
}
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayExchange", "x-delayed-message",true, false,args);
}
@Bean
public Queue testBootQueue() {
Queue queue = new Queue("testBootQueue",true,false,false);
return queue;
}
@Bean
public Queue testClusterQueue() {
Queue queue = new Queue("testClusterQueue",true,false,false);
return queue;
}
@Bean
public Queue testBootDelayQueue() {
Queue queue = new Queue("testBootDelayQueue",true,false,false);
return queue;
}
@Bean
public Binding testBootBinder() {
return BindingBuilder.bind(testBootQueue()).to(testBootDirectExchange()).with("springboot.key");
}
@Bean
public Binding testClusterBinder() {
return BindingBuilder.bind(testClusterQueue()).to(testBootDirectExchange()).with("rabbitmq.cluster.key");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(testBootDelayQueue()).to(delayExchange()).with("springboot.delay.key").noargs();
}
}

3、声明消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Component
public class TestMsgSender{
@Autowired
private RabbitTemplate rabbitTemplate;


/**
* 测试发送我们的消息
* @param msg 消息内容
* @param msgProp 消息属性
*/
public void sendMsg(Object msg, Map<String,Object> msgProp) {

MessageHeaders messageHeaders = new MessageHeaders(msgProp);

//构建消息对象
Message message = MessageBuilder.createMessage(msg,messageHeaders);

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//构建correlationData 用于做可靠性投递得,ID:必须为全局唯一的 根据业务规则
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//开启确认模式
rabbitTemplate.setConfirmCallback(new MsgConfirmCallBack());

//开启消息可达监听
rabbitTemplate.setReturnCallback(new MsgRetrunCallBack());

rabbitTemplate.convertAndSend("springboot.direct.exchange","springboot.key",message,correlationData);

rabbitTemplate.convertAndSend("springboot.direct.exchange","springboot.key2",message,correlationData);

}
}

@Slf4j
public class MsgConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData:========>{}",correlationData.getId());
log.info("ack:================{}",ack);
if(ack) {
log.info("mq生产端消息已经成功投递到了broker,更新我们消息日志表");
}else {
log.warn("mq生产端没有被broker ack,原因:{}",cause);
}
}
}


/**
* 消息不可达监听
*/
@Slf4j
public class MsgRetrunCallBack implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.warn("correlationId:{}",message.getMessageProperties().getCorrelationId());
log.warn("replyText:{}",replyText);
log.warn("消息内容:{}",new String(message.getBody()));
log.warn("replyCode:{}",replyCode);
log.warn("交换机:{}",exchange);
log.warn("routingKey:{}",routingKey);
log.info("需要更新数据库日志表得消息记录为不可达");
}
}

4、测试消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void sendMsg() {
Map<String,Object> msgProp = new HashMap<>();
msgProp.put("company","testCompany");
msgProp.put("name","testName");
/* String msgBody = "hello tuling ";*/
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserName("test");
order.setPayMoney(10000.00);
order.setCreateDt(new Date());
testMsgSender.sendMsg(JSON.toJSONString(order),msgProp);
}

image-20220527091547220

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
server.port=8888
spring.rabbitmq.host=192.168.136.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=1
spring.rabbitmq.listener.simple.prefetch=5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class TestMsgConsumer {

@RabbitListener(queues = {"testBootQueue"})
public void consumerMsg(Message message, Channel channel) throws IOException {

log.info("消费消息:{}", JSON.parseObject(new String(message.getBody()),Order.class));
//手工签收
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("接受deliveryTag:{}",deliveryTag);
channel.basicAck(deliveryTag,false);
}
}

可以处理的消息类型

​ java对象、json、String、图片、excel文件等

延时队列的实现

下载启动延时组件

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

解压延时插件: unzip

rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

把延时插件拷贝到指定目录下:cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins

启动延时插件: rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延时队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayExchange", "x-delayed-message",true, false,args);
}

@Bean
public Queue testBootDelayQueue() {
Queue queue = new Queue("testBootDelayQueue",true,false,false);
return queue;
}

@Bean
public Binding binding() {
return BindingBuilder.bind(testBootDelayQueue()).to(delayExchange()).with("springboot.delay.key").noargs();
}

发送延时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 发送延时消息
* @param msgTxtBo
*/
public void senderDelayCheckMsg(MsgTxtBo msgTxtBo) {

log.info("发送的消息ID:{}",msgTxtBo.getOrderNo());
//表示为延时消息
CorrelationData correlationData = new CorrelationData(msgTxtBo.getMsgId()+"_"+msgTxtBo.getOrderNo()+"_delay");
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("delayExchange", "springboot.delay.key", msgTxtBo, new MessagePostProcessor() {
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 30000);//设置延迟时间
return message;
}
},correlationData);

}

可靠消息方案案例

消息成功投递 + 消费幂等

分布式事务解决方案(消息可靠性投递之定时任务版本)_00

相关实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
DROP TABLE IF EXISTS `message_content`;
CREATE TABLE `message_content` (
`msg_id` varchar(50) NOT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`msg_status` int(10) DEFAULT NULL COMMENT '(0,"发送中"),(1,"mq的broker确认接受到消息"),(2,"没有对应交换机"),(3,"没有对应的路由"),(4,"消费端成功消费消息")',
`exchange` varchar(50) DEFAULT NULL,
`routing_key` varchar(50) DEFAULT NULL,
`err_cause` varchar(1000) DEFAULT NULL,
`order_no` bigint(32) DEFAULT NULL,
`max_retry` int(10) DEFAULT NULL,
`current_retry` int(10) DEFAULT NULL,
`product_no` int(10) DEFAULT NULL,
PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`order_no` bigint(32) NOT NULL AUTO_INCREMENT,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`user_name` varchar(50) DEFAULT NULL,
`money` double(10,2) DEFAULT NULL,
`product_no` int(10) DEFAULT NULL,
`order_status` int(10) DEFAULT NULL,
PRIMARY KEY (`order_no`)
) ENGINE=InnoDB AUTO_INCREMENT DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `product_info`;
CREATE TABLE `product_info` (
`product_no` int(32) NOT NULL,
`product_name` varchar(50) DEFAULT NULL,
`product_num` int(10) DEFAULT NULL,
PRIMARY KEY (`product_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `product_info` VALUES ('1', '华为meta30', '61');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Data
public class MessageContent {

private String msgId;

private long orderNo;

private Date createTime;

private Date updateTime;

private Integer msgStatus;

private String exchange;

private String routingKey;

private String errCause;

private Integer maxRetry;

private Integer currentRetry=0;

private Integer productNo;
}
@ToString
@Getter
@Setter
public class OrderInfo {

private long orderNo;

private Date createTime;

private Date updateTime;

private String userName;

private double money;

private Integer productNo;
}

@Data
public class ProductInfo {

private Integer productNo;

private String productName;

private String productNum;
}

@Getter
public enum MsgStatusEnum {

SENDING(0,"发送中"),

SENDING_SUCCESS(1,"消息发送成功"),

SENDING_FAIL(2,"消息发送失败"),

CONSUMER_SUCCESS(3,"消费成功"),

CONSUMER_FAIL(4,"消费失败");

private Integer code;

private String msgStatus;

MsgStatusEnum(Integer code, String msgStatus) {
this.code = code;
this.msgStatus = msgStatus;
}
}

@Data
public class MsgTxtBo implements Serializable {

private long orderNo;

private int productNo;

private String msgId;
}

消息可靠性投递基于定时任务解决方案(消息生产端)

1、application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#数据源配置
spring:
datasource:
druid:
username: root
password: 123456
jdbcUrl: jdbc:mysql://localhost:3306/rabbitmq?serverTimezone=Asia/Shanghai

driverClassName: com.mysql.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
filters: stat,wall
maxPoolPreparedStatementPerConnectionSize: 20
useGlobalDataSourceStat: true
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
#rabbitmq生产端配置
rabbitmq:
host: 192.168.136.1
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
#防止不可达消息被删除
template:
mandatory: true
connection-timeout: 1000000
#配置mybatis
mybatis:
mapper-locations: classpath:mybatis/mapper/*.xml
configuration:
map-underscore-to-camel-case: true
logging:
level:
com.study.mapper: debug

2、MQ绑定关系配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RabbitmqConfig {

@Bean
public DirectExchange orderToProductExchange() {
DirectExchange directExchange = new DirectExchange(MqConst.ORDER_TO_PRODUCT_EXCHANGE_NAME,true,false);
return directExchange;
}

@Bean
public Queue orderToProductQueue() {
Queue queue = new Queue(MqConst.ORDER_TO_PRODUCT_QUEUE_NAME,true,false,false);
return queue;
}

@Bean
public Binding orderToProductBinding() {
return BindingBuilder.bind(orderToProductQueue()).to(orderToProductExchange()).with(MqConst.ORDER_TO_PRODUCT_ROUTING_KEY);
}
}

3、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
@Slf4j
public class MsgSender implements InitializingBean {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MsgComfirm msgComfirm;

@Autowired
private MsgRetrunListener msgRetrunListener;


/**
* 方法实现说明:真正的发送消息
* @author:smlz
* @param msgTxtBo:发送的消息对象
* @return:
* @exception:
* @date:2019/10/11 20:01
*/
public void senderMsg(MsgTxtBo msgTxtBo){

log.info("发送的消息ID:{}",msgTxtBo.getMsgId());

CorrelationData correlationData = new CorrelationData(msgTxtBo.getMsgId());

rabbitTemplate.convertAndSend(MqConst.ORDER_TO_PRODUCT_EXCHANGE_NAME,MqConst.ORDER_TO_PRODUCT_ROUTING_KEY,msgTxtBo,correlationData);
}

@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(msgComfirm);
rabbitTemplate.setReturnCallback(msgRetrunListener);
//设置消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
}
}

4、消息确认回调监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Component
@Slf4j
public class MsgComfirm implements RabbitTemplate.ConfirmCallback{

@Autowired
private MsgContentMapper msgContentMapper;

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData.getId();

if(ack) {
log.info("消息Id:{}对应的消息被broker签收成功",msgId);
updateMsgStatusWithAck(msgId);
}else{
log.warn("消息Id:{}对应的消息被broker签收失败:{}",msgId,cause);
updateMsgStatusWithNack(msgId,cause);
}
}

/**
* 方法实现说明:更新消息表状态为
* @author:smlz
* @param msgId:消息ID
* @exception:
* @date:2019/10/11 18:01
*/
private void updateMsgStatusWithAck(String msgId) {
MessageContent messageContent = builderUpdateContent(msgId);
messageContent.setMsgStatus(MsgStatusEnum.SENDING_SUCCESS.getCode());
msgContentMapper.updateMsgStatus(messageContent);
}

private void updateMsgStatusWithNack(String msgId,String cause){

MessageContent messageContent = builderUpdateContent(msgId);

messageContent.setMsgStatus(MsgStatusEnum.SENDING_FAIL.getCode());
messageContent.setErrCause(cause);
msgContentMapper.updateMsgStatus(messageContent);
}

private MessageContent builderUpdateContent(String msgId) {
MessageContent messageContent = new MessageContent();
messageContent.setMsgId(msgId);
messageContent.setUpdateTime(new Date());
return messageContent;
}
}

5、消息拒绝返回监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
@Slf4j
public class MsgRetrunListener implements RabbitTemplate.ReturnCallback {

@Autowired
private MsgContentMapper msgContentMapper;

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try{
ObjectMapper objectMapper = new ObjectMapper();
MsgTxtBo msgTxtBo = objectMapper.readValue(message.getBody(),MsgTxtBo.class);
log.info("无法路由消息内容:{},cause:{}",msgTxtBo,replyText);

//构建消息对象
MessageContent messageContent = new MessageContent();
messageContent.setErrCause(replyText);
messageContent.setUpdateTime(new Date());
messageContent.setMsgStatus(MsgStatusEnum.SENDING_FAIL.getCode());
messageContent.setMsgId(msgTxtBo.getMsgId());
//更新消息表
msgContentMapper.updateMsgStatus(messageContent);
}catch (Exception e) {
log.error("更新消息表异常:{}",e);
}
}
}

6、生成消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
public class OrderController {
@Autowired
private IOrderInfoService orderInfoService;
@RequestMapping("/saveOrder")
public String saveOrder() throws JsonProcessingException {

OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderNo(System.currentTimeMillis());
orderInfo.setCreateTime(new Date());
orderInfo.setUpdateTime(new Date());
orderInfo.setUserName("handsome boy");
orderInfo.setMoney(10000);
orderInfo.setProductNo(1);
orderInfoService.saveOrderInfoWithMessage(orderInfo);
return "ok";
}
}

7、定时任务扫描重试推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
@Slf4j
public class RetryMsgTask {
@Autowired
private MsgSender msgSender;
@Autowired
private MsgContentMapper msgContentMapper;
/**
* 延时5s启动
* 周期10S一次
*/
@Scheduled(initialDelay = 10000,fixedDelay = 10000)
public void retrySend() {
System.out.println("-----------------------------");
//查询创建时间超过30s && 状态 != 消费成功 && 重试次数没超过限制的 消息
List<MessageContent> messageContentList = msgContentMapper.qryNeedRetryMsg(MsgStatusEnum.CONSUMER_SUCCESS.getCode(), MqConst.TIME_DIFF);
for(MessageContent messageContent:messageContentList) {
if(messageContent.getMaxRetry()>messageContent.getCurrentRetry()) {
MsgTxtBo msgTxtBo = new MsgTxtBo();
msgTxtBo.setMsgId(messageContent.getMsgId());
msgTxtBo.setProductNo(messageContent.getProductNo());
msgTxtBo.setOrderNo(messageContent.getOrderNo());
//更新消息重试次数
msgContentMapper.updateMsgRetryCount(msgTxtBo.getMsgId());
msgSender.senderMsg(msgTxtBo);
}else {
log.warn("消息:{}以及达到最大重试次数",messageContent);
}
}
}
}

消费者幂等消费

1、application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
spring:
datasource:
druid:
username: root
password: 123456
jdbcUrl: jdbc:mysql://localhost:3306/rabbitmq?serverTimezone=Asia/Shanghai
driverClassName: com.mysql.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
filters: stat,wall
maxPoolPreparedStatementPerConnectionSize: 20
useGlobalDataSourceStat: true
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
rabbitmq:
host: 192.168.136.1
port: 5672
virtual-host: /
username: guest
password: guest
listener:
simple:
concurrency: 5
max-concurrency: 10
acknowledge-mode: manual
prefetch: 1
default-requeue-rejected: false
redis:
host: 127.0.0.1
port: 6379



mybatis:
mapper-locations: classpath:mybatis/mapper/*.xml
configuration:
map-underscore-to-camel-case: true
logging:
level:
com.tuling.mapper: debug
server:
port: 8888

2、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@Component
@Slf4j
public class MqConsumer {
/**队列名称*/
public static final String ORDER_TO_PRODUCT_QUEUE_NAME = "order-to-product.queue";

public static final String LOCK_KEY="LOCK_KEY";

@Autowired
private IProductService productService;

@Autowired
private MsgContentMapper msgContentMapper;

@Autowired
private RedisTemplate redisTemplate;

@RabbitListener(queues = {ORDER_TO_PRODUCT_QUEUE_NAME})
@RabbitHandler
public void consumerMsgWithLock(Message message, Channel channel) throws IOException {

ObjectMapper objectMapper = new ObjectMapper();
MsgTxtBo msgTxtBo = objectMapper.readValue(message.getBody(), MsgTxtBo.class);
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();

if (redisTemplate.opsForValue().setIfAbsent(LOCK_KEY + msgTxtBo.getMsgId(), msgTxtBo.getMsgId())) {
log.info("消费消息:{}", msgTxtBo);
try {
//更新消息表也业务表
productService.updateProductStore(msgTxtBo);
//消息签收
//模拟网络异常
//System.out.println(1/0);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
/**
* 更新数据库异常说明业务没有操作成功需要删除分布式锁
*/
if (e instanceof BizExp) {
BizExp bizExp = (BizExp) e;
log.info("数据业务异常:{},即将删除分布式锁", bizExp.getErrMsg());
//删除分布式锁
redisTemplate.delete(LOCK_KEY);
}

//更新消息表状态
MessageContent messageContent = new MessageContent();
messageContent.setMsgStatus(MsgStatusEnum.CONSUMER_FAIL.getCode());
messageContent.setUpdateTime(new Date());
messageContent.setErrCause(e.getMessage());
messageContent.setMsgId(msgTxtBo.getMsgId());
msgContentMapper.updateMsgStatus(messageContent);
channel.basicReject(deliveryTag,false);
}

} else {
log.warn("请不要重复消费消息{}", msgTxtBo);
channel.basicReject(deliveryTag,false);
}

}
}

@Service
@Slf4j
public class ProductServiceImpl implements IProductService {

@Autowired
private ProductInfoMapper productInfoMapper;

@Autowired
private MsgContentMapper msgContentMapper;

@Transactional
@Override
public boolean updateProductStore(MsgTxtBo msgTxtBo) {
boolean updateFlag = true;
try{
//更新库存
productInfoMapper.updateProductStoreById(msgTxtBo.getProductNo());

//更新消息表状态
MessageContent messageContent = new MessageContent();
messageContent.setMsgId(msgTxtBo.getMsgId());
messageContent.setUpdateTime(new Date());
messageContent.setMsgStatus(MsgStatusEnum.CONSUMER_SUCCESS.getCode());
msgContentMapper.updateMsgStatus(messageContent);
//模拟业务异常
//System.out.println(1/0);
}catch (Exception e) {
log.error("更新数据库失败:{}",e);
throw new BizExp(0,"更新数据库异常");
}
return updateFlag;
}
}

消费者的几种写法

目前主流做法是 直接避免代码重复消费代码支持重复消费,业务上幂等

1、唯一业务ID

​ 代码接受重复消息,但还是业务上结果是幂等的:如果有插入数据库的业务,根据唯一业务id获取分布式锁,进行插入或者更新(重复消费时)

2、(唯一业务ID +(辅助标志位(UUID、时间戳等是业务场景而定))or 唯一消息id

​ 消费时redis setnx成功则继续消费,失败则视为重复消费,跳过消费返回ack

3、消费者模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@Slf4j
public abstract class BaseMessageListener<T> implements ChannelAwareMessageListener {

private RedisTemplate redisTemplate;

private static final String CHARSET = "UTF-8";

/**
* 消息处理器
*
* @param mqParam 接收到MQ消息
* @return 业务处理是否成功
* @throws Exception 抛出业务处理异常 保存数据库
*/
public abstract boolean handleMessage(T mqParam) throws Exception;

/**
* 提供给子类进行异常日志打印的方法,默认
* 子类可以根据自己选择进行重写
*
* @param e 异常
*/
public void logErrorMsg(Exception e) {
log.error("Listener:{} MQ消费异常:", beanName, e);
}

/**
* 获得当前类的bean名称,用来保存错误信息
*/
@Getter
private String beanName;

private Class<T> clazz;

private boolean singleMqListener;

private Channel channel;

private Message message;


@PostConstruct
@SuppressWarnings("unchecked")
public void init() {
Class<? extends BaseMessageListener> aClass = this.getClass();
Component annotation = aClass.getAnnotation(Component.class);
if (null == annotation) {
throw new SystemException("必须手动设置@Component注解,并设置beanName");
}
String value = annotation.value();
if (StrUtil.isBlank(value)) {
throw new SystemException("必须手动设置beanName");
}
beanName = value;
// clazz = (Class<T>) ((ParameterizedType) aClass.getGenericSuperclass()).getActualTypeArguments()[0];
Type genType = aClass.getGenericSuperclass();
if (genType instanceof ParameterizedType) {
Type[] params = ((ParameterizedType) genType).getActualTypeArguments();
Type param = params[0];
if (param instanceof Class) {
clazz = (Class<T>) param;
} else if ((param instanceof ParameterizedTypeImpl)) {
ParameterizedTypeImpl paramImpl = (ParameterizedTypeImpl) param;
if (paramImpl.getRawType() != null) {
clazz = (Class<T>) paramImpl.getRawType();
}
}
}
if (null == clazz) {
throw new SystemException("初始化BaseMessageListener时出现未知错误");
}

SingleMqListener singleMqAnnotation = aClass.getAnnotation(SingleMqListener.class);
singleMqListener = singleMqAnnotation != null;
}


@Override
public void onMessage(Message message, Channel channel) throws Exception {
this.channel = channel;
this.message = message;

boolean handleSucceed = false;
try {
// 判断是否单线程mq,如果是单线程mq,需要获取分布式锁,没有获得锁,将消息退回给mq
if (singleMqListener) {
//redisTemplate.tryMacLock(getMacLockName(), TIME_10_SECOND, TIME_1_SECOND);
//获取分布式锁
boolean tryMacLock = true;
if (!tryMacLock) {
//放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
return;
}
}
//转换消息
T mqParam = this.convertMsg(message, CHARSET);
handleSucceed = handleMessage(mqParam);

}catch(ConcurrentHandlerException e){
log.error("Listener:{} MQ捕捉到并发异常:{}", beanName, e);
basicReject();
return;
}catch (ValidateException e) {
log.warn("Listener:{} MQ捕捉到验证异常:{}", beanName, e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return;
}catch (Exception e) {
logErrorMsg(e);
handleErrorMsg(message, channel, e.getMessage(), CHARSET);
return;
} finally {
if (singleMqListener) {
//释放分布式锁
}
}

if (handleSucceed) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}

}
/**
* mq输入参数转型
*
* @param message 消息体
* @param charSet 字符集
* @return 参数
* @throws Exception 消息转换异常
*/
private T convertMsg(Message message, String charSet) throws Exception {
String body = new String(message.getBody(), charSet);
if(StrUtil.isEmpty(body.trim())) {
throw new ValidateException("非业务报错。MQ 接收到的数据为空格");
}
log.info("Listener:{} MQ:{}", beanName, body);
JSON parse = (JSON) JSONObject.parse(body);
return parse.toJavaObject(clazz);
}

/**
* 记录异常信息, 没有保存成功的时候,退回queue。让其循环报错
*
* @param errorMsg 错误信息
*/
private void handleErrorMsg(Message message, Channel channel, String errorMsg, String charSet) throws IOException {
if (null == message) {
log.info("接收到空信息,不保存错误信息");
return;
}
if (null == message.getMessageProperties()) {
log.info("接收到空信息,不保存错误信息");
return;
}
String jsonString = new String(message.getBody(), charSet);
String msgClassName = clazz.getName();
try {
JSON parse = (JSON) JSONObject.parse(jsonString);
//errorMsgService.saveErrorMsg(parse, msgClassName, beanName, errorMsg);
//记录消息状态
} catch (Exception e) {
log.error("MQ异常信息保存数据库失败", e);
}
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

}

protected void basicReject() throws IOException{
if(channel == null || message == null){
throw new SystemException("手工抛回队列异常,必要数据为空");
}
this.channel.basicReject(this.message.getMessageProperties().getDeliveryTag(), true);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Bean(name = "XXXContainer")
public SimpleMessageListenerContainer XXXMessageListenerContainer(
@Qualifier("XXXConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("XXXListener") ChannelAwareMessageListener cuttingBedChangeListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(new Queue(Constant.MES_PIMS_CUTTING_BED_QUEUE, true));
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(cuttingBedChangeListener);
container.setAutoStartup(false);
return container;
}

public class XXXListener extends BaseMessageListener<XXXDTO>{
@Override
public boolean handleMessage(XXXDTO dto) throws Exception{
//业务逻辑
}
}