RabbitMQ

作用

  • 解耦

  • 异步

  • 削峰

缺点

  • 系统可用性降低:需要保证消息中间件的正常
  • 系统复杂性提高:需要保证消息的正确使用,重复消费、丢失、顺序问题、一致性问题等都需要考虑

常见消息中间件对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
语言 Java Erlang Java Scala
单机吞吐 十万 十万
时效性 ms us ms ms(以内)
可用性 高(主从架构) 高(主从架构) 非常高 (分布式架构) 非常高 (分布式架构)
功能特性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

安装

Erlang

1 下载 erlang 安装包

在官网下载然后上传到 Linux 上或者直接使用下面的命令下载对应的版本。

1
[root@SnailClimb local]#wget https://erlang.org/download/otp_src_19.3.tar.gz

erlang 官网下载:https://www.erlang.org/downloads

2 解压 erlang 安装包

1
[root@SnailClimb local]#tar -xvzf otp_src_19.3.tar.gz

3 删除 erlang 安装包

1
[root@SnailClimb local]#rm -rf otp_src_19.3.tar.gz

4 安装 erlang 的依赖工具

1
[root@SnailClimb local]#yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel

5 进入erlang 安装包解压文件对 erlang 进行安装环境的配置

新建一个文件夹

1
[root@SnailClimb local]# mkdir erlang

对 erlang 进行安装环境的配置

1
2
[root@SnailClimb otp_src_19.3]# 
./configure --prefix=/usr/local/erlang --without-javac

6 编译安装

1
2
[root@SnailClimb otp_src_19.3]# 
make && make install

7 验证一下 erlang 是否安装成功了

1
[root@SnailClimb otp_src_19.3]# ./bin/erl

运行下面的语句输出“hello world”

1
io:format("hello world~n", []).

8 配置 erlang 环境变量

1
[root@SnailClimb etc]# vim profile

追加下列环境变量到文件末尾

1
2
3
4
#erlang
ERL_HOME=/home/jungle/erlang/otp_src_19.3/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

运行下列命令使配置文件profile生效

1
[root@SnailClimb etc]# source /etc/profile

输入 erl 查看 erlang 环境变量是否配置正确

1
[root@SnailClimb etc]# erl

RabbitMQ

1. 下载rpm

1
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm

或者直接在官网下载

https://www.rabbitmq.com/install-rpm.html

2. 安装rpm

1
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

紧接着执行:

1
yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm

中途需要你输入”y”才能继续安装。

3 开启 web 管理插件

1
rabbitmq-plugins enable rabbitmq_management

4 设置开机启动

1
chkconfig rabbitmq-server on

5. 启动服务

1
service rabbitmq-server start

6. 查看服务状态

1
service rabbitmq-server status

工作模式

简单模式

工作队列

Work queues

发布/订阅模式

Publish/Subscribe

路由模式

Routing

主题模式

Topics

远程调用

RPC

发送确认

Publisher Confirms

高可用

RabbitMQ分为三种模式:单机普通集群镜像集群

单位:生产不用

普通集群:创建的queue仅保存在一台实例,其他实例拉取该实例的元数据(配置信息,可通过此找到数据真正存放处)

  • 如果消费时连接的不是queue信息保存的该实例,则连接的实例会通过元数据(配置信息)去拉取queue信息,拉取数据有消耗
  • 如果保证消费的queue一直连接固定实例,那就是单机了
  • queue数据仅保存在一台实例,遇到宕机,得等到实例恢复,没用高可用

镜像集群:每个实例都有全部元数据queue

  • 可以保证高可用,但是每次消息更新都需要同步到所有实例,会有很高的带宽消耗,queue的存储大小超过了实例容量,也无法进行扩展

重复消费(保证幂等)

具体业务考虑

可靠性(不丢失)

生产者 -> RabbitMQ ->消费者

生产者弄丢消息

解决方式一:开启事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();

// 这里发送消息
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

// 模拟出现异常
int result = 1 / 0;

// 提交事务
channel.txCommit();
} catch (IOException | TimeoutException e) {
// 捕捉异常,回滚事务
channel.txRollback();
}

​ 同步方法,吞吐量太小

解决方式二:使用confirm模式

1.普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。

1
2
3
4
5
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if (!channel.waitForConfirms()) {
// 消息发送失败
// ...
}

2.批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务端 confirm。

1
2
3
4
5
6
7
8
channel.confirmSelect();
for (int i = 0; i < batchCount; ++i) {
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if (!channel.waitForConfirms()) {
// 消息发送失败
// ...
}

3.异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}

public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});

while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

RabbitMQ弄丢消息

  • 创建queue的时候设置为持久化,持久化queue的元数据(不会持久化queue里数据)
  • 发送消息时,设置deliveryMode为2,持久化消息

消费者弄丢消息

使用ack机制,在声明队列时,设置noAck=false 使RabbitMQ在收到消费者ack信号后才会在队列中删除消息

保证顺序

每个queue一个consumer