一、前言
在准备基于RabbitMQ topic的消息发送与接收
的时候,顺便开始了这篇RabbitMQ初探。 安装过程这里就不再赘述,本博客主要记录一些初探过程。
二、简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
三、几种模式
Rabbit MQ官网教程上,从六个方面给出了讲解和demo。但是这些demo都是从最基础的层面进行编写,所以我自己尝试时候用了spring boot的相关支持,简化了很多代码。同时在尝试之后,我大概将其分为三类:
- Hello World(Queue)
- Publish/Subscribe(Exchange)
- RPC
下面分别回顾一下上述三类情况:
1.Hello World
我把官网上的Hello World和Work Queues归到这一类。结构如下:
Hello World
Work Queues
它们都属于直接把消息发送给一个queue,然后consumers会与queue相连接。 相关代码可参考:https://github.com/zhangyuyu/learnrabbitmq/commits/master 的第一次、第二次提交。
2.Publish/Subscribe(Exchange)
这种模式使用了Exchange,可以把它理解为消息交换机,它指定消息按什么规则,路由到哪个队列。 因此我把官网上的Publish/Subscribe、Routing、Topics放到这一类中。
根据Exchange的类型,可以分为四类:
- Fanout
- Direct
- Topic
- Headers
Fanout Exchange
该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能 相关代码可参考:https://github.com/zhangyuyu/learnrabbitmq/commits/master 的Rabbitmq Fanout Exchange。
Direct Exchange
该类型路由规则,会把消息路由到那些binding key与routing key完全匹配的Queue中。 相关代码可参考:https://github.com/zhangyuyu/learnrabbitmq/commits/master 的Rabbitmq Direct Exchange。
Topic Exchange
该类型路由规则与上述Direct Exchange规则类似,只是在binding key与routing key的匹配规则上进行了扩展,它约定:
- routing key为一个句点号”. “分隔的字符串(我们将被句点号”. “分隔开的每一段独立的字符串称为一个单词),如”routing.key.for.fist.queue”
- binding key与routing key一样也是句点号”. “分隔的字符串
- binding key中可以存在两种特殊字符”*“与”#”,用于做模糊匹配,其中”“用于匹配一个单词,”#”用于匹配多个单词(可以是零个)。
相关代码可参考:https://github.com/zhangyuyu/learnrabbitmq/commits/master 的Rabbitmq Topic Exchange。
Headers Exchange
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 相关代码可参考:https://github.com/zhangyuyu/learnrabbitmq/commits/master 的Rabbitmq Headers Exchange。
3.RPC
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用),可直接参考官方教程。
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息之后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。
- 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
四、其他
Dashboard:
其中:
- Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
- Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
术语:
- Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好绑定关系。
- 客户端投递消息到exchange。