


How to determine Spark JOB Resources over YARN

This question, I believe, cannot be answered in a direct way, because it depends on various run-time factors -


  1. Data distribution
  2. Skewed data
  3. Operations we are performing: joins, windowing, etc.
  4. Shuffle time, GC overhead, etc.
A run-time analysis of the DAG and job execution helps us tune optimal resources for a Spark job.

Still, we will try to provide a very basic answer to this question in this blog. Note that resource usage is a run-time behavior, so the answer may not fit all use cases.

Suppose you have a multi-tenant cluster and have determined how much hardware is available for your YARN queue or user.

Say you have -

  1. 6 nodes, each with 16 cores and 64 GB RAM
Also note the configuration of the edge node from which you will trigger the Spark job. Multiple jobs are spawned from the same edge node, so the edge node's resources can become a bottleneck too.


  1. 1 core and 1 GB per node are needed for the OS and Hadoop daemons. So you have 6 machines with 15 cores and 63 GB RAM each.

Number of cores = Concurrent tasks an executor can run 

So we might think that more concurrent tasks per executor means better performance. But a widely cited rule of thumb is that any application running more than 5 concurrent tasks per executor performs poorly (largely because of HDFS client throughput limits). So stick to 5.

This number comes from the throughput an executor can sustain, not from how many cores the machine has. So the number 5 stays the same even if the CPU has double (32) the cores.

Note that - 
  1. Analyze the DAG; if you see that an executor is processing only 1 task at a time, there is no need to provide 5 cores.
  2. Also, try reducing the number of cores and observe the performance variation. If reducing the number of cores doesn't increase execution time drastically, or you don't have hard SLA requirements, you might not require 5 cores per executor.
  3. For some use cases you may require fat executors (fewer executors, each with more cores and memory), and for others slim executors (more executors, each with fewer cores).
Number of executors:

Coming to the next step: with 5 cores per executor and 15 available cores per node (CPU), we arrive at 3 executors per node.

So with 6 nodes and 3 executors per node, we get 18 executors. Out of those 18, we need 1 executor (a Java process) for the YARN ApplicationMaster/driver, which leaves 17 executors.

This 17 is the number we give to Spark via --num-executors in the spark-submit command.

Note that - 

  1. This is a run-time factor as well. If your job is as simple as applying a single window or group-by function, then, logically, most of the work may end up processed by 1 executor. So having too many executors doesn't make sense.
  2. Even if you can spawn 17 executors, you may not want to give all 17 to one job. There might be other jobs running in parallel at the same moment.
  3. You would also want to know the execution or completion time of this job, so that you can schedule other jobs accordingly. That way you will know when this job releases its resources so they can be utilized by other jobs.
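The executor-count arithmetic above can be sketched in a few lines of Python (the numbers match the 6-node example in this post; the flag named in the comment is Spark's real --num-executors option):

```python
# Sizing sketch for the 6-node example above.
cores_per_node = 15          # 16 physical cores minus 1 for OS/Hadoop daemons
cores_per_executor = 5       # the rule-of-thumb ceiling discussed above
executors_per_node = cores_per_node // cores_per_executor   # 3

nodes = 6
total_executors = nodes * executors_per_node                # 18
num_executors = total_executors - 1   # reserve 1 for the YARN AM/driver

print(num_executors)  # the value we would pass as --num-executors
```

Treat this as a starting point, not a formula: the notes above explain why the final number should be adjusted per job.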
Memory for each executor:

From the step above, we have 3 executors per node, and the available RAM is 63 GB.

So memory for each executor is 63/3 = 21 GB. This is just back-of-the-envelope math; you can use less memory. Don't occupy more if you don't need it.

However, a small memory overhead is also added to the full memory request made to YARN for each executor.
The formula for that overhead is max(384 MB, 0.07 * spark.executor.memory).

Calculating that overhead: 0.07 * 21 GB (where 21 GB is the 63/3 computed above) = 1.47 GB.

Since 1.47 GB > 384 MB, the overhead is 1.47 GB.
Subtracting it from the 21 GB per executor: 21 - 1.47 ≈ 19 GB.

So executor memory is about 19 GB.
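The overhead calculation can be checked with the same kind of sketch (values are from the 63 GB example above; the 384 MB floor matches the formula just given):

```python
ram_per_node_gb = 63
executors_per_node = 3
raw_per_executor_gb = ram_per_node_gb / executors_per_node   # 21 GB

# Overhead is max(384 MB, 7% of executor memory), computed here in GB.
overhead_gb = max(384 / 1024, 0.07 * raw_per_executor_gb)    # 1.47 GB

# What we actually request as --executor-memory, rounded down.
executor_memory_gb = int(raw_per_executor_gb - overhead_gb)  # 19 GB
print(executor_memory_gb)
```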


Note that -

  1. This is also determined by the operations being performed. For example, for a map-side join an executor should have sufficient memory for the data. In such a scenario we may require fat executors: if you want to allocate a total of 50 GB RAM, you could have either - 
    1. 10 executors, each with 5 GB RAM, or
    2. 5 executors, each with 10 GB RAM. The second option is what we mean by fat executors.
  2. If there is too much shuffling, we can increase the shuffle partitions and increase the number of executors. That way we get a better distribution of keys among the executors executing reduce tasks.
  3. Executor memory largely controls how long a task takes to execute: if there is sufficient memory, data will not be spilled to disk.
  4. Again, determining optimal memory is a run-time analysis for a job. For example - 
    1. Suppose we are executing a window operation like row_number() and the data is skewed, i.e., some partitions have many more rows than others. Then it may be necessary to have a big executor with sufficient memory. Processing might be slow in this case, but we can increase the memory so that the job won't fail because of memory problems.
    2. In other scenarios we might have a huge data set that is well distributed among unique keys. In that case we may require a larger number of executors, each with a small fraction of the memory.
    3. Also note that we should define the container/executor size such that a daily scheduled job gets sufficient resources at its scheduled time. Otherwise it may disturb the stability of the system: a job may execute successfully today but fail tomorrow due to resource contention.
    4. If needed, salting should be done so as to distribute the data set.
    5. Also, say a join has too many NULL values causing data skew. Then, 
      1. filter out all the NULL records,
      2. do the JOIN,
      3. then union the data set holding the NULL records (#1) with the output data set of the JOIN (#2).
    6. One has to analyze the problem and come up with a solution that utilizes memory efficiently. Just increasing executor memory is not a solution in most cases.
    7. Consider a situation where we have given 45 GB of memory to each executor, and each executor has 10 cores. At run time the job fails with "Out of Memory - Required memory exceeds available". When we analyze the tasks, we get a better picture - 
      1. There can be data skew leading to task failures, or
      2. Say a task reads 4 GB of data and joins it with some other data set. The executor can execute 10 tasks in parallel (10 cores), so chances are it will run out of memory: as the tasks execute, 10 * 4 ≈ 40 GB or more of memory will be used. A possible solution is to - 
        1. reduce the parallelism to 4 cores per executor,
        2. reduce the memory to 21 GB,
        3. increase the number of executors by a corresponding amount.
        4. That way, tasks are parallelized across executors, rather than one executor executing too many tasks.
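The NULL-skew workaround in point 5 above can be illustrated with plain Python standing in for DataFrames (toy in-memory data; in Spark this maps to filter, join, and union operations):

```python
# Hypothetical toy data: (key, value) rows; many NULL (None) keys cause skew,
# because a join would pile every NULL-keyed row onto a single partition.
left = [(None, 1), (None, 2), ("a", 3), ("b", 4)]
right = {"a": 100, "b": 200}

# 1. Split off the NULL-key records so they never reach the join.
null_rows = [(k, v) for k, v in left if k is None]
non_null = [(k, v) for k, v in left if k is not None]

# 2. Join only the non-NULL records.
joined = [(k, v, right[k]) for k, v in non_null if k in right]

# 3. Union the NULL records (with no match) back with the join output.
result = joined + [(k, v, None) for k, v in null_rows]
print(result)
```

The NULL rows contribute nothing to the join anyway, so skipping them costs no correctness and removes the skewed partition entirely.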
Driver Cores: 

Mostly, a small number of driver cores (1-2) is sufficient. But if there are parallel computations, i.e., more than one job executing in parallel within a Spark application, then to speed up task allocation to executors we may want to give the driver more cores.

Driver Memory:

The --driver-memory flag controls the amount of memory allocated to the driver, which is 1 GB by default, and should be increased if you call a collect() or take(N) action on a large RDD inside your application.

The Spark driver is the cockpit of job and task execution. It hosts the web UI for the environment, splits a Spark application into tasks, and schedules them to run on executors. The task scheduler lives in the driver, which spawns tasks across workers and coordinates their overall execution. The driver also requires additional services such as the ShuffleManager and HTTPFileServer.

Thus, driver memory should be set appropriately, so as to avoid out-of-memory errors.
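A minimal PySpark configuration sketch for the driver settings discussed above (the app name is hypothetical; note that in client mode spark.driver.memory must be set before the driver JVM starts, so in practice it is passed on the spark-submit command line):

```python
from pyspark.sql import SparkSession

# Config-only sketch; in client mode prefer
#   spark-submit --driver-memory 4g --driver-cores 2 ...
# because spark.driver.memory cannot be changed after the driver JVM starts.
spark = (SparkSession.builder
         .appName("resource-sizing-demo")       # hypothetical name
         .config("spark.driver.memory", "4g")   # raise from the 1 GB default
         .config("spark.driver.cores", "2")
         .getOrCreate())
```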

Dynamic Allocation
If you are still not sure, you can try dynamic allocation of resources. I avoid it for two reasons. First, a job can eat up the whole cluster (which may not be desired). Second, you will turn on fair scheduling with preemption, which can result in lost executors or lost computation for a job, leading to unpredictable behavior: the job might complete in 10 minutes, or 1 hour, or sometimes fail.

It is a good feature to try. But as your cluster grows (in terms of the number of running jobs), you will likely want to fine-tune your applications. On the other hand, if you have sufficient budget to add more resources or nodes to the cluster, then fine-tuning may not be needed.
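If you do try dynamic allocation, bounding it keeps a single job from eating the whole cluster. A sketch of the relevant settings (the configuration keys are Spark's real dynamic-allocation options; the bounds are illustrative):

```python
# Illustrative bounds; tune min/max for your queue and workload.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",      # needed so executors can be released safely
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "17", # cap below the full cluster
}
print(dynamic_allocation_conf["spark.dynamicAllocation.maxExecutors"])
```

Capping maxExecutors addresses the first objection above; the preemption behavior, however, is a scheduler-level setting and is not fixed by these flags.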
