当前位置: 首页 > news >正文

南昌网站建设优化公司排名seo外推上排名

南昌网站建设优化公司排名,seo外推上排名,怎么样建设自己的网站,做网站公司排名是什么RocketMQ的Broker分为Master和Slave两个角色#xff0c;为了保证高可用性#xff0c;Master角色的机器接收到消息后#xff0c;要把内容同步到Slave机器上#xff0c;这样一旦Master宕机#xff0c;Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…        RocketMQ的Broker分为Master和Slave两个角色为了保证高可用性Master角色的机器接收到消息后要把内容同步到Slave机器上这样一旦Master宕机Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。 1 同步属性信息 Slave需要和Master同步的不只是消息本身一些元数据信息也需要同步比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候判断自己的角色是否是Slave是的话就启动定时同步任务如代码清单12-1所示。 代码清单12-1 Slave角色定时同步元数据信息 if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {     if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig.getHaMasterAddress().length() 6) {         this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());         this.updateMasterHAServerAddrPeriodically false;     } else {         this.updateMasterHAServerAddrPeriodically true;     }     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         Override         public void run() {             try {                 BrokerController.this.slaveSynchronize.syncAll();             } catch (Throwable e) {                 log.error(ScheduledTask syncAll slave exception, e);             }         }     }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); }   在syncAll函数里调用syncTopicConfig、getAllConsumerOffset、syncDelayOffset和syncSubscriptionGroupConfig进行元数据同步。我们以syncConsumerOffset为例来看看底层的具体实现如代码清单12-2所示。 代码清单12-2 getAllConsumerOffset具体实现 public ConsumerOffsetSerializeWrapper getAllConsumerOffset(     final String addr) throws InterruptedException, RemotingTimeoutException,     RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {     RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);     RemotingCommand response this.remotingClient.invokeSync(addr, request, 3000);     assert response ! null;     switch (response.getCode()) {         case ResponseCode.SUCCESS: {             return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);         }         default:             break;     }     throw new MQBrokerException(response.getCode(), response.getRemark()); }   getAllConsumerOffset的基本逻辑是组装一个RemotingCommand底层通过Netty将消息发送到Master角色的Broker然后获取Offset信息。 2 同步消息体 下面介绍Master和Slave之间同步消息体内容的方法也就是同步CommitLog内容的方法。CommitLog和元数据信息不同首先CommitLog的数据量比元数据要大其次对实时性和可靠性要求也不一样。元数据信息是定时同步的在两次同步的时间差里如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致不过这种情况还可以补救手动调整Offset重启Consumer等。CommitLog在高可靠性场景下如果没有及时同步一旦Master机器出故障消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。 主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中里面包括HAService、HAConnection和WaitNotifyObject这三个类。 HAService是实现commitLog同步的主体它在Master机器和Slave机器上执行的逻辑不同默认是在Master机器上执行见代码清单12-3。 代码清单12-3 根据Broker角色确定是否设置HaMasterAddress if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {     if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig         .getHaMasterAddress().length() 6) {         this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());         this.updateMasterHAServerAddrPeriodically false;     } else {         this.updateMasterHAServerAddrPeriodically true;     }   当Broker角色是Slave的时候MasterAddr的值会被正确设置这样HAService在启动的时候在HAClient这个内部类中connectMaster会被正确执行如代码清单12-4所示。 代码清单12-4 Slave角色连接Master private boolean connectMaster() throws ClosedChannelException {     if (null socketChannel) {         String addr this.masterAddress.get();         if (addr ! null) {             SocketAddress socketAddress RemotingUtil.string2SocketAddress(addr);             if (socketAddress ! null) {                 this.socketChannel RemotingUtil.connect(socketAddress);                 if (this.socketChannel ! null) {                     this.socketChannel.register(this.selector, SelectionKey.OP_READ);                 }             }         }         this.currentReportedOffset HAService.this.defaultMessageStore.getMaxPhyOffset();         this.lastWriteTimestamp System.currentTimeMillis();     }     return this.socketChannel ! null; }   从代码中可以看出HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码如代码清单12-5所示。 代码清单12-5 监听Slave的HA连接 public void beginAccept() throws Exception {     this.serverSocketChannel ServerSocketChannel.open();     this.selector RemotingUtil.openSelector();     this.serverSocketChannel.socket().setReuseAddress(true);     this.serverSocketChannel.socket().bind(this.socketAddressListen);     this.serverSocketChannel.configureBlocking(false);     this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } CommitLog的同步不是经过netty command的方式而是直接进行TCP连接这样效率更高。连接成功以后通过对比Master和Slave的Offset不断进行同步。 3 sync_master和async_master sync_master和async_master是写在Broker配置文件里的配置参数这个参数影响的是主从同步的方式。从字面意思理解sync_master是同步方式也就是Master角色Broker中的消息要立刻同步过去async_master是异步方式也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析sync_master下的消息同步如代码清单12-6所示。 代码清单12-6 sync_master下的消息同步 public void handleHA(AppendMessageResult result,     PutMessageResult putMessageResult, MessageExt messageExt) {     if (BrokerRole.SYNC_MASTER this.defaultMessageStore         .getMessageStoreConfig().getBrokerRole()) {         HAService service this.defaultMessageStore.getHaService();         if (messageExt.isWaitStoreMsgOK()) {             // Determine whether to wait             if (service.isSlaveOK(result.getWroteOffset() result                 .getWroteBytes())) {                 GroupCommitRequest request new GroupCommitRequest                     (result.getWroteOffset() result                     .getWroteBytes());                 service.putRequest(request);                 service.getWaitNotifyObject().wakeupAll();                 boolean flushOK                     request.waitForFlush(this.defaultMessageStore                         .getMessageStoreConfig().getSyncFlushTimeout());                 if (!flushOK) {                     log.error(do sync transfer other node, wait return,                         but failed, topic: messageExt                         .getTopic() tags:                         messageExt.getTags() client address:                         messageExt.getBornHostNameString());                     putMessageResult.setPutMessageStatus(PutMessageStatus                         .FLUSH_SLAVE_TIMEOUT);                 }             }             // Slave problem             else {                 // Tell the producer, slave not available                 putMessageResult.setPutMessageStatus(PutMessageStatus                     .SLAVE_NOT_AVAILABLE);             }         }     } }   在CommitLog类的putMessage函数末尾调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush在同步方式下Master每次写消息的时候都会等待向Slave同步消息的过程同步完成后再返回如代码清单12-7所示。putMessage函数比较长仅列出关键的代码。 代码清单12-7 putMessage中调用handleHA public PutMessageResult putMessage(final MessageExtBrokerInner msg) {     // Set the storage time     msg.setStoreTimestamp(System.currentTimeMillis());     // Set the message body BODY CRC (consider the most appropriate setting     // on the client)     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));     // Back to Results     AppendMessageResult result null;     StoreStatsService storeStatsService this.defaultMessageStore         .getStoreStatsService();     String topic msg.getTopic();     int queueId msg.getQueueId();   ……     handleDiskFlush(result, putMessageResult, msg);     handleHA(result, putMessageResult, msg);     return putMessageResult; }
http://www.yayakq.cn/news/1329/

相关文章:

  • 房地产网站建设分析新网做网站怎么上传
  • 著名的wordpress网站本地网站建设DW
  • 怎样提高网站访问速度python django做的网站
  • 网站是做推广好还是优化好深圳龙华建设工程交易中心网站
  • 怎么在传奇网站上做宣传给企业做网站推广好么
  • 湛江网站模板新站快速收录
  • 有哪些做网站的公司好龙门城乡规划建设局网站
  • 视频网站做游戏分发海安建设局网站
  • 二七网建站关于做网站的合同
  • 营销型网站建设必备功能容桂网站制作公司
  • 电子商务网站设计原理名词解释网络规划设计师属于什么职称
  • gta5买办公室 网站正在建设不建网站可不可以做cpa
  • s001网站建设怎么做外贸企业网站
  • wordpress雪花网站搜索优化技巧
  • 购物网站常用功能模块介绍美食网站需求分析
  • 上海创新网站建设网站建设费支付请示
  • 电商网站哪家做的好建设网站平台需要什么硬件配置
  • 漳州北京网站建设公司哪家好佛山小程序开发平台
  • 网站底部版权信息模板WordPress采集淘宝头条插件
  • 建设银行网站公告在哪iis 设置网站不能访问
  • 碑林区营销型网站建设还有哪些媲美wordpress框架
  • 阿里云网站建设部署与发布试题答案wordpress 搭建教育
  • 怎么在国税网站上做实名认证哪个网站可以做图交易平台
  • 沈阳制作网站的人怎么做免流网站
  • 加强网站互动交流平台建设自查网站站做地图软件
  • 做网站推广的流程富利建设集团有限公司网站
  • asp做网站主要技术360帝国模板网欢迎大家来访_济南网站建设推广_济南 去114网
  • 网站制作怎么做搜索栏上海注册建网站
  • 做设计在哪个网站投递简历wordpress的商城网站制作公司
  • 建湖网站设计沈阳seo网站管理