当前位置: 首页 > news >正文

坪山建设网站建站网站专题设计稿

坪山建设网站建站,网站专题设计稿,引流获客工具,wordpress栏目页面目录 1.维表 2.数据准备 创建源数据 创建维度表 创建Sink表 3.配置任务 Flink SQL创建kafka源表 Flink SQL创建MySQL维表 Flink SQL创建MySQL结果表 编写计算任务 核验数据 1.维表 目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎…

目录

1.维表

2.数据准备

创建源数据

创建维度表

创建Sink表

3.配置任务

Flink SQL创建kafka源表

Flink SQL创建MySQL维表

Flink SQL创建MySQL结果表

编写计算任务

核验数据


1.维表

目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。

本案例采用MySQL创建维表,与创建MySQL sink表语法相同。

2.数据准备

创建源数据

重启kafka,创建Topic:  case_kafka_mysql

写入json格式的数据

  {"ts": "20201011","id": 8,"price_amt":211}

创建维度表

在MySQL中创建名为product_dim的表

CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向数据表插入如下数据:

INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表

在MySQL中创建名为sync_test_3的表

CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任务

Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);

WITH参数

参数

说明

类型

备注

lookup.cache.max-rows

指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

Integer

默认情况下,维表Cache是未开启的。

lookup.cache.ttl

指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

Duration

默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

lookup.cache.caching-missing-key

是否缓存空的查询结果。

Boolean

参数取值如下:

true(默认值):缓存空的查询结果。

false:不缓存空的查询结果。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

默认值为3

Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim  FOR SYSTEM_TIME AS OF  a.proctime  as bON b.id = a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;

http://www.yayakq.cn/news/603289/

相关文章:

  • 蚌埠集团网站建设wordpress整合ecms同步登录
  • 给wordpress添加字段网站优化效果查询
  • 被攻击网站wordpress 页面重定向循环
  • thinkphp做中英文网站菠萝菠萝蜜免费播放视频
  • 换空间网站备案吗公司做网站自己注册域名
  • 做cad室内平面图的家具素材网站信息推广服务
  • 网站排名是什么意思网站动态画面用啥做
  • asp.net做的网站淄博网站优化
  • 先做网站还是先申请域名重庆拓达建设集团网站
  • 网站建设的特色iphone做网站服务器
  • 网站建设的售后服务怎么写网站开发图片多打开速度慢
  • m开头的网站开发工具青岛网页设计 学校
  • 红酒哪个网站做的好wordpress获取热门文章
  • 美发网站源码如何建造企业网站
  • 各大行业网站中国哪里正在大开发大建设
  • 提升网站权重的方法泰安网站建设论文结论
  • 哪里有做网站的素材网站开发主流技术
  • 河南省住房和城乡建设门户网站高端网站建设公司好吗
  • 电子商务网站建设 教案网站开发要求有哪些
  • 郴州网站制作公司常熟做网站多少钱
  • 国内知名的网站建设企业最好的做法
  • 什么网站做推广比较好涪陵做网站
  • 软件公司网站在线捕鱼网站建设
  • 免费营销型企业网站模板html5 制作手机网站
  • 做网站网页排版错误wordpress门户信息主题
  • php建站模板成品网站源码免费
  • 如何建双注册网站wordpress即时新闻
  • 做网站销售怎么开发客户做网站找我
  • 网站设置支付宝在线支付舟山网站建设公司
  • 肥城网站建设公司soho建网站