分享一个使用过的非常好用的中间件-RabbitMQ
为什么要有消息队列?
我们来模拟一个电商场景:
- 商品数据保存在数据库中,增删改查也在数据库中完成
- 搜素服务数据来源是 ElasticSearch,如果数据库商品发生变化,ES 不能及时更新
- 商品详情页面做了静态化处理,静态页面数据不会随着数据库更新而变化
这样会导致,后台已经修改了商品数据,但是页面展示的依然是旧数据,这样显然不对,如何解决?
也许你会想到:
- 每当后台对商品做了数据修改,同时修改 ES 数据并更新静态页面
- 页面静态化服务对外提供操作接口,后台在商品数据修改后,调用接口
但是这两种方案都有一个严重问题:代码耦合,后台服务中需要嵌入搜索和商品服务,违背了微服务独立原则
这时候,消息队列出现了!
商品服务对数据修改后,无需操作 ES 和 静态页面,只需向 MQ 发送一条消息(比如包含商品id的消息)。静态页面服务监听 MQ,接收消息,然后分别去处理 ES 和静态页面。
什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
为什么选择 RabbitMQ
- 使用简单、功能强大
- 基于 AMQP 协议
- 社区活跃、文档完整
- 高并发性能好(得益于 Erlang 语言)
- SpringBoot 默认已继承 RabbitMQ,使用起来更加方便,只需要做简单的配置
工作原理
基本结构:
- Broker:消息队列服务进程,此进程包括两个部分:Exchange、Queue
- Exchange:消息队列交换机,按一定规则将消息路由转发到某个队列,对消息进行过滤
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定消费者
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到消息队列
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息
生产者发送消息流程:
- 生产者和 Broker 建立 TCP 连接
- 生产者和 Broker 建立通道
- 生产者通过通道消息发送给 Broker,由 Exchange 将消息进行转发
- Exchange 将消息转发到指定的 Queue
消费者接收消息流程:
- 消费者和 Broker 建立 TCP 连接
- 消费者和 Broker 建立通道
- 消费者监听指定的 Queue
- 当有消息到达 Queue 时 Broker 默认将消息推送给消费者
- 消费者接受到消息
- ack 回复
常见消息模型
①基本消息模型:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接收者,会一直等待消息到来
- queue(红色部分):消息队列,可以缓存消息;生产者向其中投递消息,消费者从中取出消息
②work消息模型:
工作队列或者竞争消费者模式
与基本消息模型相比,该模型多了消费端,多个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在 Web 应用程序特别有用,可以处理短的 HTTP 请求窗口无法处理复杂的任务。
当一个消费者比较耗时,处理消息效率就会降低很多:
通过如下代码:
|
|
basicQos
方法设置prefetchCount
为 1,这样mq
会使得每个consumer
在同一时间点最多处理一个消息。所以,如果该消费者没有确认消息,队列不会将新的消息分发给它。相反,会分派给不忙碌的下一个消费者。
**注意:**prefetchCount 在手动 ack 的情况下才生效,自动 ack 不生效。
开启手动 ack:
basicConsume 方法第二个参数设置为 false
|
|
一般情况下,对于不重要的消息(丢失也无所谓)可以采取自动 ack,方便
如果消息非常重要,不容丢失,那么最好在消费完成后手动 ack,否则接收消息后就自动 ack,mq 会把消息队列中该消息删除,如果此时消费者宕机,那么消息丢失。
③Publish/subscribe(交换机类型:Fanout,也成为广播)
生产者:
和前两种模式不同:
- 声明Exchange,不再声明Queue
- 发送消息到Exchange,不再发送到Queue
与work queues有何区别?
异:
- work queues不用定义交换机,而publish/subscribe需要定义交换机
- p/s 的生产者是面向交换机发送消息,work queues生产者是面向队列发送消息(底层使用默认交换机)
- p/s 需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认交换机
同:
两者实现的发布/订阅效果一样,多个消费端监听同一个队列不会重复消费消息
④Routing 路由模型(交换机类型:direct)
P:生产者:向Exchange发送消息,发送消息时,会指定一个routing key X:Exchange(交换机):接收生产者的消息,然后把消息递给与 routing key 完全匹配的队列
C1:消费者:所在队里指定了需要routing key 为error 的消息
C2:消费者:所在队列指定了需要routing key 为info、error、warning的消息
⑤Topics 通配符模式(交换机类型:topics)
每个消费者监听自己的队列,并且设置带通配符的 routing key,生产者将消息发送给 broker,由交换机根据 routing key来转发消息到指定的队列
结束语
文中没有给出 RabbitMQ 相关代码,可以去官网文档学习。
RabbitMQ 的功能也远远不止本文描述,感兴趣的话可以去官网学习
这里给出 RabbitMQ 的官网地址:Documentation: Table of Contents — RabbitMQ