当前位置: 首页 > news >正文

3d效果图教程网站广东网站建设的

3d效果图教程网站,广东网站建设的,字节跳动员工待遇,北京外贸网站开发Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息 Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。 为了了解如何创建 …

Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息

Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。

为了了解如何创建 Spring Boot 项目,请参阅本文。

工作步骤 

步骤 1:

转到Spring 初始化程序并创建具有以下依赖项的启动项目: 
Spring for Apache Kafka

步骤 2:

在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并重写toString方法以查看 JSON 格式的消息。以下是学生类的实现:

  • 学生模型

// Java program to implement a

// student class

 

// Creating a student class

public class Student {

 

    // Data members of the class

    int id;

    String firstName;

    String lastName;

 

    // Constructor of the student

    // Class

    public Student()

    {

    }

 

    // Parameterized constructor of

    // the student class

    public Student(int id, String firstName,

                   String lastName)

    {

        this.id = id;

        this.firstName = firstName;

        this.lastName = lastName;

    }

 

    @Override

    public String toString()

    {

        return "Student{"

            + "id = " + id

            + ", firstName = '" + firstName + "'"

            + ", lastName = '" + lastName + "'"

            + "}";

    }

}

步骤 3:

创建一个新的类Config并添加注释@Configuration@EnableKafka。现在使用 Student 类对象创建 Bean ConsumerFactoryConcurrentKafkaListenerContainerFactory 。

  • 配置类

@EnableKafka

@Configuration

public class Config {

 

    // Function to establish a connection

    // between Spring application

    // and Kafka server

    @Bean

    public ConsumerFactory<String, Student>

    studentConsumer()

    {

 

        // HashMap to store the configurations

        Map<String, Object> map

            = new HashMap<>();

 

        // put the host IP in the map

        map.put(ConsumerConfig

                    .BOOTSTRAP_SERVERS_CONFIG,

                "127.0.0.1:9092");

 

        // put the group ID of consumer in the map

        map.put(ConsumerConfig

                    .GROUP_ID_CONFIG,

                "id");

        map.put(ConsumerConfig

                    .KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        map.put(ConsumerConfig

                    .VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

 

        // return message in JSON formate

        return new DefaultKafkaConsumerFactory<>(

            map, new StringDeserializer(),

            new JsonDeserializer<>(Student.class));

    }

 

    @Bean

    public ConcurrentKafkaListenerContainerFactory<String,

                                                   Student>

    studentListner()

    {

        ConcurrentKafkaListenerContainerFactory<String,

                                                Student>

            factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(studentConsumer());

        return factory;

    }

}

步骤 4:

创建一个带有@Service注释的KafkaService类。此类将包含用于在控制台上发布消息的侦听器方法。 

  • KafkaService 类

@Service

public class KafkaService {

 

    // Annotation required to listen

    // the message from Kafka server

    @KafkaListener(topics = "JsonTopic",

                   groupId = "id", containerFactory

                                   = "studentListner")

    public void

    publish(Student student)

    {

        System.out.println("New Entry: "

                           + student);

    }

}

步骤 5:

启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。

步骤6:

现在使用下面给出的命令创建一个新主题: 

bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 mac 和 linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 windows 

步骤 7:

现在运行 Kafka 生产者控制台,使用以下命令: 

bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // 适用于 mac 和 linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // 适用于 windows 

步骤 8:

运行应用程序并在 Kafka 生产器上输入消息并按回车键。

http://www.yayakq.cn/news/421026/

相关文章:

  • 企业网站开发职责百度网页版浏览器网址
  • seo免费自学的网站卡曼科技网站建设
  • 惠城网站建设微博同步wordpress
  • 阿里云部署网站建设网站坂田
  • wordpress 自助建站互联网销售是什么意思
  • 永久免费做网站app聚美优品的网站建设状况
  • 一个空间两个php网站替换wordpress
  • 化州网站开发公司久久网会上市吗
  • 怎么看一个网站什么程序做的百度百科官网登录
  • 新手网站建设教程平阳住房和城乡建设厅网站
  • 长沙3合1网站建设价格农技推广
  • 响应式网站展示型站开发技术培训
  • 网站开发计划时间南阳市住房和城市建设局网站
  • 长春网站优化seo淘宝客如何建设推广网站
  • 酷炫给公司网站欣赏企业微信网站怎么建设
  • 辽阳免费网站建设公司wordpress更换ip后台登录不
  • 网站建设推广案例济南网站建设
  • wordpress的文档主题上海知名的seo推广咨询
  • 网站建设跟推广评价指标有什么关系北京工商注册信息查询
  • 建设银行嘉兴分行官方网站企业手机网站建设教程
  • 深圳网站建设服务商万创网企业三合一建站公司具体该怎么找
  • 推广产品网站建设云南网站建设维修公司
  • 淘宝网站做淘宝客天津建设网工程信息网
  • 凡科网站怎么建设个人网站宁波 外贸网站建设
  • 汕头有什么招聘平台seo关键词排名优化方法
  • 百度怎么做网站排名做国外网站收款怎么收
  • 制作网站的设计难点品牌价值
  • 网站导航怎么做永州网站开发
  • 网站安装百度商桥京津冀协同发展的战略意义
  • 怎么做才能提高网站权重舆情分析工具