Spark 内核

Posted by SH on February 18, 2021

Spark内核

一、RDD

RDD是一个容错的并行的数据结构,可以将数据存储到内存和磁盘中,并能控制数据分区,提供丰富API

一般来说,分布式数据集的容错性有两种方式:数据检查点记录数据更新

为了有效地实现容错,RDD本身提供了一种高度受限的共享内存模型,它是只读的记录分区的集合

RDD的读操作可以精确到一条记录,RDD的写操作则是批量的。

RDD底层存储原理

每个RDD的数据都以Block的形式存储在多台机器上。

每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver上的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储时,将向BlockManagerSlave发送指令删除相应的Block。

img

BlockManager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘上。而RDD中的Partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构的映射关系,存储着RDD之前的依赖转换关系。

BlockManager:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package org.apache.spark.storage

private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster, // master
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    externalBlockStoreClient: Option[ExternalBlockStoreClient])
  extends BlockDataManager with BlockEvictionHandler with Logging {
      ...
  }

BlockManagerMaster会持有整个Application的Block的位置、Block所占用的存储空间等元数据信息,在Driver的DAGScheduler中就是通过这些信息来确认数据运行的本地化。

RDD五大特性

RDD都有5个主要特性:

  • 分区列表
  • 每个分区都有一个计算函数
  • 依赖于其他RDD的列表
  • Key-Value数据类型的RDD分区器
  • 每个分区都有一个分区位置列表(优先位置)

RDD:

1
2
3
4
5
6
7
8
9
10
11
12
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
    
    protected def getPartitions: Array[Partition] // 返回RDD分区列表
    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T] // 分区计算函数
    protected def getDependencies: Seq[Dependency[_]] = deps // 返回依赖列表
    @transient val partitioner: Option[Partitioner] = None // 可选,分区的方法,指定如何分区
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 可选,优先位置,输入参数是split分片,输出结果是一组优先的节点位置
}

1. 分区列表:

每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。

默认情况下,一个HDFS上的数据分片就是一个partition。可以在创建RDD是指定分片个数,如果不指定,从集合创建时,默认分区数量为该程序所分配到的资源的CPU核数;如果从HDFS文件创建,默认为文件的Block数。

2. 每个分区都有一个计算函数:

实现compute函数,对具体的分片进行计算。

3. 依赖于其他RDD的列表:

宽依赖、窄依赖。

4. Key-Value数据类型的RDD分区器:

每个Key-Value形式的RDD都有Partitioner属性;它决定了RDD如何分区。

5. 每个分区都有一个优先位置列表:

优先位置列表存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。在具体计算、具体分片以前,Spark已经知道任务发生在哪个节点上,也就是说任务本身是计算层面的、代码层面的,代码发生运算之前它就已经知道它要运算的数据在什么地方,有具体节点的信息。(数据不动代码动)

RDD弹性特性的7个方面

1. 自动进行内存和磁盘数据存储的切换

2. 基于Lineage血缘的高效容错机制

3. Task如果失败会自动进行特定次数的重试

4. Stage如果失败会自动进行特定次数的重试

5. Checkpoint和Persist,可主动或被动触发

6. 数据调度弹性(DAGScheduler、TaskScheduler和资源管理无关)

7. 数据分片的高度弹性(coalesce)

二、部署流程(Yarn)

Yarn Cluster模式部署流程:

img

执行spark-submit脚本最终会执行java SparkSubmit -xxx -xxx -xxx命令,最终形成一个JVM - Process(SparkSubmit)进程,并执行main方法。

1
2
3
4
5
6
bin/spark-submit \
-class org.apache.spark.examples.SparkPi \
-master yarn \
-deploymode cluster \
./examples/jars/spark-examples_2.11-2.11.jar \
100

SparkSubmit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1. SparkSubmit

	// 启动进程
    -- main
    
    	// 封装参数
        -- new SparkSubmitArguments
        
        // 提交
        -- submit
        
            // 准备提交环境
            -- prepareSubmitEnvironment
            
                // Yarn Cluster(主要)
                -- childMainClass = "org.apache.spark.deploy.yarn.Client"
                // Client
                -- childMainClass = args.mainClass (SparkPi)
            
            -- doRunMain (runMain)
            
                // 反射加载类
                -- Utils.classForName(childMainClass)
                // 查找main方法
                -- mainClass.getMethod("main", new Array[String](0).getClass)
                // 调用main方法
                -- mainMethod.invoke

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  2. org.apache.spark.deploy.yarn.Client

    -- main
    	// 封装参数
        -- new ClientArguments(argStrings)
        
		// new Client(args, sparkConf).run()
        -- new Client
        
            -- yarnClient = YarnClient.createYarnClient
        
        -- client.run
                
                -- this.appId = submitApplication
                	-- launcherBackend.connect() // Backend:后台

                    // 封装指令 
                    -- createContainerLaunchContext
                    
                    	// Cluster
                    	-- command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster
                    	// Client
                    	-- command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher

                    -- createApplicationSubmissionContext
                
                    // 向Yarn提交应用,提交指令
                    -- yarnClient.submitApplication(appContext)

ApplicationMaster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
3. ApplicationMaster
    
    // 启动进程
    -- main
    	// 封装参数
        -- new ApplicationMasterArguments(args)
        
        // 创建应用管理器对象
        -- new ApplicationMaster(amArgs, new YarnRMClient) // RM
        	-- heartbeatInterval // 心跳周期
        	-- rpcEnv // RPC
        	-- amEndpoint // 终端
        
        // 运行
        -- master.run
        
            // Cluster (否则 runExecutorLauncher)
            -- runDriver
            
                // 启动用户应用(--class SparkPi)
                -- startUserApplication
                
                    // 获取用户应用的类的main方法
                    -- userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
      
                    // 启动Driver线程,执行用户类的main方法,
                    -- new Thread().start()
                    
                // 注册AM
                -- registerAM
                
                    // 获取yarn资源
                    -- allocator = client.register
                    
                    // 分配资源
                    -- allocator.allocateResources()
                    
                    	// 处理资源(数据本地化:node-local、rack-local)
                        -- handleAllocatedContainers
                        	// 运行所有容器
                            -- runAllocatedContainers
                            	-- launcherPool // 线程池

                                 -- new ExecutorRunnable(
                                     	-- rpc // RPC
                                     	-- nmClient // NM
                                     ).run(
                                     	-- startContainer
                                     		-- prepareCommand
                                        	
                                        	-- command = bin/java org.apache.spark.executor.CoarseGrainedExecutorBackend // 后台交互:ExecutorBackend
                                 	)

CoarseGrainedExecutorBackend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
4. CoarseGrainedExecutorBackend
	
	-- main
	
		-- run
		
			// RPC Endpoint
			-- env.rpcEnv.setupEndpoint("Executor",) 
				-- onStart
				
					// 反向注册到Driver
					-- ref.ask[Boolean](RegisterExecutor())
                
                -- receive
                
                	-- case RegisteredExecutor
                		-- new Executor
                		
                	-- case LaunchTask
                		-- executor.launchTask

时序图:

img

三、RPC

Spark2.x版本使用Netty通讯框架作为内部通讯组件。spark基于netty新的rpc框架借鉴了Akka的中的设计,它是基于Actor模型,如下图所示:

img

Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:

img

Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。

img

相关类:

  • Backend:后台
    • Driver:CoarseGrainedSchedulerBackend
    • Executor:CoarseGrainedExecutorBackend
  • RpcEndpoint:RPC端点,Spark针对每个节点(Client/Master/Worker)都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher;

  • RpcEnv:RPC上下文环境,每个RPC端点运行时依赖的上下文环境称为RpcEnv;

  • Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

  • Inbox:指令消息收件箱,一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

  • RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当需要向一个具体的RpcEndpoint发送消息时,一般需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

  • OutBox:指令消息发件箱,对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

  • RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。

  • TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

  • TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;

四、任务调度机制

概述

img

通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统

img

Job提交和Task拆分:

1576411924489

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。

Stage调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

img

Job由最终的RDD和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算,如上图紫色流程部分。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据。

WordCount例子:

1576412234857

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂。

Task调度

SparkTask的调度是由TaskScheduler来完成,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中。

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余量,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示:

img

将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。

调度策略

TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schdulable,叶子节点为TaskSetManager,非叶子节点为Pool。

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,表示树的根节点,是Pool类型。

  • FIFO调度策略

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。

  • FAIR调度策略

img

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。

在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因此使用相同的排序算法

排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

1) 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行

2) 如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行

3) 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行

4) 如果上述比较均相等,则比较名字。

整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行

FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks确定每个需要计算的task 的preferredLocations,通过调用getPreferrdeLocations()得到partition的优先位置,由于一个partition对应一个task,此partition的优先位置就是task的优先位置,对于要提交到TaskScheduler的TaskSet中的每一个task,该task优先位置与其对应的partition对应的优先位置一致。

根据每个task的优先位置,确定task的Locality级别,Locality一共有五种,优先级由高到低顺序:

名称 解析
PROCESS_LOCAL 进程本地化,task和数据在同一个Executor中,性能最好
NODE_LOCAL 节点本地化,task和数据在同一节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输
RACK_LOCAL 机架本地化,task和数据在同一机架的两个节点上,数据需要通过网络在节点之间进行传输
NO_PREF 对于task来说,从哪里获取都一样,没有好坏之分
ANY task和数据可以在集群的任何地方,而且不在一个机架中,性能最差

延迟调度策略:

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。

失败重试与黑名单机制

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。

五、Shuffle机制

  • 大多数Spark作业的性能主要消耗在shuffle环节(磁盘IO、序列化、网络传输);
  • 什么时候shuffle:有宽依赖才会出现shuffle

  • Shuffle Write、Shuffle Read;

  • 两种机制:HashShuffle、SortShuffle,Spark1.2之前用HashShuffle,之后用SortShuffleManager了;
    • HashShuffle:产生大量中间磁盘文件;
    • SortShuffle:合并临时文件;
      • SortShuffle两种机制:普通运行机制bypass运行机制

在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。

  • ShuffleMapStage的结束伴随着shuffle文件的写磁盘

  • ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。

Shuffle中的任务个数

Spark Shuffle分为map阶段和reduce阶段,或者称之为ShuffleRead阶段和ShuffleWrite阶段,那么对于一次Shuffle,map过程和reduce过程都会由若干个task来执行,那么map task和reduce task的数量是如何确定的呢?

假设Spark任务从HDFS中读取数据,那么初始RDD分区个数由该文件的split个数决定,也就是一个split对应生成的RDD的一个partition,假设初始partition个数为N。

初始RDD经过一系列算子计算后(假设没有执行repartition和coalesce算子进行重分区,则分区个数不变,仍为N,如果经过重分区算子,那么分区个数变为M),假设分区个数不变,当执行到Shuffle操作时,map端的task个数和partition个数一致,即map task为N个。

reduce端的stage默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以map端的最后一个RDD的分区数作为其分区数(也就是N),那么分区数就决定了reduce端的task的个数。

reduce端数据的读取

map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task会先执行,那么后执行的reduce task如何知道从哪里去拉取map task落盘后的数据呢?

reduce端的数据拉取过程如下:

  1. map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到MapStatus对象中,然后由本进程中的MapOutPutTrackerWorker对象将mapStatus对象发送给Driver进程的MapOutPutTrackerMaster对象;

  2. 在reduce task开始执行之前会先让本进程中的MapOutputTrackerWorker向Driver进程中的MapoutPutTrakcerMaster发动请求,请求磁盘小文件位置信息;

  3. 当所有的Map task执行完毕后,Driver进程中的MapOutPutTrackerMaster就掌握了所有的磁盘小文件的位置信息。此时MapOutPutTrackerMaster会告诉MapOutPutTrackerWorker磁盘小文件的位置信息;

  4. 完成之前的操作之后,由BlockTransforService去Executor所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的20%内存中)。

HashShuffle

未经优化的HashShuffleManager

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

shuffle read阶段,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,map task给下游stage的每个reduce task都创建了一个磁盘文件,因此shuffle read的过程中,每个reduce task只要从上游stage的所有map task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

未优化的HashShuffleManager工作原理:

img

优化的HashShuffleManager

为了优化HashShuffleManager可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果使用HashShuffleManager,那么建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor(Executor CPU个数为1),每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量,也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

优化后的HashShuffleManager工作原理:

img

SortShuffle

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

普通运行机制的SortShuffleManager工作原理:

img

bypass运行机制

bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值;

  • 不是聚合类的shuffle算子。

此时,每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass运行机制的工作原理:

img

Shuffle参数调优

  • spark.shuffle.file.buffer:默认32k,缓冲大小,可适当调大(64k)减少IO次数;
  • spark.reducer.maxSizeInFlight:默认48m,shuffle read task的缓冲大小,决定了每次能拉取多少数据,可适当调大(96m)减少拉取次数(网络传输次数);
  • spark.shuffle.io.maxRetries:默认3,shuffle read task从write task所在节点拉取数据失败自动重试次数,可适当调大;
  • spark.shuffle.io.retryWait:默认5s,重试等待时间,可适当调大;

六、SparkSQL Join

https://www.cnblogs.com/jmx-bigdata/p/14021183.html

  • 影响JOIN操作的因素
  • Spark中JOIN执行的5种策略
  • Spark是如何选择JOIN策略的

影响JOIN操作的因素

数据集的大小

参与JOIN的数据集的大小会直接影响Join操作的执行效率。同样,也会影响JOIN机制的选择和JOIN的执行效率。

JOIN的条件

JOIN的条件会涉及字段之间的逻辑比较。根据JOIN的条件,JOIN可分为两大类:等值连接非等值连接。等值连接会涉及一个或多个需要同时满足的相等条件。在两个输入数据集的属性之间应用每个等值条件。当使用其他运算符(运算连接符不为=)时,称之为非等值连接。

JOIN的类型

在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。主要有以下几种JOIN类型:

  • 内连接(Inner Join):仅从输入数据集中输出匹配连接条件的记录。
  • 外连接(Outer Join):又分为左外连接、右外链接和全外连接。
  • 半连接(Semi Join):右表只用于过滤左表的数据而不出现在结果集中。
  • 交叉连接(Cross Join):交叉联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。交叉联接也称作笛卡尔积。

Spark的五种JOIN策略

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Spark是如何选择JOIN策略的

等值连接的情况

有join提示(hints)的情况,按照下面的顺序

  • 1.Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • 2.Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • 3.shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • 4.shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式

没有join提示(hints)的情况,则逐个对照下面的规则

  • 1.如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 3.如果join keys 是排序的,则选择sort-merge join
  • 4.如果是内连接,选择 cartesian join
  • 5.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

非等值连接情况

有join提示(hints),按照下面的顺序

  • 1.broadcast hint:选择broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是内连接,则选择cartesian product join

没有join提示(hints),则逐个对照下面的规则

  • 1.如果一张表足够小(可以被广播),则选择 broadcast nested loop join
  • 2.如果是内连接,则选择cartesian product join
  • 3.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

七、内存管理

  • MemoryManager:StaticMemoryManagerUnifiedMemoryManager(Spark1.6后);
  • 堆内、堆外;
  • 堆内:Storage存储内存、Execution执行内存、Other其他、预留内存;
    • Storage:用于缓存RDD(核心)、展开partition、存放广播变量;
    • Execution:用于shuffle、join、sort、aggregation等操作中的缓存buffer;
    • Other:用户数据结构、spark内部元数据、防止OOM,不受MemoryManager管理;
  • 比例设置:spark.memory.fraction、spark.memory.storageFraction;

Driver的内存管理相对来说较为简单,主要是 Executor 的内存管理分析。

堆内和堆外内存规划

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

堆内内存

堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。

Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:

JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。

对于 Spark 中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期。此外,在被 Spark 标记为释放的对象实例,很有可能在实际上并没有被 JVM 回收,导致实际可用的内存小于 Spark 记录的可用内存。所以 Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。

虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

堆外内存

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。

堆外内存使用JVM堆以外的内存,不会被gc回收,可以减少频繁的full gc,所以在Spark程序中,会长时间逗留再Spark程序中的大内存对象可以使用堆外内存存储。

堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。

利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放(堆外内存之所以能够被精确的申请和释放,是由于内存的申请和释放不再通过JVM机制,而是直接向操作系统申请,JVM对于内存的清理是无法准确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

内存空间分配

静态内存管理

在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如图所示:

img

堆内内存计算方式:

1
2
可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理。

Storage内存和Execution内存都有预留空间,目的是防止OOM,因为Spark堆内内存大小的记录是不准确的,需要留出保险区域。

堆外的空间分配较为简单,只有存储内存和执行内存,如图所示:

img

可用的执行内存和存储内存占用的空间大小直接由参数spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。

静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成”一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

统一内存管理

Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所示:

img

堆外内存:

img

其中最重要的优化在于动态占用机制,其规则如下:

  1. 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;

  2. 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)

  3. 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;

  4. 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

统一内存管理的动态占用机制如图所示:

img

凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要进一步了解存储内存和执行内存各自的管理方式和实现原理。

存储内存管理

RDD的持久化机制

cache 方法是使用默认的MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化(cache算子底层调用persist算子,默认使用内存级别)。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理

RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。

Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Driver端的Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而Executor端的 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。

img

在对 RDD 持久化时,Spark 规定了MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的存储级别 ,而存储级别是以下 5 个变量的组合:

1
2
3
4
5
6
7
class StorageLevel private(
    private var _useDisk: Boolean, //磁盘
    private var _useMemory: Boolean, //这里其实是指堆内内存
    private var _useOffHeap: Boolean, //堆外内存
    private var _deserialized: Boolean, //是否为非序列化
    private var _replication: Int = 1 //副本个数
)
持久化级别 含义
MEMORY_ONLY 以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它们的时候,重新被计算
MEMORY_AND_DISK 同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取
MEMORY_ONLY_SER 同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销
MEMORY_AND_DISK_SER 同MEMORY_AND_DISK,但是使用序列化方式持久化Java对象
DISK_ONLY 使用非序列化Java对象的方式持久化,完全存储到磁盘上
MEMORY_ONLY_2 MEMORY_AND_DISK_2 如果是尾部加了2的持久化级别,表示将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可

可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:

1) 存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。

2) 存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。

3) 副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本。

RDD缓存过程

RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一Partition的不同Record的存储空间并不连续

RDD 在缓存到存储内存之后,Partition被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为”展开”(Unroll)

Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。

因为不能保证存储空间可以一次容纳 Iterator中的所有数据,当前的计算任务在 Unroll时要向 MemoryManager申请足够的 Unroll空间来临时占位,空间不足则 Unroll失败,空间足够时可以继续进行

对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。

对于非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。

如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图所示:

img

在静态内存管理时,Spark 在存储内存中专门划分了一块Unroll 空间,其大小是固定的,统一内存管理时则没有对Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

淘汰与落盘

由于同一个 Executor的所有的计算任务共享有限的存储内存空间,当有新的 Block需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap中的旧 Block进行淘汰(Eviction),而被淘汰的 Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。

存储内存的淘汰规则为:

  • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存;

  • 新旧 Block 不能属于同一个 RDD,避免循环淘汰;

  • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题;

  • 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。

落盘的流程则比较简单,如果其存储级别符合 _useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

执行内存管理

执行内存主要用来存储任务在执行 Shuffle 时占用的内存,包含Write和Read两阶段对执行内存的使用。

Shuffle Write

1) 若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。

2) 若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。

Shuffle Read

1) 在对 reduce 端的数据进行聚合时,要将数据交给 一个包含地域列的文件,如何把每个省份的数据单独输出到一个文件?处理,在内存中存储数据时占用堆内执行空间。

2) 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。

在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划(钨丝计划),解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序。

Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。

每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。

堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是这个内存块在系统内存中的 64 位绝对地址。Spark用 MemoryBlock巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task申请到的内存页

Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成:

  • 页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。

  • 页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。

有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和 CPU 使用效率带来了明显的提升。

Spark 的存储内存和执行内存有着截然不同的管理方式:

  • 对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;
  • 而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。

OOM

Spark面对OOM问题的解决方法及优化总结

  • 哪些区域会OOM?
    • Storage?Execution?
      • OOM的问题通常出现在execution这块内存中,因为storage这块内存在存放数据满了之后,会直接丢弃内存中旧的数据,对性能有影响但是不会有OOM的问题。
    • Other?Other-用户自定义数据结构?Other-spark内部元数据?
  • 每个区域的OOM分别应该如何解决?

八、核心组件

BlockManager

BlockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。

Driver上有BlockManagerMaster,负责对各个节点上的BlockManager内部管理的数据的元数据进行维护,比如block的增删改等操作,都会在这里维护好元数据的变更。

每个节点都有一个BlockManager,每个BlockManager创建之后,第一件事即使去向BlockManagerMaster进行注册,此时BlockManagerMaster会为其长难句对应的BlockManagerInfo。

BlockManager运行原理如下图所示:

img

BlockManagerMaster与BlockManager的关系非常像NameNode与DataNode的关系,BlockManagerMaster中保存中BlockManager内部管理数据的元数据,进行维护,当BlockManager进行Block增删改等操作时,都会在BlockManagerMaster中进行元数据的变更,这与NameNode维护DataNode的元数据信息,DataNode中数据发生变化时NameNode中的元数据信息也会相应变化是一致的。

每个节点上都有一个BlockManager,BlockManager中有3个非常重要的组件:

  • DiskStore:负责对磁盘数据进行读写;

  • MemoryStore:负责对内存数据进行读写;

  • BlockTransferService:负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写;

每个BlockManager创建之后,做的第一件事就是想BlockManagerMaster进行注册,此时BlockManagerMaster会为其创建对应的BlockManagerInfo。

使用BlockManager进行写操作时,比如说,RDD运行过程中的一些中间数据,或者我们手动指定了persist(),会优先将数据写入内存中,如果内存大小不够,会使用自己的算法,将内存中的部分数据写入磁盘;此外,如果persist()指定了要replica,那么会使用BlockTransferService将数据replicate一份到其他节点的BlockManager上去。

使用BlockManager进行读操作时,比如说,shuffleRead操作,如果能从本地读取,就利用DiskStore或者MemoryStore从本地读取数据,但是本地没有数据的话,那么会用BlockTransferService与有数据的BlockManager建立连接,然后用BlockTransferService从远程BlockManager读取数据;例如,shuffle Read操作中,很有可能要拉取的数据在本地没有,那么此时就会到远程有数据的节点上,找那个节点的BlockManager来拉取需要的数据。

只要使用BlockManager执行了数据增删改的操作,那么必须将Block的BlockStatus上报到BlockManagerMaster,在BlockManagerMaster上会对指定BlockManager的BlockManagerInfo内部的BlockStatus进行增删改操作,从而达到元数据的维护功能。

共享变量底层实现

默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中,此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将用到的变量,仅仅为每个节点拷贝一份,即每个Executor拷贝一份,更大的用途是优化性能,减少网络传输以及内存损耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。Broadcast Variable是共享读变量,task不能去修改它,而Accumulator可以让多个task操作一个变量。

广播变量

广播变量允许编程者在每个Executor上保留外部数据的只读变量,而不是给每个任务发送一个副本。

img

Spark还尝试使用高效的广播算法分发广播变量,以降低通信成本。

在任务运行时,Executor并不获取广播变量,当task执行到 使用广播变量的代码时,会向Executor的内存中请求广播变量;之后Executor会通过BlockManager向Driver拉取广播变量,然后提供给task进行使用。

累加器

Accumulator是存在于Driver端的,集群上运行的task进行Accumulator的累加,随后把值发到Driver端,在Driver端汇总(Spark UI在SparkContext创建时被创建,即在Driver端被创建,因此它可以读取Accumulator的数值),由于Accumulator存在于Driver端,从节点读取不到Accumulator的数值。

Accumulator提供了多个task对于同一个变量并行操作的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。

img

九、常见问题

shuffle FetchFailedException

分析:

shuffle分为shuffle write和shuffle read两部分。 shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。 shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。 如果shuffle read的量很大,那么将会导致一个task需要处理的数据非常大,从而导致JVM crash以及取shuffle数据失败,最后executor也丢失了,看到Failed to connect to host的错误(executor lost)或者造成长时间的gc。

解决方案:

(a) 减少shuffle数据和操作 思考是否可以使用map side join或是broadcast join来规避shuffle的产生。 将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

(b) 控制分区数 对于SparkSQL和DataFrame的join,group by等操作,通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。 对于Rdd的join,groupBy,reduceByKey等操作,通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

(c)提高executor的内存 通过spark.executor.memory适当提高executor的memory值。

(d)增加并行task的数目 通过增加并行task的数目,从而减小每个task的数据量。

(e)查看是否存在数据倾斜的问题 是否存在某个key数据特别大导致倾斜?如果存在可以单独处理或者考虑改变数据分区规则。