当前位置: 首页 > news >正文

深圳制作网站培训机构中山做网站多少钱

深圳制作网站培训机构,中山做网站多少钱,网络推广平台软件app,公众号制作135Kafka在Java项目中的应用 Docker 安装Kafka 一.首先需要安装docker,可看这篇文章安装docker 二.拉取zookeeper和KafKa镜像 docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper 三.启动zookeeper…

Kafka在Java项目中的应用

Docker 安装Kafka

一.首先需要安装docker,可看这篇文章安装docker

二.拉取zookeeper和KafKa镜像

docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka

Kafka组件需要向zookeeper进行注册,所以也需要安装zookeeper

三.启动zookeeper、kafka组件

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

启动成功界面如下,status即为running(运行中)
在这里插入图片描述

四.创建Springboot项目

4.1 添加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>

4.2 application.yml文件

server:port: 9090
spring:kafka:bootstrap-servers: localhost:9092consumer:# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerretries: 3  #  重试次数
kafka:topic:my-topic: my-topicmy-topic2: my-topic2

4.3 创建实体类Book

public class Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "Book{" +"id=" + id +", name='" + name + '\'' +'}';}
}

4.4 配置KafKa信息

@Configuration
public class KafkaConfig {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;/*** JSON消息转换器*/@Beanpublic RecordMessageConverter jsonConverter() {return new StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。*/@Beanpublic NewTopic myTopic() {return new NewTopic(myTopic, 2, (short) 1);}@Beanpublic NewTopic myTopic2() {return new NewTopic(myTopic2, 1, (short) 1);}
}

4.5 controller代码

@RestController
@RequestMapping(value = "/book")
public class BookController {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;BookProducerService producer;private AtomicLong atomicLong = new AtomicLong();BookController(BookProducerService producer) {this.producer = producer;}@GetMapping("/send")public String sendMessageToKafkaTopic(@RequestParam("name") String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));return name+" : 消息已经发送!";}
}

4.6 book 的生成者业务

@Service
public class BookProducerService {private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final KafkaTemplate<String, Object> kafkaTemplate;//通过构造方法进行注入public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, Object o) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));}}

4.7 book的消费者业务

@Service
public class BookConsumerService {@Value("${kafka.topic.my-topic}")private String myTopic;@Value("${kafka.topic.my-topic2}")private String myTopic2;private final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final ObjectMapper objectMapper = new ObjectMapper();@KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {try {Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}@KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")public void consumeMessage2(Book book,ConsumerRecord<String,String> bookConsumerRecord) throws JsonProcessingException {Book value = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());logger.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());}
}

代码整体目录如下

在这里插入图片描述

4.8 启动成功界面

在这里插入图片描述

4.9 浏览器访问

在这里插入图片描述

4.10 控制台显示

在这里插入图片描述

至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!

http://www.yayakq.cn/news/857987/

相关文章:

  • 美容平台网站建设动易cms网站后台很慢是什么原因
  • 西宁做网站君博优选wordpress 本地打开很慢
  • 网站 建设初步公司管理软件用什么好
  • 做网站怎样租用虚拟空间县城做网站的多么
  • 休闲旅游网站建设开发app需要钱吗
  • 万州做网站多少钱铁岭哪家做营销型网站
  • 网站内容建设ppt模板做网站哪家公司便宜
  • 首钢建设工资网站网站建设免费书
  • 网站建设 职责大众汽车网站建设
  • 论坛网站平台建设方案广州南建站时间
  • 聊城做网站哪里好建个好网站
  • 网站开发编码选择一般是女性门户网站源码
  • 网站不备案可以做淘宝联盟吗php做网站参考文献
  • 网站定制首页费用京东方软件开发工程师待遇
  • 怎么做网站后台企业网站 的网络营销方法有
  • 网站建设的目标和需求分析成都电商网站开发公司
  • 成都制作网站软件网页设计与网站建设完全实用手册
  • 建设网站挂广告赚钱网站鼠标悬停动态效果
  • 常州语言网站建设5成都网站建设
  • 开发软件的应用seo教程自学网
  • 苏州网站建设需要多少钱公司注册后怎么做网站
  • 荆门做网站的公司贵阳软件开发公司在哪里
  • 单页营销网站后台黄骅市人事考试网
  • 网站建设金手指排名稳定怎么做企业网站排名
  • wordpress购买会员资格新网站上线 怎么做seo
  • 农业门户网站建设目标wordpress侧边栏在哪调
  • 网站设计的基本原则wordpress 分类目录 丢失
  • 网站 空间 服务器 免费北京装饰公司招聘信息
  • c 做的网站网站架构设计图
  • 徐汇网站建设推广义乌前十跨境电商公司