快速入门
消息队列功能介绍
应用解耦
复杂的应用里会存在多个子系统,比如在电商应用中有订单系统、库存系统、物流系统、支付系统等。这时候如果各个子系统之间的耦合性太高,整体系统的可用性就会大幅降低。多个低错误率的子系统强耦合在一起,得到的是一个高错误率的整体系统。
以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出现故障或因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
当转变成基于消息队列的方式后,系统可用性就高多了,比如物流系统因为发生故障,需要几分钟的时间来修复,在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列里,用户的下单操作可以正常完成。当物流系统恢复后,补充处理存储在消息队列里的订单信息即可,终端用户感知不到物流系统发生过几分钟的故障。
流量消峰
通过利用消息队列,把大量的请求暂存起来,分散到相对长的一段时间内处理,能大大提高系统的稳定性和用户体验。
举个例子,如果订单系统每秒最多能处理一万次下单,这个处理能力应对正常时段的下单是绰绰有余的,正常时段我们下单后一秒内就能返回结果。在双十一零点的时候,如果没有消息队列这种缓存机制,为了保证系统稳定,只能在订单超过一万次后就不允许用户下单了;如果有消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单后十几秒才能收到下单成功的状态,但是也比不能下单的体验要好。
使用消息队列进行流量消峰,很多时候不是因为能力不够,而是出于经济型的考量。比如有的业务系统,流量最高峰也不会超过一万QPS,而平时只有一千左右的QPS。这种情况下我们就可以用个普通性能的服务器(只支持一千左右的QPS就可以),然后加个消息队列作为高峰期的缓冲,无须花大笔资金部署能处理上万QPS的服务器。
消息分发
在大数据时代,数据对很多公司来说就像金矿,公司需要依赖对数据的分析,进行用户画像、精准推送、流程优化等各种操作,并且对处理的实时性要求越来越高。数据是不断产生的,各个分析团队、算法团队都要依赖这些数据来进行工作,这个时候有个可持久化的消息队列就非常重要。数据的产生方只需要把各自的数据写入一个消息队列即可,数据使用放根据各自需求订阅感兴趣的数据,不同数据团队所订阅的数据可以重复也可以不重复,互不干扰,也不必和数据产生方关联。
各个子系统将日志数据不停地写入消息队列,不同的数据处理系统有各自的Offset,互不影响。甚至某个团队处理完的结果数据也可以写入消息队列,作为数据的产生方,供其他团队使用,避免重复计算。在大数据时代,消息队列已经成为数据处理系统不可或缺的一部分。
除了上述列出的应用解耦、流量消峰、消息分发等功能外,消息队列还有保证最终一致性、方便动态扩容等功能。
RocketMQ简介
时间 | MQ | 拉取方式 | 解决问题 |
---|---|---|---|
2007 | Notify | 推模型 | 解决了事务消息 |
2010 | Napoli | ||
2011 | MetaQ | 拉模型 | 解决了顺序消息和海量堆积 |
2012 | RocketMQ | 长轮询 | 兼具第一代Notify和第二代MetaQ的优点 |
RocketMQ是使用Java语言开发的,比起Kafka的Scala语言和RabbitMQ的Erlang语言,更容易找到技术人员进行定制开发。
快速上手RocketMQ
RocketMQ的下载、安装和配置
官网下载链接:http://rocketmq.apache.org/dowloading/releases
1 | unzip rocketmq-all-4.9.0-bin-release.zip |
目录文件:
LICENSE
:版权声明NOTICE
:版权声明README.md
:功能说明信息benchmark/
:包括运行benchmark程序的shell脚本bin/
:含有各种使用RocketMQ的shell脚本和cmd脚本conf/
:一些示例配置文件,包括三种方式的broker配置文件、logback日志配置文件等,用户在写配置文件的时候,一般基于这些示例配置文件,加上自己特殊的需求即可lib/
:包括RocketMQ各个模块编译成的jar包,以及RocketMQ依赖的一些jar包,比如Netty、commons-lang、FastJSON等
启动/关闭消息队列服务
启动单机的消息队列服务比较简单,不需要写配置文件,只需要依次启动本机的NameServer和Broker即可。
(1)启动NameServer
1 | nohup bin/mqnamesrv & |
(2)启动Broker
1 | nohup bin/mqbroker -n localhost:9876 & |
(3)关闭NameServer
1 | bin/mqshutdown namesrv |
(4)关闭Broker
1 | bin/mqshutdown broker |
用命令行发送和接收消息
1 | export NAMESRV_ADDR=localhost:9876 |
1 | bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer |
生产环境下的配置和使用
RocketMQ各部分角色介绍
RocketMQ由四部分组成:
- 发信者:Producer
- 收信者:Consumer
- 负责暂存、传输:Broker
- 负责协调各个地方邮局的管理机构:NameServer
启动RocketMQ的顺序是先启动NameServer,再启动Broker,这时候消息队列已经可以提供服务了,想发送消息就是用Producer来发送,想接收消息就是用Consumer来接收。
很多应用程序既要发送,又要接收,可以启动多个Producer和Consumer来发送多种消息,同时接收多种消息。
为了消除单点故障,增加可靠性或大吞吐量,可以在多台机器上部署多个NameServer和Broker,为每个Broker部署一个或多个Slave。
- Topic:一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的Topic名称来区分。所以发送和接收消息前,先创建Topic,针对某个Topic发送和接收消息。
- Message Queue:有了Topic以后,还需要解决性能问题。如果一个Topic要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic可以根据需求设置一个或多个Message Queue,Message Queue类似分区或Partition。Topic有了多个Message Queue后,消息可以并行地向各个Message Queue发送,消费者也可以并行地从多个Message Queue读取消息并消费。
配置参数介绍
conf/broker.conf
:
namesrvAddr=xxx.xxx.xxx.xxx:9876; xxx.xxx.xxx.xxx:9876
:NameServer的地址,可以是多个brokerClusterName=DefaultCluster
:Cluster的地址,如果集群机器数比较多,可以分成多个Cluster,每个Cluster供一个业务群使用。brokerName=broker-a
:Broker的名称,Master和Slave通过使用相同的Broker名称来表明相互关系,以说明某个Slave是那个Master的SlavebrokerId=0
:一个Master Broker可以有多个Slave,0表示Master,大于0表示不同Slave的ID。fileReservedTime=48
:在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。deleteWhen=4
:与fileReservedTime参数呼应,表明在几点做消息删除动作,默认值04表示凌晨4点。brokerRole=SYNC_MASTER
: brokerRole有3种:SYNC_MASTER
、AYNC_MASTER
、SLAVE
。关键词SYNC
和ASYNC
表示Master和Slave之间同步消息的机制,`SYNC·的意思是当Slave和Master消息同步完成后,再发送成功的状态。flushDiskType=ASYNC_FLUSH
:flushDiskType表示刷盘策略,分为SYNC_FLUSH
和ASYNC_FLUSH
两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写入磁盘后再返回成功状态;异步刷盘情况下,消息写入page_cache后就返回成功状态。listenPort=10911
:Broker监听的端口号,如果一台机器上启动了多个Broker,则要设置不同的端口号,避免冲突。storePathRootDir=/home/rocketmq/store-a
:存储消息以及一些配置信息 的根目录brokerIP1=xxx.xxx.xxx.xxx
:现在使用云服务或多网卡的机器比较普遍,Broker自动探测获得的ip地址可能不符合要求,通过brokerIP1设置Broker机器对外暴露的ip地址。
单机器配置和部署
以腾讯云服务器(IP=159.75.242.242
)为例:
注意需要设置防火墙打开9876, 10911
端口
启动NameServer:
nohup bin/mqnamesrv >> namesrv_nohup.out &
配置broker.conf并启动Broker:
conf/broker.conf
:
1 | brokerClusterName = DefaultCluster |
nohup bin/mqbroker -n 159.75.242.242:9876 -c conf/broker.conf >> broker_nohup.out &
命令行发送接收消息
1 | export NAMESRV_ADDR=localhost:9876 |
1 | bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer |
Spring发送和接收消息
Add Dependency
maven:
1 | <dependency> |
gradle:
1 | compile 'org.apache.rocketmq:rocketmq-client:4.3.0' |
Send Messages
Send Messages Synchronously
Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS notification, SMS marketing system, etc..
1 | private static void sycProducer() throws Exception{ |
Send Messages Asynchronously
Asynchronous transmission is generally used in response time sensitive business scenarios.
1 | private static void asyncProducer() throws Exception{ |
Send Messages in One-way Mode
One-way transmission is used for cases requiring moderate reliability, such as log collection.
1 | private static void onewayProducer() throws Exception{ |
Consume Messages
1 | private static void consumer() throws InterruptedException, MQClientException { |
多机集群配置和部署
略
用适合的方式发送和接收消息
不同类型的消费者
根据使用者对读取操作的控制情况,消费者可分为两种类型:
(1)DefaultMQPushConsumer
,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;
(2)DefaultMQPullConsumer
,读取操作中的大部分功能由使用者自主控制。
DefaultMQPushConsumer的使用
使用DefaultMQPushConsumer主要是设置好各种参数和传入消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。
1 | private static void pushConsumer() throws Exception{ |
DefaultMQPushConsumer需要设置三个参数:
(1)GroupName
:用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。
RocketMQ支持两种消息模式:
Clustering
模式:同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。Broadcasting
模式:同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
(2)NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单调故障的目的;
(3)Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过制定消息的Tag进行消息过滤,比如consumer.subscribe("TopicTest", "tag1 || tag2 || tag3")
表示这个Consumer要消费TopicTest下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。
DefaultMQPushConsumer的处理流程
具体参考DefaultMQPushConsumerImpl
.
通过“长轮询”方式达到了Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。
推送方式 | 说明 | 优点 | 缺点 |
---|---|---|---|
Push方式 | Server端接收到消息后,主动把消息推送到Client端 | 实时性高 | 对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:①首先加大Server端的工作量,进而影响Server的性能;②其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。 |
Pull方式 | Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。 | 循环拉取的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。 | |
长轮询 | 服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认5秒),然后再Check。默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过Request里面的BrokerSuspendMaxTimeMillis,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。 | 通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。 | 在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。 |
DefaultMQPushConsumer的流量控制
PushConsumer的核心还是Pull方式,所以采用这种方式的客户端能够根据自身的处理速度调整获取消息的操作速度。因为采用多线程处理方式实现,流量控制方面比单线程要复杂得多。
PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行:
1 | this.consumeExecutor = new ThreadPoolExecutor( |
Pull获得的消息,如果直接提交到线程池里执行,很难监控和控制,比如如何得知当前消息堆积的数量?如何重复处理某些消息?如何延迟处理某些消息?RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,每个Message Queue都会有个对应的ProcessQueue对象,保存了这个Message Queue消息处理状态的快照。
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到,但是还未被处理的消息;读写锁控制着多个线程对TreeMap对象的并发访问。
有了ProcessQueue对象,流量控制就方便和灵活多了,客户端在每次Pull请求前会做下面三个判断来控制流量:
1 | public void pullMessage(final PullRequest pullRequest) { |
PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外processQueue还可以辅助实现顺序消费的逻辑。
DefaultMQPullConsumer
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的函数,同时还需要做额外的事情。
1 | private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>(); |
逐个读取Topic下所有Message Queue的内容,读完一遍后退出,主要处理额外的三件事情:
(1)获取Message Queue并遍历:
一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多个Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。
(2)维护Offsetstore
从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断你读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。
(3)根据不同的消息状态做不通的处理
拉取消息的请求发出后,会返回:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态,需要根据每个状态做出不同的处理。比较重要的两个状态时FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息。
Consumer的启动、关闭流程
消息队列一般是提供一个不间断的持续性服务,Cosumer在使用过程中,如何才能优雅地启动和关闭,确保不漏掉或者重复消费消息呢?
Consumer分为Push和Pull两种方式,对于PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存,要在程序的异常处理部分增加把Offset写入磁盘方面的处理,记准了每个Message Queue的Offset,才能保证消息消费的准确性。
DefaultMQPushConsumer的退出,要调用shutdown()函数,以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。
PushConsumer在启动的时候,会做各种配置检查,然后连接NameServer获取Topic信息,启动时如果遇到异常,比如无法连接NameServer,程序仍然可以正常启动不报错(日志里有WARN信息)。在单机环境下可以测试这种情况,启动DefaultMQPushConsumer时故意把NameServer地址填错,程序仍然可以正常启动,但是不会收到消息。
为什么DefaultMQPushConsumer在无法连接NameServer时不直接报错退出呢?
这和分布式系统的设计有关,RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。可以进行这样一个测试,在DefaultMQPushConsumer正常运行的时候,手动kill掉Broker或NameServer,过一会儿再启动。会发现DefaultMQPushConsumer不会出错退出,在服务恢复后正常运行,在服务不可用的这段时间,仅仅会在日志里报异常信息。
如果需要在DefaultMQPushConsumer启动的时候,及时暴露配置问题,该如何操作呢?
可以在Consumer.start()语句后调用Consumer.fetchSubscribeMessageQueues(“TopicName”),这时如果配置信息写得不准确,或者当前服务不可用,这个语句会报MQClientException异常。
不同类型的生产者
DefaultMQProducer
生产者发送消息默认使用的DefaultMQProducer类,详细代码如下:
1 | private static void defaultProducer() throws MQClientException, InterruptedException{ |
发送消息要经过五个步骤:
1)设置Producer的GroupName
2)设置InstanceName,当一个JVM需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认的名称“DEFAULT”;
3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
4)设置NameServer地址;
5)组装消息并发送;
消息的发送有同步和异步两种方式,上面的代码使用的是异步方式。
消息发送的返回状态有如下四种:
FLUSH_DISK_TIMEOUT
:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会这个错误)FLUSH_SLAVE_TIMEOUT
:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步;SLAVE_NOT_AVAILABLE
:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。SEND_OK
:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?是否被同步到了Slave上?在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。简单理解为,没发生上述列出的三个问题状态就是SEND_OK。
发送延迟消息
RocketMQ支持发送延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。
延迟消息的使用方法是在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。目前延迟的事件不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。
自定义消息发送规则
一个Topic会有多个Message Queue,如果使用Producer的默认配置,这个Producer会轮流向各个Message Queue发送消息。Consumer在消费消息的时候,会根据负载均衡策略,消费被分配到的Message Queue,如果不经过特定的设置,某条消息被发往哪个Message Queue,被哪个Consumer消费是未知的。
如果业务需要我们把消息发送到指定的Message Queue里,比如把同一类型的消息都发往相同的Message Queue,可以用MessageQueueSelector。
1 | package com.louris.springboot; |
发送消息的时候,把MessageQueueSelector的对象作为参数,使用public SendResult send(Message msg, MessageQueueSelector selector, Object arg)函数发送消息即可。在MessageQueueSelector的实现中,根据传入的Object参数,或者根据Message消息内容确定把消息发往那个Message Queue,返回被选中的Message Queue。
对事务的支持
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
略
三个类来支持用户实现事务消息:
LocalTransactionExecutor
TransactionMQProducer
TransactionCheckListener
如何存储队列位置信息
实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset有关。
Offset的含义:
RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个Message Queue(也可以设置成一个),Offset是指某各Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。
Offset的类结构,主要分为本地文件类型和Broker代存的类型两种。
DefaultMQPushConsumer:
- 默认
CLUSTERING
模式,也就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
结构。 BROADCASTING
模式下,每个Consumer都收到这个Topic的全部消息,各个Consumer间相互没有干扰,RocketMQ使用LocalFileOffsetStore
,把Offset存到本地。
OffsetStore使用Json格式存储:
1 | {"OffsetTable":{{"brokerName":"localhost", "QueueId":1,"Topic":"broker1"}:1 |
DefaultMQPushConsumer处理
在使用DefaultMQPushConsumer的时候,不需要关心OffsetStore的事,但是如果PullConsumer,就需要自己处理OffsetStore了。
之前是将Offset存到了内存,没有持久化。
以下为磁盘存储Offset的程序:
1 | package com.louris.springboot.utils; |
自定义日志输出
Log是监控系统状态,排查问题的重要手段,RocketMQ的默认Log存储位置是:${user.home}$/Logs/rocketmqLogs,Log配置文件的设置可以通过JVM启动参数、环境变量、代码中的设置语句这三种方式来配置。
略
分布式消息队列的协调者
对于一个消息队列集群来说,系统由很多台机器组成,每个机器的角色、IP地址都不相同,而且这些信息是变动的。这种情况下,如果一个新的Producer或Consumer加入,怎么配置连接信息呢?NameServer的存在主要是为了解决这类问题,由NameServer维护这些配置信息、状态信息,其他角色都通过NameServer来协同执行。
NameServer的功能
NameServer是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局信息。同时,各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了,其他的组件会把这个机器从可用列表移出。
NameServer可以部署多个,相互之间独立,其他角色同时向多个NameServer机器上报状态信息,从而达到热备份的目的。NameServer本身是无状态的,也就是说NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并存储在内存中的(NameServer支持配置参数的持久化,一般用不到)。
集群状态的存储结构
在org.apache.rocketmq.namesrv.routeinfo的RouteInfoManager类中,有五个变量,集群的状态就保存在这五个变量中
- private final HashMap<String, List
> topicQueueTable:Key是Topic的名称; - private final HashMap<String, BrokerData> brokerAddrTable:Key是BrokerName,BrokerData存储这包括一个BrokerName对应的所属Cluster名称,一个Master Broker和多个Slave Broker的地址信息;
- private final HashMap<String, Set
> clusterAddrTable:Key是ClusterName,集合是一个Cluster对应的BrokerName组成的集合; - private final HashMap<String, BrokerLiveInfo> brokerLiveTable:Key是BrokerAddr;
- private final HashMap<String, List
> filterServerTable:Key是BrokerAddr,list为Filter Server过滤器。Filter Server是过滤服务器,是RocketMQ的一种服务端过渡方式,一个Broker可以有一个或多个Filter Server。这个结构的Key是Broker的地址,Value是和这个Broker关联的多个Filter Server的地址。
状态维护逻辑
当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。
NameServer还有定时检查时间戳的逻辑,Broker向NameServer发送的心跳会更新时间戳,当NameServer检查到时间戳长时间没有更新后,便会触发清理逻辑,每10秒检查一次,时间戳超过2分钟则认为Broker已失效。
各个角色间的交互流程
交互流程源码分析
略
为何不用ZooKeeper
ZooKeeper是Apache的一个开源软件,为分布式应用程序提供协调服务。那为什么RocketMQ要自己造轮子,开发集群的管理程序呢?答案是ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举,用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。
中间件对稳定 性要求很高,RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本。
底层通信机制
分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的TCP通信协议是很有挑战的。
Remoting模块
顶层接口
1 | // |
具体接口
1 | // |
1 | // |
具体实现类
- NettyRemotingClient
- NettyRemotingServer
两者都继承了NettyRemotingAbstract类,分别实现了上节两个接口
具体略
协议设计和编解码
RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换,协议格式
1 | |←4→|←4→|←→|←→| |
- 第一部分是大端4个字节整数,值等于第二、三、四部分长度的总和;
- 第二部分是大端4个字节整数,值等于第三部分的长度;
- 第三部分是通过Json序列化的数据;
- 第四部分是通过应用自定义二进制序列化的数据。
消息的解码过程在RemotingCommand的decode函数里
1 | public static RemotingCommand decode(ByteBuffer byteBuffer) { |
消息编码在RemotingCommand的encode函数里:
1 | public ByteBuffer encode() { |
Netty库
RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket、NIO等复杂细节,用户只需用好Netty,就可以实现一个“网络编程专家+并发编程专家”水平的Server、Client网络程序。应用Netty有一定的门槛,需要了解它的EventLoopGroup、Channel、Handler模型以及各种具体的配置。
消息队列的核心机制
Broker是RocketMQ的核心,大部分“重量级”工作都是由Broker完成的,包括接收Producer发过来的消息、处理Consumer的消费消息请求、消息的持久化存储、消息的HA机制以及服务端过滤功能等。
消息存储和发送
分布式队列因为有高可靠性的要求,所以数据要通过磁盘进行持久化存储。用磁盘存储消息,速度会不会很慢呢?能满足实时性和高吞吐量的要求吗?
磁盘读写特点:
- 顺序写速度:600MB/S
- 随机写速度:100KB/S
两者相差6000倍!因此好的消息队列系统会比普通的消息队列系统速度快多个数量级
举例:
Linux操作系统分为“用户态”和“内核态”,文件操作、网络操作需要涉及这两个形态的切换,免不了进行数据复制,一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read(file, tmp_buf, len);读取本地文件内容;
2)write(socket, tmp_buf, len);将读取的内容通过网络发送出去;
tmp_buf是预先申请的内存,这两个看似简单的操作,实际进行了4次数据复制:
- 从磁盘复制数据到内核态内存;
- 从内核态内存复制到用户态内存(完成read(file, tmp_buf, len));
- 从用户态内存复制到网络驱动的内核态内存
- 从网络驱动的内核态内存复制到网卡中进行传输(完成write(socket, tmp_buf, len));
相应的解决方案为“零拷贝”技术,通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
消息存储结构
RocketMQ的具体消息存储结构是怎样的呢?如何尽量保证顺序写的呢?
- RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog;
- ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。
- 每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件,文件地址在
${$storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}
。
CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享,文件地址:${user.home}\store\${commitlog}\${fileName}
。在CommitLog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制, 尽量向CommitLog中顺序写,但是随机读。ConsumeQueue的内容也会被写到磁盘里作持久存储。
存储机制这样设计有以下几个好处:
- CommitLog顺序写,可以大大提高写入效率;
- 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存汇总,加速后续的读取速度。
- 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。
高可用性机制
服务器端Broker的高可用性
- RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的,首先说一下Master和Slave的区别:在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master,大于0表示这个Broker是Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。
- Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是Producer只能和Master角色的Broker连接写入消息;Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
- RocketMQ目前不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。
消费端Consumer的高可用性
- 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读,当Master不可用或者繁忙的时候,Consumer会被自动切换到Slave读;
- 有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用。
生产端Producer的高可用性
在创建Topic的时候哦,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。
同步刷盘和异步刷盘
RocketMQ的消息是存储到磁盘上的,这样技能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写入磁盘方式:
- 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入;
- 同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个。
同步复制和异步复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
- 同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态;
- 异步复制方式:只要Master写成功即可反馈给客户端写成功状态。
两者优劣:
- 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
- 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Slave配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
可靠性优先的使用场景
顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某各Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可。
全局顺序消息
RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这个时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理所以消息被那个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的。
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。这时高并发、高吞吐量的功能完全用不上了。
在实际应用中,更多的是像订单类消息那样,只需要部分有序即可。在这种情况下,我们经过合适的配置,依然可以利用RocketMQ高并发、高吞吐量的能力。
部分顺序消息
要保证部分消息有序,需要发送端和消费端配合处理。
- 在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;
- 在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序;
发送端使用MessageQueueSelector类来控制把消息发往哪个Message Queue:
1 | private static void sycProducer() throws Exception{ |
消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题。
1 | private static void consumer() throws InterruptedException, MQClientException { |
Consumer使用MessageListenerOrderly的时候,下面四个Consumer的设置依旧可以使用:
- setConsumeThreadMin:设置COnsumer的线程数
- setConsumeThreadMax:同上
- setPullBatchSize:指一次从Broker的一个Message Queue获取消息的最大数量,默认值是32
- setConsumeMessageBatchMaxSize:指的是这个Consumer的Executor(也就是调用MessageListener处理的地方)一次传入的消息数(List
msgs这个链表的最大长度),默认值是1。
上述四个参数可以使用,说明MessageListenerOrderly并不是简单地禁止并发处理。在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。
消息重复问题
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,也就是所谓的“有且仅由一次”。在鱼和熊掌不可兼得的情况下,RocketMQ选择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重复就是个大概率事件。比如Producer有个函数setRetryTimesWhenSendFailed,设置在同步方式下自动重试的次数,默认值是2,这样当第一次发送消息时,Broker端接收到了消息但是没有正确返回发送成功的状态,就造成了消息重复。
解决消息重复有两种方法:
- 第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同);
- 第二种方法是维护一个已消费消息的记录,消费前查询这个消息是否被消费过。
这两种方法都需要使用者自己实现。
动态增减机器
一个消息队列集群由多台机器组成,持续稳定地提供服务,因为业务需求或硬件故障,经常需要增加或减少各个角色的机器。
动态增减NameServer
NameServer是RocketMQ集群的协调者,集群的各个组件是通过NameServer获取各种属性和地址信息的。
主要功能包括:
- 各个Broker定期上报自己的状态信息到NameServer;
- 各个客户端,包括Producer、Consumer,以及命令行工具,通过NameServer获取最新的状态信息。
所以,在启动Broker、生产者和消费者之前,必须告诉它们NameServer的地址,为了提高可靠性,建议启动多个NameServer。
NameServer占用资源不多,可以和Broker部署在同一台机器。有多个NameServer后,减少某个NameServer不会对其他组件产生影响。
有四种方式可以设置NameServer的地址,下面按优先级由高到低依次介绍:
- 通过代码设置;
- 使用Java启动参数设置,对应的option是rocketmq.namesrv.addr;
- 通过Linux环境变量设置,在启动前设置变量:NAMESRV_ADDR;
- 通过HTTP服务来设置,当上述方法都没有使用,程序会向一个HTTP地址发送请求来获取NameServer地址,默认的URL是
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
(淘宝的测试地址),通过rocketmq.namesrv.domain参数来覆盖jmenv.tbsite.net;通过rocketmq.namesrv.domain.subgroup参数来覆盖nsaddr。
第四种方式看似繁琐,但它是唯一支持动态增加NameServer,无须重启其他组件的方式。使用这种方式后其他组件会每隔2分钟请求一次该URL,获取最新的NameServer地址。
动态增减Broker
由于业务增长,需要对集群进行扩容的时候,可以动态增加Broker角色的机器。只增加Broker不会对原有的Topic产生影响,原来创建好的Topic中数据的读写依然在原来的那些Broker上进行。
集群扩容后:
- 一是可以把新建的Topic指定到新的Broker机器上,均衡利用资源;
- 另一种方式是通过updateTopic命令更改现有的Topic配置,在新加的Broker上创建新的队列。比如TestTopic是现有的一个Topic,因为数据量增大需要扩容,新增的一个Broker机器地址是192.168.0.1:10911,这个时候执行命令
sh ./bin/mqadmin updateTopic -b 192.168.0.1:10911 -t TestTopic -n 192.168.0.100:9876
,结果是在新增的Broker机器上,为TestTopic新创建了8个读写队列。
如果因为业务变动或者置换机器需要减少Broker,此时该如何操作呢?
减少Broker要看是否有持续运行的Producer:
1)当一个Topic只有一个Master Broker,停掉这个Broker后,消息的发送肯定会受到影响,需要在停止这个Broker前,停止发送消息。
2)当某个Topic有多个Master Broker,停了其中一个,这时候是否会丢失消息和Producer使用的发送消息的方式有关
- 如果使用同步方式send(msg)发送,在DefaultMQProducer内部有个自动重试逻辑,其中一个Broker停了,会自动向另一个Broker发消息,不会发生丢消息现象。
- 如果使用异步方式发送send(msg, callback),或者用sendOneWay方式,会丢失切换过程中的消息。因为在这两种方式下,producer.setRetryTimesWhenSendFailed设置不起作用,发送失败不会重试。DefaultMQProducer默认每30秒到NameServer请求最新的路由消息,Producer如果获取不到已停止的Broker下的队列消息,后续就自动不再向这些队列发送消息。
3)如果Producer程序能够暂停,在有一个Master和一个Slave的情况下也可以顺利切换。可以关闭Producer后关闭Master Broker,这个时候所有的读取都会被定向到Slave机器,消费消息不受影响。把Master Broker机器置换完后,基于原来的数据启动这个Master Broker,然后再启动Producer程序正常发送消息。
用Linux的kill pid命令就可以正确地关闭Broker,BrokerController下有个shutdown函数,这个函数被加到了ShutdownHook里,当用Linux的kill命令时(不能用kill -9),shutdown函数会先被执行。也可以通过RocketMQ提供的工具(mqshutdown broker)来关闭Broker,它们的原理是一样的。
各种故障对消息的影响
1)Broker正常关闭,启动;
2)Broker异常Crash,然后启动;
3)OS Crash,重启;
4)机器断电,但能马上恢复供电;
5)磁盘损坏;
6)CPU、主板、内存等关键设备损坏。
假设现有的RocketMQ集群,每个Topic都配有多Master角色的Broker供写入,并且每个Master都至少有一个Slave机器(用两台物理机就可以实现上述配置),我们来看看在上述情况下消息的可靠性情况。
第1种情况属于可控的软件问题,内存中的数据不会丢失。如果重启过程中有持续运行的Consumer,Master机器出故障后,Consumer会自动重连到对应的Slave机器,不会有消息丢失和偏差。当Master角色的机器重启以后,Consumer又会重新连接到Master机器(注意在启动Master机器的时候,如果Consumer正在从Slave消费消息,不要停止Consumer。假如此时先停止Consumer后再启动Master机器,然后再启动Consumer,这个时候Consumer就会去读Master机器上已经滞后的offset值,造成消息大量重复。)
如果第1种情况出现时有持续运行的Producer,一台Master出故障后,Producer只能向Topic下其他的Master机器发送消息,如果Producer采用同步发送方式,不会有消息丢失。
第2、3、4种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果Master、Slave都配置成SYNC_FLUSH,可以达到和第1种情况相同的效果。
第5、6种情况属于硬件故障,原有机器的磁盘数据可能丢失。如果Master和Slave机器配置成同步复制方式,某一台机器发生5或6的故障,也可以达到消息不丢失的效果。如果Master和Slave机器间是异步复制,两次Sync间的消息会丢失。
总的来说,当设置成:
1)多Master,每个Master带有Slave;
2)主从之间设置成SYNC_MASTER;
3)Producer用同步方式写;
4)刷盘策略设置成SYNC_FLUSH。
就可以消除单点依赖,即使某台机器出现极端故障也不会丢消息。
消息优先级
有些场景、需要应用程序处理几种类型的消息,不同消息的优先级不同。
RocketMQ是个先进先出的队列,不支持消息级别或者Topic级别的优先级。
业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法:
(1)第一种比较简单的情况,如果当前Topic里有多种相似类型的消息,比如类型A、B、C,当B、C的消息量大,但是处理速度比较慢的时候,队列里会有很多B、C类型的消息在等候处理,这个时候如果有少量A类型的消息加入,就会排在B、C类型消息后面,需要等待很长时间才能被处理。
如果业务需要A类型的消息被及时处理,可以把这三种相似类型的消息分拆到两个Topic里,比如A类型的消息在一个单独的Topic,B、C类型的消息在另外一个Topic。把消息分到两个Topic中以后,应用程序创建两个Consumer,分别订阅不同的Topic,这样消息A在单独的Topic里,不会因为B、C类型的消息太多而被长时间延时处理。
(2)第二种情况和第一种情况类似,但是不用创建大量的Topic。例如,一个订单处理系统,接收从100家快递门店过来的请求,把这些请求通过Producer写入RocketMQ;订单处理程序通过Consumer从队列里读取消息并处理,每天最多处理1万单。如果这100个快递门店订单量大增,比如门店一接了个大客户,一个上午就发出2万单消息请求,这样其他的99家门店可能被迫等待门店一的2万单处理完,也就是两天后订单才能被处理,显然很不公平。
这时可以创建一个Topic,设置Topic的MessageQueue数量超过100个,Producer根据订单的门店号,把每个门店的订单写入一个MessageQueue。DefaultMQPushConsumer默认是采用循环的方式逐个读取一个Topic的所有MessageQueue,这样如果某家门店订单量大增,这家门店对应的MessageQueue消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsuemr默认的pullBatchSize是32,也就是每次从某个MessageQueue读取消息的时候,最多可以读32个。在上面的场景中,为了更加公平,可以把pullBatchSize设置为1。
(3)第三种情况是强优先级需求,上两种情况对消息的“优先级”要求不高,更像一个保证公平处理的机制,避免某类消息的增多阻塞其他类型的消息。现在有一个应用程序同时处理TypeA、TypeB、TypeC三类消息。
- TypeA处于第一优先级,要确保只要有TypeA消息,必须优先处理;
- TypeB处于第二优先级;
- TypeC处于第三优先级。
对于这种要求,或者逻辑更加复杂的要求,就要用户自己编码实现优先级控制:
- 如果上述的三类消息在一个Topic里,可以使用PullConsumer,自主控制MessageQueue的遍历,以及消息的读取;
- 如果上述三类消息在三个Topic下,需要启动三个Consumer,实现逻辑控制三个Consumer的消费。
吞吐量优先的使用场景
在Broker端进行消息过滤
在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。
消息的Tag和Key
对于一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。
其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使得Key唯一,避免潜在的哈希冲突。
Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。
通过Tag进行过滤
略
用SQL表达式的方式进行过滤
使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑:
1 | Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); |
过滤逻辑:(目前只支持在PushConsumer中实现这种过滤)
1 | private static void consumer() throws InterruptedException, MQClientException { |
类似SQL的过滤表达式、支持如下语法:
- 数字对比,比如>、>=、<、<=、BETWEEN、=;
- 字符串对比,比如=、<>、IN;
- IS NULL or IS NOT NULL;
- 逻辑符号AND、OR、NOT;
支持的数据类型:
- 数字型,比如123,,31415;
- 字符型,比如’abc’、注意必须单引号;
- NULL,这个特殊字符;
- 布尔型,TRUE or FALSE;
SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。
Filter Server方式过滤
Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。
- 要使用Filter Server,首先要在启动Broker前在配置文件里加上
filterServerNums=3
这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。 - Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。
- 上传的Java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。
实现
1 | package com.louris.springboot.utils; |
1 | private static void consumer() throws InterruptedException, MQClientException, IOException { |
在使用Filter Server的Consumer例子中,主要是把实现过滤逻辑的类作为参数传到Broker端,Broker端的Filter Server会解析这个类,然后根据match函数里的逻辑进行过滤。
提高Consumer处理能力
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。
提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。注意总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(修改consumeThreadMin
和consumeThreadMax
)。
以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次更新10条的时间会大大小于十次更新1条数据的时间。这是可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize
逐个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
1 | consumer.registerMessageListener(new MessageListenerConcurrently() { |
如代码所示,当某各队列的消息堆积到90000条以上,就直接丢弃,以便快速追上发送消息的进度。
Consumer的负载均衡
上节讲到,想要提高Consumer的处理速度,可以启动多个Consumer并发处理,这个时候就设计如何在多个Consumer之间负载均衡的问题,接下来结合源码分析COnsumer的负载均衡实现。
要做到负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个COnsumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
DefaultMQPushConsumer的负载均衡
DefaultMQPushConsumer的负载均衡不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,各个Consumer都会被触发doRebalance动作。
具体的负载均衡算法有五种:
- AllocateMessageQueueAveragely(默认)
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueConsistentHash
负载均衡的结果与Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系影响负载均衡结果。
以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的手,有一个Consumer无法收到消息,其他3个Consumer各处理三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16;
DefaultMQPullConsumer的负载均衡
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
两个辅助方法可以帮助实现负载均衡
(1)registerMessageQueueListener函数:在有新的Consumer加入或退出时被触发。
1 | DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name"); |
(2)辅助工具类MQPullConsumerScheduleService
1 | package com.louris.springboot.utils; |
提高Producer的发送速度
- 发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求,三是服务器向客户端返回应答,一次消息的发送耗时是上述三个步骤的总和。
- 在写对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用Oneway方式发送,Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。
- 另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS。
- 在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。
另外,IO调度算法也推荐调整为deadline。deadline算法大致思想如下:
- 实现四个队列,其中两个处理正常的read和write操作,另外两个处理超时的read和write操作。
- 正常的read和write队列中,元素按扇区号排序,进行正常的IO合并处理以提高吞吐量。因为IO请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置的IO请求被饿死。
- 超时的read和write的队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的IO请求会优先被处理。
系统性能调优的一般流程
系统指完成某项功能的软硬件整体,比如我们用RocketMQ,加上自己写的Producer、Consumer程序,部署到一台服务器上,组成一个消息处理系统。
首先搭建测试环境,查看硬件利用率。把测试系统搭建好以后,要想办法模拟实际使用时的情况,并且逐步增大请求量,同时检测系统的TPS。在请求量增大到一定程度时,系统的QPS达到峰值,这个时候维持这种请求量,保持系统在峰值状态下运行。然后查看此时系统硬件使用情况:
(1)使用TOP命令查看CPU和内存利用率
(2)使用Linux的sar命令查看网卡使用情况
(3)netstat -t查看网卡的连接情况,看是否有大量的连接造成阻塞;
(4)iostat查看磁盘的使用情况。
CPU、网卡还是磁盘被占满,如果网卡满了,可以判断使发送的数据量超出了网卡的带宽等。
如果三者都没有到使用极限,说明CPU利用率没有发挥出来,可能是锁的机制有bug,造成线程阻塞。
对于Java程序来说,接下来可以用Java的profiling工具来找出程序的具体问题,比如jvisualvm、jstack、perfJ等。
通过上述工具,可以逐步定位出事哪些Java线程比较慢,哪个函数占用的时间多,是否因为存在锁造成了忙等的情况,然后通过不断的更改测试,找到影响性能的关键代码,最终解决问题。