当前位置: 首页 > news >正文

网站策划编辑是干嘛的wordpress 页面伪静态

网站策划编辑是干嘛的,wordpress 页面伪静态,百度官网认证入口,深圳龙华网站公司基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerC…

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}
http://www.yayakq.cn/news/672248/

相关文章:

  • 上海做外贸网站的公司长春做网站外包
  • 做学历的网站小户型室内装修设计公司网站
  • 快捷网站建设免费的上色软件
  • 怎么搭建个人网站电脑做服务器杭州网站建设哪家比较好
  • 台州网站制作系统厦门有家装饰
  • 有哪些网站可以做推广nodejs网站毕设代做
  • 百度指数不高的网站怎么优化商贸有限公司经营范围
  • 电商网站的人员团队建设企业网站做百度小程序
  • 自助网站免费建站平台安卓搭建网站
  • 一台服务器可以建设几个网站宁波网上预约挂号平台
  • 注册网站域名要钱吗秦皇岛网站开发报价
  • 网站开发那家好建设通网站是免费的吗
  • 网站推广的软件国内免费域名
  • 单页面企业网站适合前端开发的电脑推荐
  • 衡水企业网站制作报价无锡企业如何建网站
  • 什么网站可以做市场分析呢wordpress 自动同步工具
  • 网站改版的前端流程深圳市网站制作最好的公司
  • 茌平网站建设价格简述企业网站的网络营销功能
  • 南宁哪里做网站保险网站定制
  • 东莞建设网站官网住房和城乡资料网站建设公司考察
  • 网站设计说明书功能流程图哪个公司要做网络推广
  • 衡水专业做wap网站中国艺术设计联盟
  • 地方性小网站的建设第二课强登陆网站新型智库建设的意见
  • 网站建设交付django 网站开发
  • 罗定市城乡建设局网站蓝色网站设计
  • 郑州网站推广营销网站备案的作用
  • 会计网站建设意义新网站怎么快速收录
  • 电子商务网站建设重点难点网站建设成功案例方案
  • 手机管理网站模板下载软件wordpress 如何购买主题
  • 个人网站备案能做什么内容郑州代理记账