Dynamic Allocation in Spark
Why is Spark faster than MapReduce?
Whenever this question arises, the usual explanation is that Spark does in-memory processing of data, or that it utilizes YARN resources better and more effectively than MapReduce. Here today I will give you a deep dive into Spark resource allocation (static and dynamic allocation of resources).
We will look at how and when dynamic allocation of resources gives faster and more effective utilization of resources, in particular of cluster (YARN) memory.
What is an Executor?
Before we start talking about static and dynamic allocation of resources, and why and where dynamic allocation matters, it is necessary to understand the resource unit and the processing unit, and how to calculate the right number of resources (executors) and processes (tasks).
An executor is a bundle of CPU cores and memory. It is like a YARN container in which a process runs. Each executor is a resource unit requested from the cluster manager (either Spark's own standalone cluster manager, Mesos or YARN).
For example, if we create a Spark context with spark.executor.cores = 5 and spark.executor.memory = 5g, the resource unit (executor) requested from the cluster manager is a combination of 5 cores and 5 GB of memory.
A single node can host as many executors as its hardware supports. If a node has 16 GB of RAM and 4 cores, it cannot host two or more executors configured with spark.executor.cores = 4 and spark.executor.memory = 16g.
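As a rough sketch of that constraint, the number of executors that fit on a node is limited by whichever runs out first, cores or memory. The object and method names below are hypothetical, and the model deliberately ignores OS reservations and memory overhead.

```scala
object ExecutorFit {
  // Hypothetical helper: how many executors of a given shape fit on one node.
  // Simplification: no cores or memory are reserved for the OS or for overhead.
  def executorsPerNode(nodeCores: Int, nodeMemGb: Int,
                       executorCores: Int, executorMemGb: Int): Int =
    math.min(nodeCores / executorCores, nodeMemGb / executorMemGb)

  def main(args: Array[String]): Unit = {
    // The node from the example above: 4 cores and 16 GB of RAM,
    // with executors of 4 cores and 16 GB each.
    println(executorsPerNode(nodeCores = 4, nodeMemGb = 16,
                             executorCores = 4, executorMemGb = 16)) // 1
  }
}
```

With these numbers at most one executor fits, which is why a second one cannot be scheduled on the same node.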
What is a Task?
The smallest unit of processing is the task. In Spark, a task is a thread that resides in a process called the "executor".
An executor is a long-running process. It is created when the application starts and dies once the application finishes, whereas the lifetime of a task is tied to its own work, and tasks are created and killed many times over the span of an application.
Spark defines that each task occupies spark.task.cpus CPU cores, so we can simply calculate the number of tasks that can run simultaneously in an executor:
Number of tasks per executor = spark.executor.cores / spark.task.cpus
By connecting the resource unit (executor) with the execution unit (task), we can work out the amount of resources required from the number of tasks.
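The formula above can be written as a small function. This is only an illustrative sketch with hypothetical names; the default of 1 for the task CPU count mirrors the default value of spark.task.cpus.

```scala
object TaskSlots {
  // Concurrent task slots in one executor:
  // spark.executor.cores / spark.task.cpus (integer division).
  def tasksPerExecutor(executorCores: Int, taskCpus: Int = 1): Int = {
    require(taskCpus > 0, "spark.task.cpus must be positive")
    executorCores / taskCpus
  }

  def main(args: Array[String]): Unit = {
    println(tasksPerExecutor(executorCores = 5))               // 5
    println(tasksPerExecutor(executorCores = 5, taskCpus = 2)) // 2
  }
}
```

Note that with 5 cores and 2 CPUs per task, one core in each executor stays unused, so it is worth choosing values that divide evenly.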
The driver hosts the Spark context of the Spark application and is responsible for launching executors; the task scheduler then schedules tasks inside those executors.
To allocate resources to executors, Spark connects to the cluster manager; let's take YARN as the cluster manager. Spark asks YARN for the specified number of executors when the application runs, starts all the executors at the beginning of the application, and then the task scheduler schedules tasks in those executors.
As I mentioned above, the lifetime of a task is tied to its own work, and the thread is killed once the task is complete. An executor, however, lives for the whole Spark application and retains its acquired resources for as long as the Spark context is alive or the application is running, even if it is sitting idle with no task running in it.
The problem here is effective utilization of YARN resources (memory and cores). If only one application is running on the cluster, it is fine for that application to keep its acquired resources. In a real environment this never happens. Instead, multiple jobs run on the same cluster, and the applications end up fighting to acquire the number of resource units (executors) they need.
To resolve this problem, Spark came up with the solution of dynamic allocation of resources.
Dynamic Allocation
This means your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.
Compared to static allocation, where Spark requests all resources at the start of the application and releases them at the end, dynamic allocation requests and removes resources dynamically at run time based on pending tasks.
This feature is disabled by default and is enabled through a few configuration changes.
Before explaining the configuration and setup of dynamic allocation, we should ask what the difference between Spark and MRv2 is. Why did we never face an issue in MRv2 like the one with static allocation in Spark, where memory is held even when it is not needed?
Spark Vs. MRv2
As I mentioned above, in Spark the executor is the resource unit and the task is the processing unit, and tasks run inside the executor. A task is killed once it completes, but the executor lives as long as the application runs.
In MRv2, by contrast, the resource unit is the container and the task is the processing unit that runs inside the container. The container is deallocated once its task completes, so MRv2 never holds memory for the execution of later tasks, whereas Spark does exactly that to make jobs run faster.
Dynamic Allocation policy of scaling executors up and down
- Scale Up Policy requests new executors when there are pending tasks and increases the number of executors requested exponentially, because executors are slow to start and the application may turn out to need only slightly more.
- Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.
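To make the exponential scale-up concrete, here is a simplified simulation, not Spark's actual implementation. It assumes each request round adds 1, 2, 4, 8, ... executors until the number needed for the pending tasks is reached, capped at spark.dynamicAllocation.maxExecutors. The object name, function name and parameters are hypothetical, and timing between rounds is ignored.

```scala
import scala.annotation.tailrec

object ScaleUpSketch {
  // Returns the total executor count after each request round.
  // Each round doubles the number of executors added: 1, 2, 4, 8, ...
  def rampUp(start: Int, needed: Int, maxExecutors: Int): List[Int] = {
    val target = math.min(needed, maxExecutors)

    @tailrec
    def loop(current: Int, toAdd: Int, acc: List[Int]): List[Int] =
      if (current >= target) acc.reverse
      else {
        val next = math.min(current + toAdd, target)
        loop(next, toAdd * 2, next :: acc)
      }

    loop(start, 1, Nil)
  }

  def main(args: Array[String]): Unit =
    // Prints List(1, 3, 7, 15, 18)
    println(rampUp(start = 0, needed = 18, maxExecutors = 100))
}
```

In this model an application that needs only two or three executors gets them in a couple of small requests, while one that needs many still reaches its target in a handful of rounds.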
Configuration and Setup of Dynamic Allocation
To configure dynamic allocation, we have to set spark.dynamicAllocation.enabled to true, and spark.shuffle.service.enabled to true to enable the external shuffle service.
These two settings are required for dynamic allocation, but apart from them there are some other configuration properties that give fine-grained control over the resource allocation of executors.
Spark Property | Default Value |
spark.dynamicAllocation.enabled | false |
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors |
spark.dynamicAllocation.minExecutors | 0 |
spark.dynamicAllocation.maxExecutors | Integer.MAX_VALUE |
spark.dynamicAllocation.executorIdleTimeout | 60s |
If you specify the --num-executors command-line option of spark-submit, it is treated as the spark.dynamicAllocation.initialExecutors value.
Application level configuration
spark-submit \
  --master spark://<spark_master>:7077 \
  --class com.riveriq.DynamicAllocationTest \
  --executor-cores 1 \
  --executor-memory 1G \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  DynamicAllocationTest-app.jar
Or
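import org.apache.spark.sql.SparkSession

// Enable dynamic allocation together with the external shuffle service it requires
val sparkSession = SparkSession.builder
  .appName("DynamicAllocationTest")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.initialExecutors", "12")
  .getOrCreate()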
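The other properties from the table can be passed in the same way. As a small sketch, the snippet below collects a set of dynamic allocation properties and renders them as --conf arguments for spark-submit. The property names are real Spark settings, but the values (2 and 18 executors) are illustrative assumptions, and the object and method names are hypothetical.

```scala
object DynAllocFlags {
  // Illustrative values only; tune them for your own cluster.
  val settings: Seq[(String, String)] = Seq(
    "spark.dynamicAllocation.enabled"             -> "true",
    "spark.shuffle.service.enabled"               -> "true",
    "spark.dynamicAllocation.minExecutors"        -> "2",
    "spark.dynamicAllocation.maxExecutors"        -> "18",
    "spark.dynamicAllocation.executorIdleTimeout" -> "60s"
  )

  // Render each property as a "--conf key=value" pair for spark-submit.
  def toSubmitArgs(props: Seq[(String, String)]): Seq[String] =
    props.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```

Setting an explicit maxExecutors is usually worthwhile on a shared cluster, since the default places no practical upper bound on how many executors one application may request.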
How to calculate the desired resources (executors, cores & memory)
Hardware:
Cores per node - 16 cores
RAM per node - 112 GB
Total hardware - 6 nodes, 96 cores, 672 GB
First, 1 core and 1 GB of RAM are needed for the OS and Hadoop daemons, so the available resources on each node are 15 cores and 111 GB of RAM.
Let's take 5 cores per executor. Choose the core count so that the available cores of a node are fully used across its executors.
Number of concurrent tasks per executor = number of threads per executor = number of cores per executor = 5
Number of executors per node = available cores per node / cores per executor = 15 / 5 = 3
Total number of executors = number of nodes * executors per node = 6 * 3 = 18
Memory for each executor = 111 / 3 = 37 GB
Cores per executor = 5
Final numbers: Executors - 18, Cores - 5, Executor Memory - 37 GB
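The same arithmetic can be sketched as a function so it is easy to rerun for other cluster sizes. The names here are hypothetical, and the defaults of 1 reserved core and 1 GB of reserved RAM per node simply follow the assumption made above.

```scala
object ExecutorSizing {
  final case class Plan(executors: Int, coresPerExecutor: Int, memPerExecutorGb: Int)

  // Follows the steps above: reserve resources on each node for the OS and
  // Hadoop daemons, then split what is left by the chosen cores per executor.
  def plan(nodes: Int, coresPerNode: Int, ramPerNodeGb: Int, coresPerExecutor: Int,
           reservedCores: Int = 1, reservedRamGb: Int = 1): Plan = {
    val usableCores = coresPerNode - reservedCores
    val usableRamGb = ramPerNodeGb - reservedRamGb
    val executorsPerNode = usableCores / coresPerExecutor
    require(executorsPerNode > 0, "node is too small for even one executor")
    Plan(nodes * executorsPerNode, coresPerExecutor, usableRamGb / executorsPerNode)
  }

  def main(args: Array[String]): Unit =
    // Prints Plan(18,5,37) for the 6-node cluster described above
    println(plan(nodes = 6, coresPerNode = 16, ramPerNodeGb = 112, coresPerExecutor = 5))
}
```

One caveat: on YARN the container request also includes spark.executor.memoryOverhead on top of the executor memory, so in practice the value passed to --executor-memory may need to be somewhat lower than the figure computed here.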
spark-submit \
  --class com.riveriq.DynamicAllocationTest \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 18 \
  --executor-memory 37g \
  --executor-cores 5 \
  DynamicAllocationTest-app.jar
With dynamic allocation there is no need to specify --num-executors. If you do specify it, it acts as the spark.dynamicAllocation.initialExecutors value.
When should we enable dynamic allocation of resources?
I will answer this question by sharing something I found in my own research. In one of my Spark programs, I was reading a huge number of files, performing some operations on them, and then merging the results into a single file using FileUtil.copyMerge.
When I started running this application with the right number of executors, I found YARN memory usage going up to 99%, which was expected given my spark-submit command.
But when I looked at the Spark application's DAG and execution timeline, I found an interesting thing. After running for some time, 17 of the 18 executors were sitting idle while one executor ran the merge task. This was also expected, because that task runs on a single thread only.
Even though 17 executors were sitting idle, YARN memory usage was still at 99%. That is a waste of resources that could have been assigned to other applications running in parallel, and it led me to the conclusion that dynamic allocation should be enabled.
Before enabling dynamic allocation in Spark, please consider the points below.
1. Multiple applications are expected to run in parallel on the same cluster.
2. The application DAG and execution timeline show executors sitting idle for a long time.
3. Every application has the same priority to finish.
Conclusion
By default, Spark does static allocation of resources. We statically define the number of executors, the memory and the number of cores, but it is very difficult to calculate the right amount of resources, and getting it wrong leads to ineffective utilization: cluster resources are underutilized while other applications are starved of them.
Dynamic resource allocation is the solution for effective utilization of resources. With it, Spark calculates the required amount of resources and allocates and deallocates them at run time.