当前位置: 首页 > news >正文

温岭公司做网站湛江网站制作工具

温岭公司做网站,湛江网站制作工具,广中路街道网站建设,商丘seo教程10 Flink CDC 1. CDC是什么2. CDC 的种类3. 传统CDC与Flink CDC对比4. Flink-CDC 案例5. Flink SQL 方式的案例 1. CDC是什么 CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数…

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}
http://www.yayakq.cn/news/245147/

相关文章:

  • 机械加工网站哪个好visualstudio 做网站
  • 网站美术视觉效果布局设计聊城做网站推广公司
  • 怎么把自己做的网站阿里巴巴吧做网站
  • 网页设计与网站开发试卷视频网站外链怎么做
  • 南京建设工程信息网站wordpress提示不安全
  • android 旅游网站开发成都网络优化网站建设
  • 建设网站公司兴田德润在哪里建设银行个人网上银行网页
  • 有没有做网站近期的新闻热点
  • 旅游网站设计asp火蝠电商代运营值得相信吗
  • 铜陵市网站建设广州哪里有做网站推广
  • 云南营销型网站建设百度登录入口官网
  • 宁波免费网站建站模板国家反诈中心app下载二维码
  • 怎样建立一个营销的公司网站网络广告商
  • 建设网站的重要性赚钱宝部署wordpress
  • 沧浪seo网站优化软件企业网站建设标准
  • 现在做网站需要多少钱网站如何做cdn
  • 北京做企业网站多少钱合肥网站建设公司排名
  • 做物流网站的图片素材中国室内设计师联盟官网
  • 南沙门户网站建设开发个网站开票名称是什么意思
  • 许昌网站建设wordpress 移植
  • 公司电商网站开发合同济南建设职业技术学院
  • 网站修改建设品牌vi设计有哪些内容
  • 摄影网站制作安装系统后没有wordpress
  • 管理咨询网站二级域名做网址导航大全网站
  • 柳州市城乡建设局网站常德政务网站
  • 网站模板 jsp运河经济开发区建设局网站
  • 海洋馆网站建设python做网站还是数据
  • 公司宣传页设计印刷保定网站关键词优化
  • 网站建设jnlongji济南网络推广公司排名
  • 樊城区建设局网站微信crm客户管理系统