rocketmq源码解析 rocketmq源码部署( 二 )

1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息 。
// MQClientAPIImpl#sendMessageSync private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步 。
// NettyRemotingClient#invokeSyncRemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;三.源码分析 – Consumer接收顺序消息(一)
1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器加入到messageListener中 。
// DefaultMQPushConsumer#registerMessageListener public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器加入到messageListenerInner中 。
// DefaultMQPushConsumerImpl#registerMessageListener public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }2 DefaultMQPushConsumer#start:启动Consumer 。
// DefaultMQPushConsumer#start public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl 。
// DefaultMQPushConsumerImpl#start switch (this.serviceState) { case CREATE_JUST:// 刚刚创建 ...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服务 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 并发无序消息服务 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 启动消息服务 ...... mQClientFactory.start();// 启动MQClientInstance ......2.1.1 new
ConsumeMessageOrderlyService():构造顺序消息服务 。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// 主消息消费线程池,正常执行收到的ConsumeRequest 。多线程 this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }2.1.2
ConsumeMessageOrderlyService#start:启动消息队列客户端实例 。
// DefaultMQPushConsumerImpl#start this.consumeMessageService.start(); // ConsumeMessageOrderlyService#start public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定时向broker发送批量锁住当前正在消费的队列集合的消息 } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

推荐阅读