南昌网站建设优化公司排名,seo外推上排名,怎么样建设自己的网站,做网站公司排名是什么RocketMQ的Broker分为Master和Slave两个角色#xff0c;为了保证高可用性#xff0c;Master角色的机器接收到消息后#xff0c;要把内容同步到Slave机器上#xff0c;这样一旦Master宕机#xff0c;Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实… RocketMQ的Broker分为Master和Slave两个角色为了保证高可用性Master角色的机器接收到消息后要把内容同步到Slave机器上这样一旦Master宕机Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。
1 同步属性信息
Slave需要和Master同步的不只是消息本身一些元数据信息也需要同步比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候判断自己的角色是否是Slave是的话就启动定时同步任务如代码清单12-1所示。
代码清单12-1 Slave角色定时同步元数据信息 if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig.getHaMasterAddress().length() 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically false; } else { this.updateMasterHAServerAddrPeriodically true; } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error(ScheduledTask syncAll slave exception, e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } 在syncAll函数里调用syncTopicConfig、getAllConsumerOffset、syncDelayOffset和syncSubscriptionGroupConfig进行元数据同步。我们以syncConsumerOffset为例来看看底层的具体实现如代码清单12-2所示。
代码清单12-2 getAllConsumerOffset具体实现 public ConsumerOffsetSerializeWrapper getAllConsumerOffset( final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBroker-Exception { RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response this.remotingClient.invokeSync(addr, request, 3000); assert response ! null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); } getAllConsumerOffset的基本逻辑是组装一个RemotingCommand底层通过Netty将消息发送到Master角色的Broker然后获取Offset信息。
2 同步消息体
下面介绍Master和Slave之间同步消息体内容的方法也就是同步CommitLog内容的方法。CommitLog和元数据信息不同首先CommitLog的数据量比元数据要大其次对实时性和可靠性要求也不一样。元数据信息是定时同步的在两次同步的时间差里如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致不过这种情况还可以补救手动调整Offset重启Consumer等。CommitLog在高可靠性场景下如果没有及时同步一旦Master机器出故障消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。
主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中里面包括HAService、HAConnection和WaitNotifyObject这三个类。
HAService是实现commitLog同步的主体它在Master机器和Slave机器上执行的逻辑不同默认是在Master机器上执行见代码清单12-3。
代码清单12-3 根据Broker角色确定是否设置HaMasterAddress if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig .getHaMasterAddress().length() 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically false; } else { this.updateMasterHAServerAddrPeriodically true; } 当Broker角色是Slave的时候MasterAddr的值会被正确设置这样HAService在启动的时候在HAClient这个内部类中connectMaster会被正确执行如代码清单12-4所示。
代码清单12-4 Slave角色连接Master private boolean connectMaster() throws ClosedChannelException { if (null socketChannel) { String addr this.masterAddress.get(); if (addr ! null) { SocketAddress socketAddress RemotingUtil.string2SocketAddress(addr); if (socketAddress ! null) { this.socketChannel RemotingUtil.connect(socketAddress); if (this.socketChannel ! null) { this.socketChannel.register(this.selector, SelectionKey.OP_READ); } } } this.currentReportedOffset HAService.this.defaultMessageStore.getMaxPhyOffset(); this.lastWriteTimestamp System.currentTimeMillis(); } return this.socketChannel ! null; } 从代码中可以看出HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码如代码清单12-5所示。
代码清单12-5 监听Slave的HA连接 public void beginAccept() throws Exception { this.serverSocketChannel ServerSocketChannel.open(); this.selector RemotingUtil.openSelector(); this.serverSocketChannel.socket().setReuseAddress(true); this.serverSocketChannel.socket().bind(this.socketAddressListen); this.serverSocketChannel.configureBlocking(false); this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } CommitLog的同步不是经过netty command的方式而是直接进行TCP连接这样效率更高。连接成功以后通过对比Master和Slave的Offset不断进行同步。
3 sync_master和async_master
sync_master和async_master是写在Broker配置文件里的配置参数这个参数影响的是主从同步的方式。从字面意思理解sync_master是同步方式也就是Master角色Broker中的消息要立刻同步过去async_master是异步方式也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析sync_master下的消息同步如代码清单12-6所示。
代码清单12-6 sync_master下的消息同步 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER this.defaultMessageStore .getMessageStoreConfig().getBrokerRole()) { HAService service this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() result .getWroteBytes())) { GroupCommitRequest request new GroupCommitRequest (result.getWroteOffset() result .getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK request.waitForFlush(this.defaultMessageStore .getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error(do sync transfer other node, wait return, but failed, topic: messageExt .getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus .FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus .SLAVE_NOT_AVAILABLE); } } } } 在CommitLog类的putMessage函数末尾调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush在同步方式下Master每次写消息的时候都会等待向Slave同步消息的过程同步完成后再返回如代码清单12-7所示。putMessage函数比较长仅列出关键的代码。
代码清单12-7 putMessage中调用handleHA public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result null; StoreStatsService storeStatsService this.defaultMessageStore .getStoreStatsService(); String topic msg.getTopic(); int queueId msg.getQueueId(); …… handleDiskFlush(result, putMessageResult, msg); handleHA(result, putMessageResult, msg); return putMessageResult; }