当前位置: 首页 > news >正文

努比亚网站开发文档注册安全工程师报考官网

努比亚网站开发文档,注册安全工程师报考官网,专业营销的网站建设公司排名,网站建设优化服务资讯背景 项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。 一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似: def main(…

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

http://www.yayakq.cn/news/114433/

相关文章:

  • 网站首页制作的过程三亚app开发公司
  • 怎样登陆网站后台赣州章贡区天气预报15天
  • 做视频解析网站犯法吗摄影网站有哪些?
  • 免费建站的网站网页游戏传奇世界
  • 天津市工商网站查询企业信息wordpress技术cms主题
  • 做外贸常用的网站有哪些网站怎么做关键词怎么优化
  • seo网站关键词排名软件手动搭建wordpress
  • 网站上怎么做动画广告视频建筑类期刊排名
  • 怎样制作做实景的网站如何做网站出单
  • 办个网站卖什么好处百度收录左侧带图片的网站
  • 网站备案需要去哪里网站开发合同验收
  • 四川省住房和城镇建设官方网站数据统计网站有哪些
  • 网页制作网站素材网站哪家公司好
  • 做网站要学点什么googlechrome浏览器
  • 张家港杨舍网站建设cn域名网站
  • 做机械设备销售的那个网站好素材下载
  • 如何才能看到国外的设计网站诸城网站建设0536s
  • 网站怎么下载视频介绍重庆网页设计
  • 安徽中小企业网站建设集团公司网站开发
  • 做网站客户改来改去wordpress优化攻略
  • 手机做直播官方网站酷家乐软件培训班
  • jsp网站开发详解书中山网站建设费用
  • 站长之家查询工具常见的旅游网络营销方式
  • 做微网站要多少钱沧州网站建设设计
  • 东莞哪家做网站比较好企业平台网站建设
  • 关于网站设计的论文上海计算机一级网页设计
  • 家教网站如何做少儿编程有没有必要学
  • 专门做产品推广ppt的网站网站建设相关的工作
  • 淄博什么兼职的网站建设专业手机网站建设公司排名
  • 乔拓云网站注册广东东莞新增本土确诊0例