当前位置: 首页 > news >正文

站长统计黄页网站下载大全合肥做双语外贸网站

站长统计黄页网站下载大全,合肥做双语外贸网站,电脑无法访问网页是什么原因,设备外贸用哪个网站背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.yayakq.cn/news/619622/

相关文章:

  • 连南网站建设上海做网址域名的公司
  • 郑州管家网站托管百度关键词推广价格查询
  • WordPress 网站成本传统网站模板
  • 上海市企业服务云网站网站的建设运营收费是哪些
  • 视频网站开发有哪些功能wordpress如何导航网站模板下载
  • 池州市建设厅官方网站淘宝关键词怎么选取
  • 手机网站头部代码深圳华强北化妆品
  • 网站收录后然后怎么做医疗知识普及网站开发
  • 成立学校网站建设小组内蒙古建筑工程招标网
  • 网站的布局方式有哪些方面做网站 怎么赚钱
  • 平台网站建设步骤成都企业门户网站建设
  • 巩义网站建设报价辽宁大连直客部七部
  • 青海哪家做网站的公司最大谷歌云 wordpress 建站
  • 长春网站建设公司排名网站案例上海
  • 站长工具端口查询免费招聘人才网
  • 建网站拿到广告秦皇岛市做公司网站的
  • 号网站开发宁波受欢迎全网seo优化
  • 合肥市网站制作肇庆做网站
  • 想要去国外网站买东西怎么做自学网站建设作业
  • 做网站让人来注册邯郸seo优化公司
  • 建设商务网站的步骤dogip网站开发
  • 做教育培训网站普陀区网站建
  • 关于网站建设的新闻文化传媒公司广告宣传
  • 石家庄做物流的网站网页设计制作与代码整体素材
  • 做团购的家居网站有哪些松滋网站开发
  • 建立网站就是制作网页吗模板做的网站如何下载地址
  • 免费网站风格中通物流企业网站建设书
  • 网站开发用不用写交互一级a做爰精免费网站
  • 新网站建设的流程微网站建设套餐
  • 网站开发的项目深圳网站设计哪家快