当前位置: 首页 > news >正文

网站建设督查工作主持词天堂伞在线观看

网站建设督查工作主持词,天堂伞在线观看,个人证书查询网入口免费,网站开发语言什么意思问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME&#…

问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
在这里插入图片描述

1. 先说解决方案

这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
在这里插入图片描述
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
在这里插入图片描述

到此为止,是不是惊奇的发现,问题解决了?

2. 注意事项:订阅关系一致性

看下Rocket MQ官方文档给出的说明:

定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。

和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

在这里插入图片描述
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:

//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer 
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);

3. 剖析源码实现,分析原因

从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
在这里插入图片描述
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl类:

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 这里初始化一个默认的push类型的Consumer实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}

然后继续进入到DefaultMQPushConsumerImpl类中, 可以看见有一个成员变量MQClientInstance mQClientFactory, 在DefaultMQPushConsumerImpl类的start()(启动消费者)方法中会通过MQClientManager初始化MQClientInstance类.
在这里插入图片描述
接着跳转到MQClientInstance构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
在这里插入图片描述
继续剖析RebalanceService类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.

呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl类中的doRebalance()方法, 搞了半天又绕回来了. … … … … … …

直接进入到这里面, 看看rebalance的逻辑:
在这里插入图片描述
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.

case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 这里根据topic名称和Group进行获取到所有的consumerList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}

但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
在这里插入图片描述
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
在这里插入图片描述
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
在这里插入图片描述
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
在这里插入图片描述
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET

(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)

http://www.yayakq.cn/news/284774/

相关文章:

  • 呼和浩特 的网站建设免费网址域名
  • 天津网站建设技术托管淘宝客网站如何做排名
  • 东莞营销型网站设计云南建设厅网站公示
  • 门户网站建设运行环境要求商品小程序怎么制作
  • 设备高端网站建设绵阳城区大建设
  • 文化公司网站建设策划书基于android的app的设计与开发
  • 营销型网站北京king 主题WordPress
  • 番禺区住房和建设局网站网站建设包含项目
  • 数码公司网站建设的意义太原网架公司
  • seo做网站招生页面设计模板
  • 云南企业网站建设有限公司沈阳百度网站排名
  • 郑州一凡网站建设西安免费公司网站制作
  • 医药网站前置审批专属头像制作素材图片
  • 网络直播网站开发手机免费建设网站
  • 哪个做app的网站好体育新闻最新消息今天
  • 河南seo网站多少钱wordpress获取文章别名
  • 网站名称管理做网站的云服务器选什么
  • 房山网站建设成都装修网站建设
  • 移动app与网站建设的区别湘潭网站网站建设
  • 苏州著名网站建设做视频网站视频短片
  • 定位网站关键词厦门企业宣传片制作
  • 云网站系统涿州市建设局网站网址是多少
  • 深圳惠州网站建设公司莱芜金点子最新招聘平台
  • 12306网站开发商施工企业有哪些
  • 网站建设项目管理职业技能培训中心
  • 厦门外贸网站建设报价表企业网盘系统
  • 教育企业重庆网站建设台州网站制作套餐
  • 网站建设工程师招聘苏州市城乡建设档案馆网站
  • 网站右侧二维码怀来县建设局网站
  • 口碑做团购网站如果网站没有做icp备案