1.2 RebalanceService#run:启动消息端负载均衡服务 。
// RebalanceService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } // MQClientInstance#doRebalance public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } } // DefaultMQPushConsumerImpl#doRebalance public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }1.2.1.1.1 RebalanceImpl#doRebalance:负载均衡服务类处理 。
// RebalanceImpl#doRebalance public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); } // RebalanceImpl#rebalanceByTopic switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 根据Toipc去除queue if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { ...... // RebalanceImpl#updateProcessQueueTableInRebalance this.dispatchPullRequest(pullRequestList);// RebalancePushImpl分发消息1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发 。
// RebalancePushImpl#dispatchPullRequest public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }五.总结
相比Producer的发送流程,Consumer的接收流程稍微复杂一点 。通过上面的源码分析,可以知道RocketMQ是怎样保证消息的有序的:
1.通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列 。设置本地队列ProcessQueue的locked属性为true 。保证broker中的每个消息队列只对应一个消费端;
2.另外,消费端也是通过锁,保证每个ProcessQueue只有一个线程消费 。
推荐阅读
- jdk源码阅读顺序 jdk源码垃圾
- 异星探险家astroneer蓄电池作用解析 蓄电池干什么用的?
- 异星探险家astroneer过滤器作用解析 过滤器有什么作用
- 拉结尔天赋树加点流派解析 弓箭手满级成型加点攻略_天赋选择
- 《拉结尔》斗兽之王赛季套装解析 磐石赛季装备一览
- 拉结尔控制魂灯功能解析 如何利用魂灯快速提升英雄等级
- 侍魂胧月传说红魔炼狱怎么打 红魔炼狱玩法解析
- 侍魂胧月传说试合竞技怎么打 试合竞技玩法解析
- 我的世界恶魂为什么会哭 哭泣的恶魂解析
- 异星探险家Astroneer基地板作用解析 基地板有什么作用