现在还有什么网站,汕头制作网站推荐,网站管理程序,建筑人才网官方网站评职称Spark底层执行原理
学习Spark运行流程 学习链接#xff1a;https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ
一、Spark运行流程 流程#xff1a;
SparkContext向管理器注册并向资源管理器申请运行Executor资源管理器分配Executor#xff0c;然后资源管理器启动Execut…Spark底层执行原理
学习Spark运行流程 学习链接https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ
一、Spark运行流程 流程
SparkContext向管理器注册并向资源管理器申请运行Executor资源管理器分配Executor然后资源管理器启动ExecutorExecutor发送心跳至资源管理器SparkContext构建DAG有向无环图将DAG分解成Stage(TaskSet)把Stage发送给TaskSchedulerExecutor向SparkContext申请TaskTaskScheduler将Task发送给Executor运行同时SparkContext将应用程序代码发给Executor 10.Task在Executor上运行运行完毕后释放所有资源
1.1 从代码角度看DAG图的构建
val session SparkSession.builder().master(local[*]).appName().getOrCreate()
val sc session.sparkContext
val lines1 sc.textFile(inputPath1).map(...).map(...)
val lines2 sc.textFile(inputPath2).map(...)
val lines3 sc.textFile(inputPath3)val dtinone1 lines2.union(lines3)
val dtinone lines1.join(dtinone1)
dtinone.saveAsTextFile(...)
dtinone.filter(...).foreach(...)代码的DAG图 Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图也就是上图所示的DAG。
Spark的计算发生在RDD的Action操作而对Action之前所有的TransformationSpark只是记录下RDD生成的轨迹不会触发真正的计算。
1.2 将DAG划分为Stage核心算法
一个Application可以有多个job多个Stage Spark Application中可以因为有不同的Action触发众多的job一个Application中可以有很多的job每个job是由一个或者多个Stage构成的后面的Stage依赖于前面的Stage也就是只有前面的Stage计算完毕后后面的Stage才会运行。 划分依据 Stage划分的一句是宽依赖像像 reduceByKeygroupByKey 等算子会导致宽依赖的产生。 宽窄依赖划分原则 窄依赖父 RDD 的一个分区只会被子 RDD 的一个分区依赖。即一对一或者多对一的关系可理解为独生子女。常见的窄依赖有map、filter、union、mapPartitions、mapValues、join父 RDD 是 hash-partitioned等。 宽依赖父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。即一对多的关系可理解为超生。常见的宽依赖有 groupByKey、partitionBy、reduceByKey、join父 RDD 不是 hash-partitioned等。 核心算法回溯算法 从后往前回溯/反向解析遇到窄依赖加入本Stage遇到宽依赖进行Stage切分。 Spark内核会从触发Action操作的那个RDD开始从后往前推首先回味最后一个RDD创建要一个Stage然后倒推如果发现对某个RDD是宽依赖那么会将宽依赖的那个RDD创建一个新的Stage那个RDD就是新的Stage的最后一个RDD以此类推继续倒推知道所有的RDD全部遍历完成。
1.3 DAG划分为Stage剖析
具体可见 Spark二、Spark技术栈之Spark Core 第六点。
1.4 提交Stages
调度阶段的提交最终会被转换成一个任务的提交
DAGScheduler通过TaskScheduler接口提交任务集这个任务集最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个任务集的生命周期对于DAGScheduler来说提交调度阶段的工作到此就完成了。
而TaskScheduler的具体实现则会在得到计算资源的时候进一步通过TaskSetManager调度具体的任务到对应的Executor节点上进行运算。
1.5 监控Job、Task、Executor
1.5.1 DAGScheduler监控Job与Task
要保证相互依赖的作业调度阶段能够得到顺利的调度执行DAGScheduler 需要监控当前作业调度阶段乃至任务的完成情况。 通过对外暴露一系列的回调函数实现对于TaskScheduler来说这些回调任务主要包括任务的开始结束失败、任务集的失败DAGScheduler 根据这些任务的生命周期信息进一步维护作业和调度阶段的状态信息。
1.5.2 DAGScheduler 监控 Executor 的生命状态
TaskScheduler 通过回调函数通知 DAGScheduler 具体的 Executor 的生命状态如果某一个 Executor 崩溃了则对应的调度阶段任务集的 ShuffleMapTask 的输出结果也将标志为不可用这将导致对应任务集状态的变更进而重新执行相关计算任务以获取丢失的相关数据。
1.6 获取任务执行结果
1.6.1 结果DAGScheduler
一个具体的任务在 Executor 中执行完毕后其结果需要以某种形式返回给 DAGScheduler根据任务类型的不同任务结果的返回方式也不同。
1.6.2 两种结果中间结果与最终结果
对于 FinalStage 所对应的任务返回给 DAGScheduler 的是运算结果本身。对于中间调度阶段对应的任务 ShuffleMapTask返回给 DAGScheduler 的是一个 MapStatus 里的相关存储信息而非结果本身这些存储位置信息将作为下一个调度阶段的任务获取输入数据的依据。
1.6.3 两种类型DirectTaskResult 与 IndirectTaskResult
按任务结果大小的不同ResultTask返回的结果分成两类
如果结果足够小则直接放在 DirectTaskResult 对象内中。如果超过特定尺寸则在 Executor 端会将 DirectTaskResult 先序列化再把序列化的结果作为一个数据块存放在 BlockManager 中然后将 BlockManager 返回的 BlockID 放在 IndirectTaskResult 对象中返回给 TaskSchedulerTaskScheduler 进而调用 TaskResultGetter 将 IndirectTaskResult 中的 BlockID 取出并通过 BlockManager 最终取得对应的 DirectTaskResult。
1.7 任务调度总体诠释 二、Spark运行架构特点
2.1 Executor进程专属
每个Application获取专属的Executor进程该进程在Application期间一直驻留并以多线程方式运行Tasks。
Spark Application不能跨应用程序共享数据除非将数据写入到外部存储系统如图所示
2.2 支持多种资源管理器
Spark与资源管理器无关只要能够获取Executor进程并能保持互相通信就可以了。 Spark支持支援管理器包括Standalone、On Mesos、On YARN、Or On EC2。如图所示
2.3 Job提交就近原则
提交SparkContext的Client应该靠近Worker节点运行Executor的节点最好在同一个Rack机架里因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换
如果想在远程集群中运行最好使用RPC将SparkContext提交给集群不要远离Worker运行SparkContext。
2.4 移动程序而非移动数据的原则执行
移动程序而非移动数据的原则知性Task采用了数据本地行和推测知性的优化机制。 关键方法taskIdToLocations、getPreferedLocations。