rocketmq源码解析 rocketmq源码部署( 三 )

2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁住当前正在消费的队列集合的消息 。
2.1.2.1.1 RebalanceImpl#lockAll:锁住所有正在消息的队列 。
// ConsumeMessageOrderlyService#lockMQPeriodically if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } // RebalanceImpl#lockAll HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根据brokerName从processQueueTable获取正在消费的队列集合 ...... Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker发送锁住消息队列的指令 for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......2.1.3 MQClientInstance#start:启动MQClientInstance 。过程较复杂,放到大标题四中分析 。
// DefaultMQPushConsumerImpl#start mQClientFactory.start();四.源码分析 – Consumer接收顺序消息(二)
1 MQClientInstance#start:启动客户端实例MQClientInstance 。
// MQClientInstance#start synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ...... // Start pull service 启动拉取消息服务 this.pullMessageService.start(); // Start rebalance service 启动消费端负载均衡服务 this.rebalanceService.start(); ......1.1 PullMessageService#run:启动拉取消息服务 。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法 。
// PullMessageService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } // PullMessageService#pullMessage private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息 。提交到
ConsumeMessageOrderlyService的线程池consumeExecutor中执行 。
// DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......1.1.1.1.1.1.1 ConsumeRequest#run:处理消息消费的线程 。
// ConsumeMessageOrderlyService.ConsumeRequest#run List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); ...... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 实际消费消息的地方,回调消息监听器的consumeMessage方法 } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs,messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } ......

推荐阅读