当前位置: 首页 > news >正文

济南网站建设jnjy8网页前端模板网站

济南网站建设jnjy8,网页前端模板网站,品牌建设赋能增效,免费发布项目信息的平台Spark 应用调优人数统计优化摇号次数分布优化Shuffle 常规优化数据分区合并加 Cache优化中签率的变化趋势中签率局部洞察优化倍率分析优化表信息 : apply : 申请者 : 事实表lucky : 中签者表 : 维度表两张表的 Schema ( batchNum,carNum ) : ( 摇号批次&#xff0c…

Spark 应用调优

  • 人数统计
    • 优化
  • 摇号次数分布
    • 优化
      • Shuffle 常规优化
      • 数据分区合并
      • 加 Cache
    • 优化
  • 中签率的变化趋势
  • 中签率局部洞察
    • 优化
  • 倍率分析
    • 优化

表信息 :

  • apply : 申请者 : 事实表
  • lucky : 中签者表 : 维度表
  • 两张表的 Schema ( batchNum,carNum ) : ( 摇号批次,申请编号 )
  • 分区键都是 batchNum

运行环境 :

在这里插入图片描述

配置项设置 :

在这里插入图片描述

优化点 :

在这里插入图片描述

人数统计

统计至今,参与摇号的总人次和幸运的中签者人数

val rootPath: String = _// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
applyNumbersDF.count// 中签者数据
val hdfs_path_lucky = s"${rootPath}/lucky"
val luckyDogsDF = spark.read.parquet(hdfs_path_lucky)
luckyDogsDF.count

SQL 实现 :

selectcount(*)
from applyNumbersDFselectcount(*)
from luckyDogsDF

去重计数,得到实际摇号数 :

val applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinctapplyDistinctDF.count

SQL 实现 :

selectcount(distinct batchNum ,carNum)
from applyDistinctDF

优化

分析 : 共有 3 个 Actions,会触发 3 个 Spark Jobs
用 Cache 原则:

  • RDD/DataFrame/Dataset 引用次数为 1,坚决不用 Cache
  • 当引用次数大于 1,且运行成本占比超过 30%,考虑用 Cache

优化 :

  • 利用 Cache 机制来提升执行性能
val rootPath: String = _// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
// 缓存
applyNumbersDF.cacheapplyNumbersDF.countval applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinct
applyDistinctDF.count

在这里插入图片描述

摇号次数分布

不同人群摇号次数的分布 :

  • 统计所有申请者累计参与了多少次摇号
  • 所有中签者摇了多少次号才能幸运地摇中签

统计所有申请者的分布情况

val result02_01 = applyDistinctDF.groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_01.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectcarNum,count(1) as x_axisfrom applyDistinctDFgroup by carNum
)
selectx_axis,count(1) as y_axis
from t1
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 共两次 Shuffle。以 carNum 做分组计数, 以 x_axis 列再次做分组计数

Shuffle 的本质 : 数据的重新分发,凡是有 Shuffle 地方就要关注数据分布

  • 对过小的数据分片,要对进行合并

Shuffle 常规优化

优化点 : 减少 Shuffle 过程中磁盘与网络的请求次数

Shuffle 的常规优化:

  • By Pass 排序操作 : 条件:计算逻辑不涉及聚合或排序;Reduce 的并行度 < spark.shuffle.sort.bypassMergeThreshold
  • 调整读写缓冲区 : 条件 : Execution Memory 大

对读写缓冲区做调优 :

  • spark.shuffle.file.buffer : Map 写入缓冲区大小
  • spark.reducer.maxSizeInFlight : Reduce 读缓冲区大小

读写缓冲区是以 Task 为粒度进行设置,所以调整这些参数时, 扩大 50%

默认调优
spark.shuffle.file.buffer = 32KBspark.shuffle.file.buffer = 48 KB (32KB * 1.5)
spark.reducer.maxSizeInFlight = 48 MBspark.reducer.maxSizeInFlight = 72MB ( 48MB * 1.5)

性能对比 :

在这里插入图片描述

数据分区合并

优化点 : 提升 Reduce 阶段的 CPU 利用率

该数据集在内存的精确大小 :

def sizeNew(func: => DataFrame, spark: => SparkSession): String = {val result = funcval lp = result.queryExecution.logicalval size = spark.sessionState.executePlan(lp).optimizedPlan.stats.sizeInByte"Estimated size: " + size/1024 + "KB"
}

把 applyDistinctDF 作实参,调用 sizeNew 函数,返回大小 = 2.6 GB

  • 将数据集尺寸/并行度(spark.sql.shuffle.partitions = 200) = Reduce 每个数据分片的存储大小 ( 2.6 GB / 200 = 13 MB)
  • 数据分片大小在 200 MB 左右为宜,13 MB 太小

优化设置 :

  • 计算集群配置 Executors core = 3 * 2 = 6,其 minPartitionNum 为 6
# 开启 AQE
spark.sql.adaptive.enabled = true# 自动分区合并
spark.sql.adaptive.coalescePartitions.enabled = true
# 合并后的大小
spark.sql.adaptive.advisoryPartitionSizeInBytes = 160MB/200MB/210MB/400MB
# 分区合并后的最小分区数
spark.sqladaptive.coalescePartitions.minPartitionNum = 6

总结 :

  • 并行度过高、数据分片过小,CPU 调度开销会变大,执行性能也变差
  • 检验值 : 分片粒度为 200 MB 左右时,执行性能是最优的
  • 并行度过低、数据分片过大,CPU 数据处理开销也会过大,执行性能会锐减

性能对比 :

在这里插入图片描述

加 Cache

Cache : 避免数据集在磁盘中的重复扫描与重复计算

applyDistinctDF.cache
applyDistinctDF.countval result02_01 = applyDistinctDF.groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_01.write.format("csv").save("_")

性能对比 :

在这里插入图片描述


得到中签者的摇号次数

val result02_02 = applyDistinctDF.join(luckyDogsDF.select("carNum"), Seq("carNum"), "inner").groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_02.write.format("csv").save("_")

SQL 实现 :


with t3 as (selectcarNum,count(1) as x_axisfrom applyDistinctDF t1 join luckyDogsDF t2on t1.carNum = t2.carNumgroup by carNum
)
selectx_axis,count(1) as y_axis
from t3
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 计算中有一次数据关联,两次分组、聚合,排序

  • applyDistinctDF 有 1.35 亿条记录
  • luckyDogsDF 有 115 w条记录
  • 大表 Join 小表,最先想用广播变量

用广播变量来优化大小表关联计算 :

  • 估算小表在内存中的存储大小
  • 设置广播阈值 spark.sql.autoBroadcastJoinThreshold

sizeNew 计算 luckyDogsDF ,得到大小 = 18.5MB

设置广播阈值要大于 18.5MB ,即 : 设置为 20MB :

spark.sql.autoBroadcastJoinThreshold = 20MB

性能对比 :

在这里插入图片描述

中签率的变化趋势

计算中签率,分别统计每个摇号批次中的申请者和中签者人数

// 统计每批次申请者的人数
val apply_denominator = applyDistinctDF.groupBy(col("batchNum")).agg(count(lit(1)).alias("denominator"))// 统计每批次中签者的人数
val lucky_molecule = luckyDogsDF.groupBy(col("batchNum")).agg(count(lit(1)).alias("molecule"))val result03 = apply_denominator.join(lucky_molecule.select, Seq("batchNum"), "inner").withColumn("ratio", round(col("molecule")/ col("denominator"), 5)).orderBy("batchNum")result03.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectbatchNum,count(1) as denominatorfrom applyDistinctDFgroup by batchNum
),
t2 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFgroup by batchNum
)
selectbatchNum,round(molecule/denominator, 5) as ratio
from t1 join t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

中签率局部洞察

统计 2018 年的中签率

// 筛选出2018年的中签数据,并按照批次统计中签人数
val lucky_molecule_2018 = luckyDogsDF.filter(col("batchNum").like("2018%")).groupBy(col("batchNum")).agg(count(lit(1)).alias("molecule"))// 通过与筛选出的中签数据按照批次做关联,计算每期的中签率
val result04 = apply_denominator.join(lucky_molecule_2018, Seq("batchNum"), "inner").withColumn("ratio", round(col("molecule")/ col("denominator"), 5)).orderBy("batchNum")result04.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFwhere batchNum like '2018%'group by batchNum
)
selectbatchNum,round(molecule/denominator, 5)
from apply_denominator t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

优化

DPP 的条件 :

  • 事实表必须是分区表,且分区字段(可以是多个)必须包含 Join Key
  • DPP 仅支持等值 Joins,不支持大于、小于这种不等值关联关系
  • 维表过滤后的数据集,要小于广播阈值,调整 spark.sql.autoBroadcastJoinThreshold

DPP 优化 :

  • 降低事实表 applyDistinctDF 的磁盘扫描量
applyDistinctDF.select("batchNum", "carNum").distinctapplyDistinct.count

性能对比 :

在这里插入图片描述

倍率分析

倍率的分布情况 :

  • 不同倍率下的中签人数
  • 不同倍率下的中签比例

2016 年后的不同倍率下的中签人数 :

val result05_01 = applyNumbersDF.join(luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum"), Seq("carNum"), "inner").groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier")).groupBy("carNum").agg(max("multiplier").alias("multiplier")).groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier")result05_01.write.format("csv").save("_")
with t3 as (selectbatchNum,carNum,count(1) as multiplierfrom applyNumbersDF t1 join luckyDogsDF t2 on t1.carNum = t2.carNumwhere t2.batchNum >= '201601'group by batchNum, carNum
),
t4 as (selectcarNum,max(multiplier) as multiplierfrom t3group by carNum
)
selectmultiplier,count(1) as cnt
from t4
group by multiplier
order by multiplier;

在这里插入图片描述

优化

关联中的 Join Key 是 carNum (非分区键),所以无法用 DPP 机制优化

将大表 Join 小表 , SMJ 转 BHJ :

  • 计算 luckyDogsDF 的内存大小,确保 < 广播阈值,利用 Spark SQL 的静态优化机制将 SMJ 转为 BHJ
  • 确保过滤后 luckyDogsDF < 广播阈值,利用 Spark SQL 的 AQE 机制动态将 SMJ 转为 BHJ
# 静态BHJ
spark.sql.autoBroadcastJoinThreshold = 20MB# AQE 动态BHJ
spark.sql.autoBroadcastJoinThreshold = 10MB

性能对比 :

在这里插入图片描述


计算不同倍率人群的中签比例

// Step01: 过滤出2016-2019申请者数据,统计出每个申请者在每期内的倍率,并在所有批次中选取
val apply_multiplier_2016_2019 = applyNumbersDF.filter(col("batchNum") >= "201601").groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier")).groupBy("carNum").agg(max("multiplier").alias("multiplier")).groupBy("multiplier").agg(count(lit(1)).alias("apply_cnt"))// Step02: 将各个倍率下的申请人数与各个倍率下的中签人数左关联,并求出各个倍率下的中签率
val result05_02 = apply_multiplier_2016_2019.join(result05_01.withColumnRenamed("cnt", "lucy_cnt"), Seq("multiplier"), "left").na.fill(0).withColumn("ratio", round(col("lucy_cnt")/ col("apply_cnt"), 5)).orderBy("multiplier")result05_02.write.format("csv").save("_")

SQL 实现 :

with t5 as (selectbatchNum,carNumcount(1) as multiplierfrom applyNumbersDF where batchNum >= '201601'group by batchNum, carNum
),
t6 as (selectcarNum,max(multiplier) as multiplierfrom t1group by carNum
),
t7 as (selectmultiplier,count(1) as apply_cntfrom t2 group by multiplier
)
select multiplier,round(coalesce(lucy_cnt, 0)/ apply_cnt, 5) as ratio
from t7 left left join t5 on t5.multiplier = t7.multiplier
order by multiplier;

在这里插入图片描述

http://www.yayakq.cn/news/624103/

相关文章:

  • 网站建设用cms如何模板建站
  • 泰安网络推广 网站建设 网站优化大学生创新创业大赛ppt模板
  • 哪有做企业网站济南网站建设 荐搜点网络
  • 长寿网站制作网站页面建设
  • 建湖住房和城乡建设局网站wordpress注册登录弹窗代码
  • 网站运行环境建设方案苏州的网络公司网站建设
  • icp备案通过了 怎么修改我的网站网站建设开发感想
  • 茂名网站制作公司自己做网站生意怎么样
  • 智慧景区网站建设南宁网站开发建设
  • 手机制作网站的软件有哪些东西优秀网站网址
  • 在手机上创建网站厦门网站建设哪家专业
  • 网站建设软件东莞市领导班子
  • 网站建设费用选择网络专业广州注销营业执照
  • 现在还用dw做网站设计么国家高新技术企业官网
  • 工信部网站icp备案查询建设一个直播网站
  • 苏州专业网站制作网易企业邮箱手机怎么登录
  • 青岛网站开发培训广西建设主管部门网站
  • 有哪个网站做ic安阳历史
  • 渭南做网站哪家好如何搭建aspx网站
  • 免费创建网站 优帮云网页制作教程视频下载
  • 建行网站用户名是什么贴吧高级搜索
  • wordpress+做仿站网络营销推广方案策划书
  • 百度云怎么做网站晋中网站公司
  • 泉州市住房与城乡建设网站做网站怎么添加图片
  • 南宁网站关键词推广wordpress后台首页增加论坛帖子
  • 5分钟建站wordpress秦皇岛网络推广公司
  • 汇邦团建网站谁做的南京网站群建设公司
  • 什么装修网站做的好的上海企业网站建设制作
  • 南宁网站开发培训0元入驻的电商平台
  • 捕鱼网站建设惠州关键词排名优化