当前位置: 首页 > news >正文

深圳网站制作必选祥奔科技网络系统管理是做什么的

深圳网站制作必选祥奔科技,网络系统管理是做什么的,百度关键词排名突然没了,网站导航栏模板怎么做1. producer的结构 producer:生产者 它由三个部分组成 interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不…

1. producer的结构

producer:生产者

它由三个部分组成

interceptor:拦截器能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的

serialiazer:序列化器kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定

partitioner: 分区器主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

record accumulator

本地缓冲累加器 默认32M

producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.msbatch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率

batch-size 默认16KB

linger.ms 默认为0

生产者部分的整体流程

首先producer将发送的数据准备好

经过interceptor的拦截器进行处理,如果有的话

然后经过序列化器进行转换为相应的byte[]

经过partitioner分区器分类在本地的record accumulator中缓冲

sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

2. producer的简单代码

2.1 准备:

引入maven依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>

在resources文件中创建log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

2.2 生产者中的设定参数

参数含义
bootstrap.serverskafka集群的地址
key.serializerkey的序列化器,这个序列化器必须和key的类型匹配
value.serializervalue的序列化器,这个序列化器必须和value的类型匹配
batch.size批次拉取大小默认是16KB
linger.ms拉取的间隔时间默认为0,没有延迟
partitioner分区器存在默认值
interceptor拦截器选的

2.3 全部代码

public class producer_test {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB,参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");//发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值producer.send(record);producer.close();}
}

在x-shell中观察消费的数据

http://www.yayakq.cn/news/225151/

相关文章:

  • 程序员做网站类网站搜索引擎优化的简称
  • 什么是可信网站认证做网站的收费标准
  • wordpress安装语言设置关键词优化流程
  • 卡地亚手表官方网站查询网站续费一年多少钱
  • 找别人做网站wordpress 主题 新闻_
  • 世界网站流量排名注册网站挣钱
  • 黑群晖建设个人网站庆网站制作公司
  • 外贸自建站的推广方式百度推广销售话术
  • 山东做网站的杭州市萧山区哪家做网站的公司好
  • 学校网站 网站建设搭建网站服务器教程
  • 手机网站建设创意新颖外包一个企业网站多少钱
  • 招聘门户网站有哪些毕业设计做网站有哪些方面
  • 百度自己网站排名网站架构文案
  • 网站浏览图片怎么做百度号码认证申诉平台
  • 广州网站开发公司哪家好seo优化培训多少钱
  • 专业的聊城网站建设怎么做能让网站尽快收录
  • 二维码生成器在线制作免费外贸seo博客
  • 网站建设学习流程企业黄页电话
  • 新网站的建设工作图书馆网站建设调查问卷
  • 搜狗网站排名怎么做株洲企业关键词优化最新报价
  • 九江做网站的公司赣州seo外包怎么收费
  • 商业网站图片企业站seo外包
  • 可以左右滑动的网站wordpress临时关闭页面
  • ps网站建设教程鹤山网站建设易搜互联
  • 商业网站开发实训总结各大网站做推广广告
  • 专业建站培训深圳网站建设行吗
  • 网站提高收录和访问量网站开发技术概述
  • 网站推广做哪个比较好可以做淘宝联盟的免费网站
  • 相册网站建设方案wordpress手机显示缩
  • 台州品牌网站设计网站设计专业公司价格