当前位置: 首页 > news >正文

成都制作网站公司校园网的规划与设计

成都制作网站公司,校园网的规划与设计,建设公司网站计入哪个科目,贷款网站怎么做FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客 参考这篇文章,写了kafka到mysql的代码例子,因为自己改了表结构,运行下面代码&#x…

FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客

参考这篇文章,写了kafka到mysql的代码例子,因为自己改了表结构,运行下面代码:

package org.test.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;//TODO 用DDL实现Kafka到MySQL的数据传输
public class FlinkSQL15_SQL_DDL_Kafka_MySQL {public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.使用DDL的方式加载数据--注册SourceTabletableEnv.executeSql("create table source_sensor(account_id  BIGINT)" +"with (" +"'connector.type' = 'kafka'," +"'connector.version' = 'universal'," +"'connector.topic' = 'testtopic'," +"'connector.properties.bootstrap.servers' = '11.0.24.216:9092'," +"'connector.properties.group.id' = 'bigdata1109'," +"'format.type' = 'json'"+ ")");Table table = tableEnv.sqlQuery("select * from source_sensor");//3.注册SinkTable:MysqltableEnv.executeSql("CREATE TABLE spend_report (\n" +"    account_id BIGINT,\n" +"    PRIMARY KEY (account_id) NOT ENFORCED)" +"with (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://11.0.24.216:4306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false',"+"'table-name' = 'spend_report',"+"'username' = 'root',"+"'password' = '123456'"+ ")");//4.执行查询kafka数据
//        Table source_sensor = tableEnv.from("source_sensor");
//        //5.将数据写入Mysql
//        source_sensor.executeInsert("sink_sensor");
//table.executeInsert("sink_sensor");//6.执行任务env.execute();}
}

发现报错如下:

Exception in thread "main" org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`sink_sensor` does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:159)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)at scala.collection.Iterator.foreach(Iterator.scala:943)at scala.collection.Iterator.foreach$(Iterator.scala:943)at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)at scala.collection.IterableLike.foreach(IterableLike.scala:74)at scala.collection.IterableLike.foreach$(IterableLike.scala:73)at scala.collection.AbstractIterable.foreach(Iterable.scala:56)at scala.collection.TraversableLike.map(TraversableLike.scala:286)at scala.collection.TraversableLike.map$(TraversableLike.scala:279)at scala.collection.AbstractTraversable.map(Traversable.scala:108)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)at org.test.flink.FlinkSQL15_SQL_DDL_Kafka_MySQL.main(FlinkSQL15_SQL_DDL_Kafka_MySQL.java:50)

点击table.executeInsert看了下源码:

    /*** Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,* and then execute the insert operation.** <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link* TableEnvironment#useCatalog(String)} for the rules on the path resolution.** <p>A batch {@link Table} can only be written to a {@code* org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code* org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code* org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code* org.apache.flink.table.sinks.UpsertStreamTableSink}.** <p>Example:** <pre>{@code* Table table = tableEnv.fromQuery("select * from MyTable");* TableResult tableResult = table.executeInsert("MySink");* tableResult...* }</pre>** @param tablePath The path of the registered TableSink to which the Table is written.* @return The insert operation execution result.*/TableResult executeInsert(String tablePath);

 发现executeInsert方法的参数tablePath需要传入表名,这里的表名应该和

tableEnv.executeSql("create table source_sensor(account_id  BIGINT)"

的表名source_sensor一致。

将:

table.executeInsert("sink_sensor");

改成:

table.executeInsert("source_sensor");

后执行成功。

flink1.2的demo完整代码:flink-java-1.12.7: flink1.12.7的java demo,包括flink wordcount示例,如何连接kafka

 

http://www.yayakq.cn/news/783983/

相关文章:

  • 做家装的网站有哪些内容wordpress 内容置顶
  • 云南网站开发公司推荐电子商务考研最佳方向
  • 数据服务网站策划方案网站备案规则
  • 教育类网站开发需求说明书深圳网站建设吗
  • 建设网站商城wordpress的小程序
  • 小游戏网站欣赏移动互联网软件开发与应用
  • 企业为什么要做网站 作用是什么frp可以做网站吗
  • 南京本地网站建设10套免费ppt模板
  • 做的网站为什么图片看不了最近的广告公司在哪里
  • 佛山网站搭建公司哪家好代加工网
  • 网站在网站网站在哪里找到的wordpress 企业插件
  • 软文网站大全爱用建站官网
  • 重庆专业做淘宝网站wordpress 插件 后门
  • 陕西建设机械官方网站ICP备案和实际网站不是一个名字
  • 如何制作网站链接建筑工程发布网站
  • 想做一个静态网页网站不需要有后台数据库wordpress本地调试修改域名
  • 视频网站数据库设计个人接单做网站的平台
  • 电子商务网站的优势网站开发所要达到的目标
  • 建筑图集网站珠海在线网站建设
  • 珠海免费网站制作wordpress免费企业网站
  • 饮料网站建设wordpress站外链接页面
  • 做酒店网站的公司免费拥有自己的网站
  • 教育网站建设 培训网站建设仓储服务 东莞网站建设 技术支持
  • 网站制定公司wordpress与phpstudy
  • 网站对图片优化中国新闻社是央企吗
  • 有没有专门做帽子的网站嘉兴关键词优化服务
  • 明薇通网站建设首选wordpress国外主题下载
  • 企业建立网站需要国家网站后缀
  • 石家庄有哪些做网站的公司可以做彩票网站的工作室
  • 自己做电影网站违法吗大圣网站建设