当前位置: 首页 > news >正文

漳州网站建设 林做的网站在百度上搜不出来

漳州网站建设 林,做的网站在百度上搜不出来,wordpress段代码,超简单做网站软件背景 kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是 会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除…

背景

kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是

会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前

我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;

解决思路

既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除

代码实现

1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”

package com.simo.vsim.cfgs.init;import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@Data
@Component
@Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {@Resourceprivate KafkaListenerEndpointRegistry registry;@NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)private String servers;@Overridepublic void destroy()  {MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");String groupId = listenerContainer.getGroupId();Map<String, Object> props = new HashMap<>(1);props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);AdminClient adminClient = AdminClient.create(props);DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));KafkaFuture resultFuture = deleteConsumerGroupsResult.all();try {resultFuture.get();log.info("kafka关闭消费组="+groupId);} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}adminClient.close();}@Overridepublic void afterPropertiesSet() {}
}

2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”

/*** groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费* @param message* @param ack*/
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){message.forEach(item -> handleMsg(2,item));//直接提交offsetack.acknowledge();
}

效果

1)正常启动有这个消费组:cfgs-broadcast-1696754926097

2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)
在这里插入图片描述
如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png

3)查看kafka消费组,确实已经删除

http://www.yayakq.cn/news/156878/

相关文章:

  • 做网站维护要什么专业重庆市场所码图片
  • 网站建设哪专业口碑宣传
  • wordpress 站内信国家企业信用信息网查询
  • 宝山网站建设服务线上推广方案怎么写
  • 无锡专业网站制作网站建设中 意思
  • 网站建设模板可用吗wordpress插件 一键登录
  • 使用vue做简单网站教程寻找东莞微信网站建设
  • 网站建设策划书编制微信公众平台微网站怎么做
  • 手机网站关键词排名查询网站的搭建流程
  • 郑州网站顾问热狗网vs2010网站开发示例
  • 免费企业网站源码生成wordpress 随机阅读数
  • 成绩分析智能网站怎么做外贸企业网站设计公司
  • 怎样做后端数据传输前端的网站网站建设艾瑞市场分析
  • 对接标准做好门户网站建设重庆企业网站推广代理
  • 厦门的企业网站网站建设分析魅族
  • 淮安市网站企业网站营销的实现方式
  • 网站建设证据保全wordpress答题跳转
  • 东莞模板网站设计wordpress 文章标题外链
  • 如何购买网站流量北京市建设工程发包承包交易中心网站
  • 特效素材免费下载网站详情页设计思路遵循哪五个营销环节
  • 做推广效果哪个网站好公司网站一般用什么软件做
  • 关于网站建设的广告语做网站环境配置遇到的问题
  • 万网网站建设选哪个好做摄影网站的公司
  • 加强检察门户网站建设情况南宁seo主管
  • 织梦做网站教程vps服务器怎么创建多个网站
  • 哈密建设集团有限责任公司网站扬州建设局网站
  • 网站建设包装策略大庆网站建设优化
  • 个人网站设计教程12123互联网服务平台
  • 淄博网站建设专家静安建设机械网站
  • 建设外卖网站规划书免费建网站广告语