River IQ

Dynamic Allocation in Spark

  Ashish Kumar      Spark August 26, 2018
Image

Why spark is faster than MapReduce?


Here today I will give you deep dive about Spark Resource Allocation (Static and dynamic allocation of resources).Whenever this question arose, we have come up with below explanation that Spark does in-memory processing of data or it does better or effective utilization of YARN resources than MapReduce.

How and when dynamic allocation of resource will give faster and effective utilization of resources.

Effective utilization of cluster or yarn memory.


What is Executors?


Before we start talking about static and dynamic allocation of resource and why and where dynamic allocation matters, it very necessary to understand resources and processes units and how to calculate right number of resources (executors) or processes(tasks).

For example, if we create spark context with spark.executor.cores = 5 and spark.executor.memory =5g, it means the resource unit (Executor) is a combination of 5 cores and 5g memory to request to cluster manager.Executor is combined with a bunch of CPU cores and memory. Itís like YARN container in which process runs. Each executor is a resource unit requested to cluster manager (either Sparkís own standalone cluster manager, Mesos or YARN).

A single node can have as much executor as much hardware support. If a node has 16GB RAM and 4 Core then we cannot have 2 or more than two with configuration spark.executor.cores = 4 and spark.executor.memory =16gb. 

What is Task?


In spark, executor is long running process. Itís get created with start of application and die once application is finished whereas lifetime of task is task bashed and it can be created and killed many time in span of application.The smallest unit of process is task. In spark it is a thread that reside in a Process called ďExecutorĒ.

Spark defines each task occupies spark.task.cups number of CPU cores, so we could simply calculate the number of tasks which could simultaneously run an executor:

No of task per executor = spark.executor.cores/spark.task.cpus

By Simply connecting Resource Unit(executor) and Execution Unit(task) we can identify the amount of resource required though task.

 Why Spark came up with new mechanism of resource allocation, called Dynamic Allocation?


Driver host spark context of spark application and it is responsible to running executors where task scheduler schedule task inside executor.

Static allocation


Spark connect to YARN for request for specified No of executor while running application, it start the all executor at beginning of application and then task scheduler schedule task in executors.It allocates resource to executor, spark connect to cluster manager, let take cluster manager as YARN.

As I mentioned above lifetime of task is task bashed and it (thread) get killed once task is complete but executorís lifetime is throughout the Spark Application and it retain acquired resource for executor till spark context is alive or till application is running, even if it is seating idle or no task is running in it.

Now here problem is effective utilization of YARN resource (memory and cores). If there is only one application is running on cluster then itís good to keep acquired resource by Application but in real environment this never happen, instead there is multiple jobs are running on same cluster and hence fight between multiple applications comes up to acquire required no of resource unit(Executors).

To resolve this problem spark came with solution of Dynamic Allocation of Resources.

 

Dynamic Allocation


This means your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.As compared to static allocation where spark request all resource at start of application and release at end of application, Dynamic allocation request and remove resources dynamically at run-time bashed on pending task.

This feature is disabled by default and available with some configuration changes.

Now before explaining about Configuration and Setup of Dynamic Allocation we should ask a question that what is difference between Spark and MRv2. How we never faced the issue like static allocation in spark that it is holding memory even it doesnít require.



Spark Vs. MRv2

But in MRv2 resource unit is container and task is process unit that runs inside the container. Here container gets deallocated once task is completed. So here it never holds the memory for further execution of new task where it happens in spark for fast execution of job.As I mentioned above, In Spark executor is a resource unit and task is process unit and task runs inside the executor. Task can be killed once it gets completed but executor lives as long as application runs.

Dynamic Allocation policy of scaling executors up and down

  1. Scale Up Policy requests new executors when there are pending tasks and increases the number of executors exponentially since executors start slowing and Spark application may need slightly more.
  2. Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.

Configuration and Setup of Dynamic Allocation


To configure dynamic allocation, we have to configure spark.dynamicAllocation.enabled to ture and spark.shuffle.service.enabled to true to enable external shuffle service.

These two services must require for dynamic allocation but apart from these two there are some others configuration to have fine grained control on resource allocation of executors.           

Spark Property

Default Value

spark.dynamicAllocation.enabled

false

spark.dynamicAllocation.initialExecutors

spark.dynamicAllocation.minExecutors

spark.dynamicAllocation.minExecutors

0

spark.dynamicAllocation.maxExecutors

Integer.MAX_VALUE

spark.dynamicAllocation.executorIdleTimeout

60s

     

        If you specify --num-executors command-line option of spark-submit it will be treated as spark.dynamicAllocation.initialExecutors value.


Application level configuration

spark-submit --master spark://<spark_master>:7077

--class com.riveriq.DynamicAllocationTest --executor-cores 1 --executor-memory 1G

--conf spark.dynamicAllocation.enabled=true DynamicAllocationTest-app.jar

Or

  val sparkSession = SparkSession.builder

      .appName(DynamicAllocationTest)

      .config("spark.dynamicAllocation.enabled", "true")

      .config("spark.shuffle.service.enabled", "true")

      .config("spark.dynamicAllocation.initialExecutors", "12")

How to calculate the desired resources (executors, core & memory)


Hardware:

Cores Per node - 16 Core

RAM Per node - 112

 

Total Hardware- 6 Node, 96 Cores, 672 GB

First 1 core and 1 GB is needed for OS and Hadoop Daemons, so available resources are 15 cores, 111 GB RAM for each node

Let's take 5 cores per executors, take core in such manner that it fully utilized in all executors

No of Concurrent task per executor =No of thread per executor = No of cores per executor = 5

Number of executors per node= Total no of core per node/ No of cores per executor

= 15/5=3

Total no of executors = No of nodes * Number of executors per node

= 6*3=18

Memory for each executor: 111/3 = 37 GB

No of cores per executors = 5 cores

Final numbers - Executors - 18, Cores 5, Executor Memory - 37 GB

spark-submit --class " com.riveriq.DynamicAllocationTest"

--master yarn

--deploy-mode cluster

--num-executors 18

--executor-memory 5g

--executor-cores 30

DynamicAllocationTest-app.jar

Here for dynamic allocation no need to specify --num-executors. If you are specifying it will act as spark.dynamicAllocation.initialExecutors value.

When we should enable dynamic allocation of resource


In one of my spark program, I was reading huge no files and then doing some operation and then merging those into a single file using fileutil.copymerge. I will answer this question by sharing one of my research.

When I started running this application with right no of executors.  I found yarn memory is going to 99% that was expected as per my spark submit command.

But when I saw the spark application DAG and Execution timeline I found an interesting thing.

That was after running sometime 17 executors out 18 is seating idle and one executor running merge task and this was also expected because this task will run by single thread only.

Even the 17 executors are seating idle but yarn memory still 99% and that is wastes of resource that need to assign to some other parallel running applications and then came with conclusion of enabling dynamic allocation.

Before enabling dynamic allocation in spark, please consider below points.

1.       If it is expected to run multiple application in parallel on same cluster.

2.       Check Application DAG and execution timeline, if there is some executors seating idle for long time.

3.       Every application has same priority to finish. 

Conclusion


Dynamic resource allocation is solution for effective utilization of resources. Here spark calculate required no of resources, allocate and deallocate at run time.By Default, spark does static allocation of resources. We statically define right no executors, memory and no of cores but same time itís very difficult to calculate the right no of resources and it can lead to ineffective utilization of resources.  It can create underutilized of cluster resource and for other application starvation of resources.

                                                                                                   

0 Comments

Be first to comment on this post.