当前位置: 首页 > news >正文

宝安做棋牌网站建设找哪家效益快网站像素大小

宝安做棋牌网站建设找哪家效益快,网站像素大小,电子商务网站开发的主要支撑组件,河北省建设集团有限公司网站首页Spark中常见的两种数据倾斜现象如下 stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。 作业重试多次某几个task总会失败 常见的退出码143、53、137…

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值
http://www.yayakq.cn/news/777582/

相关文章:

  • 做网站需要技术怎么把自己做的网站发布出去
  • 做网站年入多少做外汇的人一般看什么网站
  • 新校区建设网站管理规定网站搭建设计 是什么
  • 免费网站怎么做出来的珠海建设工程信息网站
  • 免费游戏网站制作dede做的网站总被挂马
  • 大城网站建设自己做网站怎么推广
  • 建站公司怎么获客怎么做模板网站的报价表
  • 新网站设计最简单的软件顺德做网站设计的公司
  • 做一个彩票网站需要怎么做店面设计餐饮风格
  • 二手房网站谁做的更好给公司建官网
  • 南沙建设局网站如何提网站建设需求
  • 做电影网站有风险吗软件设计专业学什么
  • 做时彩网站违法吗石家庄网站定制
  • wordpress顺序设置别名seo搜索引擎优化怎么做
  • 网站开发的项目开发厦门seo计费
  • 多语言企业网站源码wordpress 弹幕
  • phpcms网站logo地宝网南昌租房信息
  • 西宁网站建设的公司哪家好微信分销小程序开发
  • 汕头潮阳网站建设东莞长安
  • 网页设计与制作教程第五版课后答案如何优化网站结构
  • 阿里云域名注册证书吉林网络seo
  • 新化 网站开发阿里云域名购买
  • 淘宝运营跟做网站哪种工资高最新注册域名查询
  • 网站建设与维护 出题网站建设年度汇报
  • 怎么把自己做的网站怎么做微信网页制作
  • 一般做公司网站需要哪几点成都医疗seo整站优化
  • 专业提供网站建设服务的企业凯里州建公司简介
  • 泸县手机网站建设有哪些单页网站
  • 音乐网站建设论文的目的和意义电子设计大赛网站开发
  • 学校网站建设价格明细表什么公司做企业网站