分布式-MQ-01RabbitMQ安装以及入门

RabbitMQ安装以及入门

RabbitMQ简介

1、初识RabbitMQ

​ rabbitmq是一个开源的消息代理和队列服务器,通过普通的协议(Amqp协议)来完成不同应用之间的数据共享(消费生产和消费者可以跨语言平台) rabbitmq是通过elang语言来开发的基于amqp协议

2、选择RabbitMQ的原因

1、图形化界面完善

2、Elang语言开发,跨平台不局限于JAVA

3、提供消息确认机制(发送方确认,消费方确认),死信队列、延迟消息等

4、开源、社区活跃、稳定

3、什么是AMQP协议(Advanced message queue protocol) 高级消息队列协议

server :又称为broker,接受客户端连接,实现amqp实体服务

Connection: 连接,应用程序与brokder建立网络连接

channel:网络通道,几乎所有的操作都是在channel中进行的,是进行消息对象的通道,客户端可以建立 多个通道,每一个channel表示一个会话任务

Message: 服务器和应用程序之间传递数据的载体,有properties(消息属性,用来修饰消息,比如消息的优 先级,延时投递)和Body(消息体)

virtual host(虚拟主机): 是一个逻辑概念,最上层的消息路由,一个虚拟主机中可以包含多个exhange 和 queue 但是一个虚拟主机中不能有名称相同的exchange 和queue

exchange 交换机: 消息直接投递到交换机上,然后交换机根据消息的路由key 来路由到对应绑定的队列上

bingding: 绑定 exchange 与queue的虚拟连接,bingding中可以包含route_key

route_key 路由key ,他的作用是在交换机上通过route_key来把消息路由到哪个队列上

queue:队列,用于来保存消息的载体,有消费者监听,然后消费消息

image-20220513080448800

4、RabbitMQ的整体架构模型

image-20220513081348810

5、RabbitMQ的消息是如何流转的

image-20220513081427272

6、RabbitMQ、RabbitMQ、Kafka比较

Kafka RabbitMQ RocketMQ
设计定位 系统间的数据流通道,实时数据处理。常用于日志收集处理、监控数据处理等大数据量、并发量最高 可靠传输消息、处理普通业务,如下单、充值、消息推送等常规业务,并发量较高 可靠传输消息、并发量较高,高于RabbitMQs
顺序消费 支持 支持 支持
定时/延迟消息 不支持 支持 支持
事务消息 / / 支持
消息查询· / / 支持类似mysql的语法查询服务器中的消息
消费方式 cosumer pull comsumer pull / broker push broker push
图形化界面 结合kibana 完美
总结 优点:高吞吐、低延迟
缺点:不支持事务,
优点:生态完善,图形化界面友好、支持多种语言
缺点:不支持事务,吞吐能力有限、消息堆积时,性能明显下降
优点:支持事务消息
缺点:生态不完善,图形化界面不友好、只支持java

![](/images/分布式-MQ-01RabbitMQ安装以及入门/MQ对比:Kafka VS Rocketmq VS Rabbitmq_00.png)

RabbitMQ安装与启动

:安装rabbitmq所需要的依赖包

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-

c++ kernel-devel m4 ncurses-devel tk tc xz

:下载安装包 (PS:老师的下载包的目录是 cd /usr/local/software)

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

:安装服务命令

#第一步:安装erlang语言环境

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

#第二步:安装socat加解密软件

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

#第三步:最后安装rabbitmqrpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

:修改集群用户与连接心跳检测

注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件

修改:loopback_users 中的 <<”guest”>>,只保留guest(不修改只能通过localhost访问)

image-20220514124004407

5、下载延时组件

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

解压延时插件: unzip

rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

把延时插件拷贝到指定目录下:cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins

启动延时插件: rabbitmq-plugins enable rabbitmq_delayed_message_exchange

6、访问ip:15672 访问rabbitMQ管理控制台

username:guest

password:guest

测试生成和消费消息

1、生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class RabbitmqProducter {
public static void main(String[] args) throws IOException, TimeoutException {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();

//2设置连接工厂的属性
connectionFactory.setHost("192.168.136.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

//3:通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();

//4:通过连接创建channel
Channel channel = connection.createChannel();

//5:通过channel发送消息
for(int i=0;i<5;i++) {
String message = "hello--"+i;
/**
* 老师以前讲过说我们的消息会发送的exchange上,
* 但是在这里我们没有指定交换机?那我们的消息发送到哪里了????
* The default exchange is implicitly bound to every queue, with a routing key equal to the queue name.
* It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
* 说明:加入我们消息发送的时候没有指定具体的交换机的话,那么就会发送到rabbimtq指定默认的交换机上,
* 那么该交换机就会去根据routing_key 查找对应的queueName 然后发送的该队列上.
*
*/
channel.basicPublish("","test-queue-01",null,message.getBytes());
}

//6:关闭连接
channel.close();
connection.close();
}
}

2、消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RabbitmqConsumer {

public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.136.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

//创建连接
Connection connection = connectionFactory.newConnection();

//创建一个channel
Channel channel = connection.createChannel();

//声明队列
String queueName = "test-queue-01";
/**
* queue:队列的名称
* durable:是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
* 保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
* exclusive:当连接关闭时connection.close()该队列是否会自动删除;
* 二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,
* 没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常
* com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)
* 一般等于true的话用于一个队列只能有一个消费者来消费的场景
* autodelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,
* 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
*/
channel.queueDeclare(queueName,true,false,false,null);

//创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);

while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String reserveMsg = new String(delivery.getBody());
System.out.println("消费消息:"+reserveMsg);
}
}
}

Rabbitmq交换机详解

1、作用

​ 接受生产者的消息,然后根据路由键 把消息投递到跟交换机绑定的对应的队列上

image-20220514145837082

2、属性

Name:交换机的名称

Type:交换机的类型,direct,topic,fanout,headers

Durability:是否需要持久化

autodelete:假如没有队列绑定到该交换机,那么该交换机会自动删除

Internal:当前交换机是否用户rabbitmq内部使用不常用,默认为false

Argurements:扩展参数,用户扩展AMQP 定制化协议

3、类型

直连交换机 directExchange

所以发送的direct exhchange 的消息都会被投递到与该交换机绑定且routing key = Key 的queue上。

image-20220514150212644

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明一个交换机
* exchange:交换机的名称
* type:交换机的类型 常见的有direct,fanout,topic等
* durable:设置是否持久化。durable设置为true时表示持久化,反之非持久化.持久化可以将交换器存入磁盘,在服务器重启的时候不会丢失相关信息
* autodelete:设置是否自动删除。autoDelete设置为true时,则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后,所有与这个交换器绑定的队列或者交换器都与此解绑。
* 不能错误的理解—当与此交换器连接的客户端都断开连接时,RabbitMq会自动删除本交换器
* arguments:其它一些结构化的参数,比如:alternate-exchange
*/
channel.exchangeDeclare("directChange","direct",false,false,null);
String queueName = "directQueue";

/**
* 声明一个队列
* durable:表示rabbitmq关闭删除队列
* autodelete:表示没有程序和队列建立连接 那么就会自动删除队列
*
*/
channel.queueDeclare(queueName,true,false,false,null);

channel.queueBind(queueName,"directChange","directChange.key");
channel.queueBind(queueName,"directChange","directChange.key1");

/**
* 创建一个消费者
*/
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

/**
* 开始消费
*/
channel.basicConsume(queueName,true,queueingConsumer);

while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String reciverMessage = new String(delivery.getBody());
System.out.println("消费消息:-----"+reciverMessage);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//定义交换机名称
String exchangeName = "directChange";

//定义routingKey
String routingKey = "directChange.key";
//定义routingKey
//String routingKey = "directChange.key1";

//消息体内容
String messageBody = "hello handsomeboy ";
channel.basicPublish(exchangeName,routingKey,null,messageBody.getBytes());
}
}

主题交换机 TopicExchange

​ 就是在队列上绑到topic 交换机上的路由key 可以是通过通配符来匹配的通配符的规则是

​ log.#可以匹配一个单词也可以匹配多个单词,

​ 比如 log.#可以匹配log.a log.a.b log.a.b

​ log.* 可以匹配一个单词

​ 比如 log.* 可以匹配log.a 但是不可以匹配log.a.b

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
消费者1
public class FanoutExchangeConsumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
Connection connection = ConnectionUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//声明交换机
String exchangeName = "topicExchange";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
//声明队列
String quequName = "topic.queue";
channel.queueDeclare(quequName,true,false,false,null);
//声明绑定关系
String bingdingStr = "topic.*";
channel.queueBind(quequName,exchangeName,bingdingStr);
//声明一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//开始消费
channel.basicConsume(quequName,true,queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("FanoutExchangeConsumer接受到消息:"+new String(delivery.getBody()));
}
}
}
FanoutExchangeConsumer接受到消息:我是第一条消息
FanoutExchangeConsumer接受到消息:我是第二条消息


消费者2
public class FanoutExchangeConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
Connection connection = ConnectionUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//声明交换机
String exchangeName = "topicExchange";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
//声明队列
String quequName = "topic.queue1";
channel.queueDeclare(quequName,true,false,false,null);
//声明绑定关系
String bingdingStr = "topic.#";
channel.queueBind(quequName,exchangeName,bingdingStr);
//声明一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//开始消费
channel.basicConsume(quequName,true,queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("FanoutExchangeConsumer1接受到消息:"+new String(delivery.getBody()));
}
}
}
FanoutExchangeConsumer1接受到消息:我是第一条消息
FanoutExchangeConsumer1接受到消息:我是第二条消息
FanoutExchangeConsumer1接受到消息:我是第三条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TopicExchangeProductor {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
Connection connection = ConnectionUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//发送消息
String exchangeName = "topicExchange";
String routingKey1 = "topic.key1";
String routingKey2 = "topic.key2";
String routingKey3 = "topic.key.key3";
channel.basicPublish(exchangeName,routingKey1,null,"我是第一条消息".getBytes());
channel.basicPublish(exchangeName,routingKey2,null,"我是第二条消息".getBytes());
channel.basicPublish(exchangeName,routingKey3,null,"我是第三条消息".getBytes());
}
}

扇形交换机 fanoutExchange

就是消息通过从交换机到队列上不会通过路由key 所以该模式的速度是最快的 只要和交换机绑定的那么消息就会 被分发到与之绑定的队列上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FanoutExchangeProductor {

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
Connection connection = ConnectionUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//定义交换机名称
String exchangeName = "fanoutExchange";
//消息体内容
String messageBody = "hello handsome boy ";
channel.basicPublish(exchangeName,"",null,"我是第一条消息".getBytes());
channel.basicPublish(exchangeName,"456",null,"我是第二条消息".getBytes());
channel.basicPublish(exchangeName,"789",null,"我是第三条消息".getBytes());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class FanoutExchangeConsumer {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
Connection connection = ConnectionUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//声明交换机
String exchangeName = "fanoutExchange";
String exchangeType = "fanout";
channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
//声明队列
String quequName = "fanout.queue";
channel.queueDeclare(quequName,true,false,false,null);
//声明绑定关系
channel.queueBind(quequName,exchangeName,"");
//声明一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//开始消费
channel.basicConsume(quequName,true,queueingConsumer);

while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("接受到消息:"+new String(delivery.getBody()));
}
}
}

队列、绑定、虚拟主机、消息

绑定: exchange 与之间的连接关系(通过路由规则)

队列:用来存储消息的实体

队列的属性: durability 消息是否被持久化

AutoDelete :表示最后一个监听被移除那么该队列就会被删除

消息:用来生产着和消费者之间传递数据的

消息属性: 包括消息体body 和属性 properties

常用属性:delivery mode headerscontent_type(消息类型) content_encoding(消息编码),priporty(消息优

先级) ,correntlation_id(最为消息唯一的id),reply_to(消息失败做重回队列),expiretion(消息的过期时

),message_id(消息id);timestamp,type,user_id app_id,cluster_id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RabbitmqMessageProducter {
public static void main(String[] args) throws IOException, TimeoutException {

//3:通过连接工厂创建连接对象
Connection connection = ConnectionUtils.getConnection();
//4:通过连接创建channel
Channel channel = connection.createChannel();

Map<String, Object> headsMap = new HashMap<>();
headsMap.put("company", "study");
headsMap.put("name", "handsome boy");

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.deliveryMode(2)//2标识持久化消息 1标识 服务重启后 消息不会被持久化
.expiration("10000")//消息过期 10s
.contentEncoding("utf-8")
.correlationId(UUID.randomUUID().toString())
.headers(headsMap)
.build();

//5:通过channel发送消息
for (int i = 0; i < 5; i++) {
String message = "hello--" + i;
/**
* 老师以前讲过说我们的消息会发送的exchange上,
* 但是在这里我们没有指定交换机?那我们的消息发送到哪里了????
* The default exchange is implicitly bound to every queue, with a routing key equal to the queue name.
* It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
* 说明:加入我们消息发送的时候没有指定具体的交换机的话,那么就会发送到rabbimtq指定默认的交换机上,
* 那么该交换机就会去根据routing_key 查找对应的queueName 然后发送的该队列上.
*
*/
channel.basicPublish("directchange", "directchange.key", basicProperties, message.getBytes());
}

//6:关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class RabbitmqMessageConsumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//创建一个channel
Channel channel = connection.createChannel();

String exchangeName = "directchange";
String exchangeType = "direct";
String queueName = "directqueue";
String routingKey = "directchange.key";
/**
* 声明一个交换机
* exchange:交换机的名称
* type:交换机的类型 常见的有direct,fanout,topic等
* durable:设置是否持久化。durable设置为true时表示持久化,反之非持久化.持久化可以将交换器存入磁盘,在服务器重启的时候不会丢失相关信息
* autodelete:设置是否自动删除。autoDelete设置为true时,则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后,所有与这个交换器绑定的队列或者交换器都与此解绑。
* 不能错误的理解—当与此交换器连接的客户端都断开连接时,RabbitMq会自动删除本交换器
* arguments:其它一些结构化的参数,比如:alternate-exchange
*/
channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);

/**
* 声明一个队列
*/
channel.queueDeclare(queueName,true,false,false,null);

/**
* 队里和交换机绑定
*/
channel.queueBind(queueName,exchangeName,routingKey);

/**
* 创建一个消费者
*/
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

/**
* 开始消费
*/
channel.basicConsume(queueName,true,queueingConsumer);


while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String reserveMsg = new String(delivery.getBody());
System.out.println("encoding:"+delivery.getProperties().getContentEncoding());
System.out.println("company:"+delivery.getProperties().getHeaders().get("company"));
System.out.println("correlationId:"+delivery.getProperties().getCorrelationId());
}
}
}