Increase the driver heap to accommodate large DAGs
Avoid overly granular executors: use larger heaps and configure multiple cores per executor
Set spark.memory.fraction=0.6, leaving the remaining 40% of the heap for user data structures and internal metadata
Allocate about 60% of the instance CPUs to executors, leaving headroom for other tasks
Disable off-heap memory; it was not stable in our tests
# instance i3.8xlarge | 244 GiB | 32 CPU | 4 x 2 TiB SSD | 10 Gbps
driver-memory 32g
spark.driver.maxResultSize=10g
executor-memory 32g
executor-cores 6
num-executors INSTANCES*4
spark.memory.offHeap.enabled=false
spark.executor.memoryOverhead=12g
spark.memory.fraction=0.6
spark.dynamicAllocation.enabled=false
spark.shuffle.service.enabled=false
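As a sanity check, the per-instance footprint implied by the settings above can be computed directly. A quick sketch (all numbers come from the config; variable names are illustrative):

```python
# Per-instance footprint for an i3.8xlarge (244 GiB RAM, 32 CPUs)
# under the settings above.
EXECUTORS_PER_INSTANCE = 4   # num-executors = INSTANCES * 4
EXECUTOR_HEAP_GIB = 32       # executor-memory
EXECUTOR_OVERHEAD_GIB = 12   # spark.executor.memoryOverhead
EXECUTOR_CORES = 6           # executor-cores

mem_used_gib = EXECUTORS_PER_INSTANCE * (EXECUTOR_HEAP_GIB + EXECUTOR_OVERHEAD_GIB)
cores_used = EXECUTORS_PER_INSTANCE * EXECUTOR_CORES

print(mem_used_gib)  # 176 of 244 GiB, leaving headroom for the OS and driver
print(cores_used)    # 24 of 32 CPUs

# Spark's unified memory region per executor is
# (heap - 300 MiB reserved) * spark.memory.fraction:
unified_mib = (EXECUTOR_HEAP_GIB * 1024 - 300) * 0.6
print(unified_mib)   # ~19480 MiB for execution + storage
```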
Use disk-only persistence when running on SSDs,
leaving the heap memory to the Spark executors
--conf spark.driver.extraJavaOptions=
-Dspark.persistence.useDisk=true
-Dspark.persistence.useOnHeapMemory=false
-Dspark.persistence.useOffHeapMemory=false
-Dspark.persistence.keepDeserialized=false
-Dspark.persistence.replication=2
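These flags describe a serialized, disk-only storage level replicated twice; in Spark terms that is the five fields of `StorageLevel` (useDisk, useMemory, useOffHeap, deserialized, replication), i.e. `StorageLevel.DISK_ONLY_2`. A minimal sketch, modeling the level as a plain tuple so it runs without a Spark installation:

```python
from collections import namedtuple

# Mirror of org.apache.spark.storage.StorageLevel's five fields
# (illustrative only; in PySpark use pyspark.StorageLevel).
StorageLevel = namedtuple(
    "StorageLevel",
    ["use_disk", "use_memory", "use_off_heap", "deserialized", "replication"],
)

# The -D flags above: disk only, serialized, replicated twice.
level = StorageLevel(
    use_disk=True,       # spark.persistence.useDisk=true
    use_memory=False,    # spark.persistence.useOnHeapMemory=false
    use_off_heap=False,  # spark.persistence.useOffHeapMemory=false
    deserialized=False,  # spark.persistence.keepDeserialized=false
    replication=2,       # spark.persistence.replication=2
)
```

In PySpark the equivalent is `df.persist(StorageLevel.DISK_ONLY_2)`.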
Fine-tune the number of shuffle partitions based on your executor and core counts.
Increase the split size when reading data from a blob store (e.g. S3).
spark.sql.shuffle.partitions=SPARK_NUM_EXECUTORS * SPARK_EXECUTOR_CORES * 2
spark.sql.files.maxPartitionBytes=268435456
spark.files.maxPartitionBytes=268435456
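For the cluster shape sketched earlier (4 executors per instance, 6 cores each), the partition formula works out as follows. A quick sketch; the 10-instance cluster size is an assumption for illustration:

```python
# Shuffle-partition formula from above, for a hypothetical 10-instance cluster.
INSTANCES = 10                       # illustrative assumption
SPARK_NUM_EXECUTORS = INSTANCES * 4
SPARK_EXECUTOR_CORES = 6

shuffle_partitions = SPARK_NUM_EXECUTORS * SPARK_EXECUTOR_CORES * 2
print(shuffle_partitions)  # 480: roughly two waves of shuffle tasks per core

# 268435456 bytes is exactly 256 MiB, double the 128 MiB default split size.
max_partition_bytes = 256 * 1024 * 1024
print(max_partition_bytes)  # 268435456
```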
Increase task execution resilience:
Increase network timeouts to cope with network flukes (e.g. on EMR)
Enable blacklisting of executors and increase the number of task retries to cope with degraded instances
spark.sql.broadcastTimeout=36000
spark.network.timeout=120
spark.task.maxFailures=20
spark.blacklist.enabled=true
spark.blacklist.timeout=99h
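Settings like these are typically passed to spark-submit as repeated `--conf key=value` flags. A small helper sketch; the function name and structure are illustrative, not part of any Spark API:

```python
# Turn a dict of Spark settings into repeated --conf arguments for spark-submit.
# Helper is illustrative only.
def to_conf_args(settings: dict) -> list:
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args

resilience = {
    "spark.sql.broadcastTimeout": 36000,
    "spark.network.timeout": 120,
    "spark.task.maxFailures": 20,
    "spark.blacklist.enabled": "true",
    "spark.blacklist.timeout": "99h",
}

print(" ".join(to_conf_args(resilience)))
```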