当前位置: 首页 > news >正文

医美行业网站建设wordpress导航菜单

医美行业网站建设,wordpress导航菜单,国家最新政策,多用户网站管理系统目录 一、引入Kafka的依赖 二、配置Kafka 三、创建主题 1、自动创建(不推荐) 2、手动动创建 四、生产者代码 五、消费者代码 六、常用的KafKa的命令 Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring B…

       

目录

一、引入Kafka的依赖

二、配置Kafka

三、创建主题

1、自动创建(不推荐)

2、手动动创建

四、生产者代码

五、消费者代码  

六、常用的KafKa的命令


        Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一,其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka,实现高效的数据处理和消息传递。

一、引入Kafka的依赖

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

二、配置Kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。concurrency: 3  #设置消费者的并发数为3missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

注:kafka的acks有三个值,可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性,来选择对应的值。

  • acks=0:这是最不可靠的模式。当设置为acks=0时,生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下,生产者可以迅速继续发送下一批消息,效率最高,但风险也最大。如果在此模式下发生网络问题或broker故障,发送的消息可能会永久丢失,生产者无法得知消息是否成功到达Kafka broker。因此,这种配置适合于能够容忍少量数据丢失的场景,例如实时数据分析或生成非关键的实时报表。
  • acks=1:这是默认的配置模式,也是一种折衷方案。在这种模式下,生产者会等待分区的领导者节点(leader)确认消息已经成功写入磁盘,才会发送确认信息给生产者。这提高了数据的安全性,因为只要领导者节点保存了消息,即使跟随者(replicas)没有及时同步,消息也不会丢失。然而,如果领导者在同步给所有追随者之前崩溃,那么尚未同步的副本将无法获取该消息,仍然存在消息丢失的风险。
  • acks=all或-1:这是最可靠的模式。在这个模式下,生产者不仅需要领导者节点确认,还会等待所有同步副本(In-sync replicas, ISR)都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证,确保了即使在多个节点故障的情况下,消息也不会丢失。此模式适用于数据可靠性要求非常高的场景,如金融交易系统或重要的日志记录

三、创建主题

    1、自动创建(不推荐)

         不存在的主题,会自动创建,分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。

在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
num.partitions=3 #默认3个分区
auto.create.topics.enable=true #开启自动创建主题
default.replication.factor=3 #默认3个副本

    2、手动动创建

         在kafka的安装目录bin目录下,执行如下命令: 

//创建一个有三个分区和三个副本,名为zhuoye的主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye 

四、生产者代码

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "zhuoye";@Overridepublic void queryECSMetricInfo() {//发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//获取上次查询时间Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {//查询出所有的运行中的实例List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定义计数器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍历查询for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {//获取内网流出带宽,并将结果封装到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("获取ECS的指标数据-多线程处理任务异常!", e);} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;log.error("获取ECS的指标数据失败", e);}//更新记录上次查询时间if (flag) {QueryTimeRecord queryTimeRecord = new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 

五、消费者代码  

@Slf4j
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//处理数据,存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");log.error(e.getMessage());}}
}

六、常用的KafKa的命令

//创建主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收对应的消息
 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分区数
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用户组消费情况
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

http://www.yayakq.cn/news/265380/

相关文章:

  • 网站快速备案安全吗php不用框架怎么做网站
  • 上传wordpress网站张掖艺能网站建设
  • 做特产的网站开张怎么宣传如何加强校园网站建设
  • 电商 网站建设文字陇南网站设计
  • 高端网站改版顾问前端开发主要做什么
  • 大连网站建设企业网站建设的风险分析
  • wdcp 网站迁移本人有资金寻求合作
  • 机关单位 网站建设方案策划书建站哪个平台好用
  • 货架 网站建设 牛商网石景山区公司网站建设
  • 域名绑定网站需要多久网站开店前的四项基本建设
  • 网站后台账号密码破解郑州营销策划公司排行榜
  • wap网站 开发WordPress邮箱内容修改
  • 建设银行投资网站首页网页设计入门书籍
  • 网站面包屑导航论坛内网站怎么建设
  • 绿色食品网站建设论文调用wordpress
  • 著名建筑网站学网站开发看什么书
  • 设计 微网站做应用级网站用什么语言好
  • 网站怎么做才能赚钱wordpress教材
  • 个人的网站建设目标腾讯街景地图实景下载
  • 如何来建设网站怎样进网站空间
  • 照明做外贸的有那些网站广州开发区投资集团
  • 三拼域名做网站长不长吉林大学建设工程学院网站
  • 滁州建设管理网站宁波建站平台
  • 做企业网站需要准备什么佛山大沥
  • 南城网站建设公司方案蓝色主题的网站模板
  • 全国建设工程执业资格注册中心网站ps网站页面设计教程
  • 甘肃省通信管理局 网站备案网站建设与电子商务的教案
  • 为什么要做个人网站互联网营销师是哪个部门发证
  • 静态页优秀网站找别人做网站怎么防止别人修改
  • 上海网站推广价格广州市 住房建设局网站