返回

强大的RabbitMQ

强大的消息队列-RabbitMQ

分享一个使用过的非常好用的中间件-RabbitMQ

为什么要有消息队列?

我们来模拟一个电商场景:

  • 商品数据保存在数据库中,增删改查也在数据库中完成
  • 搜素服务数据来源是 ElasticSearch,如果数据库商品发生变化,ES 不能及时更新
  • 商品详情页面做了静态化处理,静态页面数据不会随着数据库更新而变化

这样会导致,后台已经修改了商品数据,但是页面展示的依然是旧数据,这样显然不对,如何解决?

也许你会想到:

  1. 每当后台对商品做了数据修改,同时修改 ES 数据并更新静态页面
  2. 页面静态化服务对外提供操作接口,后台在商品数据修改后,调用接口

但是这两种方案都有一个严重问题:代码耦合,后台服务中需要嵌入搜索和商品服务,违背了微服务独立原则

这时候,消息队列出现了!

商品服务对数据修改后,无需操作 ES 和 静态页面,只需向 MQ 发送一条消息(比如包含商品id的消息)。静态页面服务监听 MQ,接收消息,然后分别去处理 ES 和静态页面。

什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

为什么选择 RabbitMQ

  • 使用简单、功能强大
  • 基于 AMQP 协议
  • 社区活跃、文档完整
  • 高并发性能好(得益于 Erlang 语言)
  • SpringBoot 默认已继承 RabbitMQ,使用起来更加方便,只需要做简单的配置

工作原理

基本结构:

MQ 工作原理

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange、Queue
  • Exchange:消息队列交换机,按一定规则将消息路由转发到某个队列,对消息进行过滤
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定消费者
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到消息队列
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息

生产者发送消息流程:

  1. 生产者和 Broker 建立 TCP 连接
  2. 生产者和 Broker 建立通道
  3. 生产者通过通道消息发送给 Broker,由 Exchange 将消息进行转发
  4. Exchange 将消息转发到指定的 Queue

消费者接收消息流程:

  1. 消费者和 Broker 建立 TCP 连接
  2. 消费者和 Broker 建立通道
  3. 消费者监听指定的 Queue
  4. 当有消息到达 Queue 时 Broker 默认将消息推送给消费者
  5. 消费者接受到消息
  6. ack 回复

常见消息模型

①基本消息模型:

img

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue(红色部分):消息队列,可以缓存消息;生产者向其中投递消息,消费者从中取出消息

②work消息模型:

工作队列或者竞争消费者模式

img

与基本消息模型相比,该模型多了消费端,多个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

这个消息模型在 Web 应用程序特别有用,可以处理短的 HTTP 请求窗口无法处理复杂的任务。

当一个消费者比较耗时,处理消息效率就会降低很多:

通过如下代码:

1
channel.basicQos(1);

basicQos 方法设置 prefetchCount 为 1,这样 mq 会使得每个 consumer 在同一时间点最多处理一个消息。所以,如果该消费者没有确认消息,队列不会将新的消息分发给它。相反,会分派给不忙碌的下一个消费者。

**注意:**prefetchCount 在手动 ack 的情况下才生效,自动 ack 不生效。

开启手动 ack:

basicConsume 方法第二个参数设置为 false

1
channel.basicConsume(QUEUE_NAME, false, consumer);

一般情况下,对于不重要的消息(丢失也无所谓)可以采取自动 ack,方便

如果消息非常重要,不容丢失,那么最好在消费完成后手动 ack,否则接收消息后就自动 ack,mq 会把消息队列中该消息删除,如果此时消费者宕机,那么消息丢失。

③Publish/subscribe(交换机类型:Fanout,也成为广播)

img

生产者:

和前两种模式不同:

  1. 声明Exchange,不再声明Queue
  2. 发送消息到Exchange,不再发送到Queue

与work queues有何区别?

异:

  1. work queues不用定义交换机,而publish/subscribe需要定义交换机
  2. p/s 的生产者是面向交换机发送消息,work queues生产者是面向队列发送消息(底层使用默认交换机)
  3. p/s 需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认交换机

同:

​ 两者实现的发布/订阅效果一样,多个消费端监听同一个队列不会重复消费消息

④Routing 路由模型(交换机类型:direct)

img

P:生产者:向Exchange发送消息,发送消息时,会指定一个routing key X:Exchange(交换机):接收生产者的消息,然后把消息递给与 routing key 完全匹配的队列

C1:消费者:所在队里指定了需要routing key 为error 的消息

C2:消费者:所在队列指定了需要routing key 为info、error、warning的消息

⑤Topics 通配符模式(交换机类型:topics)

img

每个消费者监听自己的队列,并且设置带通配符的 routing key,生产者将消息发送给 broker,由交换机根据 routing key来转发消息到指定的队列

结束语

文中没有给出 RabbitMQ 相关代码,可以去官网文档学习。

RabbitMQ 的功能也远远不止本文描述,感兴趣的话可以去官网学习

这里给出 RabbitMQ 的官网地址:Documentation: Table of Contents — RabbitMQ


Built with Hugo
主题 StackJimmy 设计
本博客已稳定运行