当前位置: 首页 > news >正文

珠海网站建设方案维护虚拟电脑主机平台

珠海网站建设方案维护,虚拟电脑主机平台,虚拟主机网站怎么上传文件,福建省建设厅网站 登录系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录

  • 系列文章目录
  • 前言
  • DataX的Writer写入流程
  • Writer组件如何处理各类数据源
  • writer相关源码


前言

在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。


DataX的Writer写入流程

  • 初始化和准备:
    根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
    建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
    准备写入操作所需的各种资源,如缓冲区、事务等。
  • 数据接收:
    Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。
  • 数据格式化和处理:
    根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。
  • 数据写入:
    Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。
  • 错误处理和重试:
    在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。
  • 写入完成和清理:
    当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。

Writer组件如何处理各类数据源

不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:

  • 数据源连接:
    Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
    根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。
  • 写入前准备:
    根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
    Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
    对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。
  • 数据写入:
    Writer从Reader组件接收数据,并将其写入目标数据源。
    根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。
  • 错误处理:
    Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
    根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。
  • 写入优化:
    对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
    Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。
  • 写入后处理:
    在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
    对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。
  • 扩展性和灵活性:
    DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。

总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。

writer相关源码


/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber*            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}

public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}
http://www.yayakq.cn/news/833032/

相关文章:

  • 专业的东莞网站排名高端定制网站建设
  • 网站怎样做wap端明年做哪个网站致富
  • 怎么自己弄一个网站宝安区在深圳排第几
  • 给别人做网站是外包公司盐田区住房和建设局网站
  • 学校网站建设价格哪里有培训网页设计
  • 网站开发+协作平台大气有内涵的公司名字
  • 网站过期怎么找回来常州建设企业网站
  • 北京网站建设公九江集团网站建设
  • 青岛网站制作辰星辰成功的网站必须具备的要素
  • 网站不能粘贴怎么做wordpress新建网页插件
  • 专门做酒的网站有哪些网站前端做报名框代码
  • 品牌网站设计自己的网站怎么做app
  • 微网站制作超链接网站优化员seo招聘
  • 大连省建设厅网站汕头制作网站推荐
  • 乐清网站建设乐清网站设计wordpress 反斜杠 luj
  • 企业网站建设的账务处理湖州网站集约化平台
  • 简单网页设计模板网站成都做微信小程序的公司
  • 网站搭建策略与方法是什么购物网站导航模板
  • 淘宝客网站主题模版wordpress微商授权
  • 网站如何做一张轮播图app定制开发制作
  • 网站 需求文档微商城开发公司
  • 专业的营销型网站企业文化南昌网站建设价格
  • 砀山网站建设常德规划建设局网站
  • 高校网站建设滞后即给做网站又给我们做推广的公司呢
  • 网站开发课程设计总结wordpress配置多语言
  • 建设银行海淀支行 网站一级a做爰小说免费网站
  • 公司百度网站怎么做wordpress上传慢
  • 巢湖网站建设公司网页设计与制作课程标准构建
  • 河北省城乡与建设厅网站手机自助网站建设
  • 护肤品网站制作 网新科技广州最近流行传染疾病