当前位置: 首页 > news >正文

中国工程项目网站云主机怎么上传网站

中国工程项目网站,云主机怎么上传网站,关于做美食的小视频网站,梅州网站设计1 前言 在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}
http://www.yayakq.cn/news/488549/

相关文章:

  • 罗湖做网站的公司网站建设实施过程
  • 哪些公司做网站维护的中国空间站和国际空间站对比
  • led 网站模板设计公司网站图
  • 吉安网站建设343000wordpress 微信绑定
  • 备案 个人网站名称做3d效果图的网站有哪些
  • 建站之星使用教程除了速卖通还有什么网站做外贸
  • 小孩子和大人做的网站无锡高端网站开发
  • 一起做玩具网站中国制造网外贸网官网登录
  • 做app的模板下载网站有哪些龙岩相亲网
  • 下载网站系统建电子商城网站
  • 茂名制作网站软件专门做处理货的网站
  • 网站建设论文3000字重庆铜梁网站建设费用
  • 义乌网站建设方案详细网站js代码检测
  • 北京齐力众信网站建设宿迁网站建设价格
  • 做一网站APP多少钱国家建设执业资格注册中心网站
  • 软文广告有哪些沈阳网站seo优化哪家好
  • 网页设计难还是网站建设南兰州装修公司哪家口碑最好
  • 个人网页网站建设老铁外链
  • 免费crm网站下载的软件学习网站的设置和网页的发布
  • 微信网站是多少钱杭州建站
  • 企业网站制作费用示范校建设专题网站
  • 学校网站建设必要性站长工具无忧
  • 网站后缀意思商标制作logo在线制作
  • 部门门户网站建设请示百度移动网站排名
  • 汽车网站建设分析报告中国服务器龙头企业
  • 哈尔滨道外区建设局官方网站自适应网站功能
  • 怎样做化妆品公司网站关于茶叶网站模板
  • 广州 互联网公司 网站首页网站添加google地图
  • 自己做的网站突然打不开做的好的响应式网站有哪些
  • 怎样做公司的网站建设网站首页布局设计原理