Re-assigning a variable in the REPL leads to memory leaks.
The original case had a few unnecessary features, but it boils down to re-assigning a fresh large array (100MB) to the same variable several times.
The existing test case can be further minimized to not depend on Spark:
object Min {
  def m(array: Array[Int]): Array[Int] = {
    Array.range(0, array.length)
  }
}
val n = 25000000
val array = (0 until n).toArray
var ary2: Array[Int] = null
// Repeat the following line until the REPL runs out of memory:
ary2 = Min.m(array)
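To watch the leak without an external profiler, you can ask the JVM for its heap usage after each repeat. A small sketch using only the standard java.lang.Runtime API (the helper name is mine):

def usedMB: Long = {
  val rt = Runtime.getRuntime
  // Used heap = committed heap minus the free part of it.
  (rt.totalMemory - rt.freeMemory) / (1024 * 1024)
}

System.gc() // request a collection first, so the reading is less noisy
usedMB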
This example fails in both spark-shell and the plain Scala REPL (2.11.7).
I ran the reproducible code in the spark-shell (1.6.1-rc1 with Scala 2.10.5) and noticed the heap going up. I used jvisualvm to monitor the heap usage and took a heap dump when I saw it was significantly higher (after 4-5 repeats of the last line). The profiler immediately identified the top-most consumers: several arrays of exactly 25 million elements, each one holding onto 100MB of heap (25,000,000 Ints at 4 bytes apiece).
JVisualVM can find the nearest GC root for each one of the arrays; here's the breakdown:
- one of them was the expected ary2
- the others were deeply nested fields rooted in a top-level object, itself contained inside the Spark class loader. The path to the object is something like $read$.$iw.$iw.$iw.$iw.$iw.$iw.$iresX, where X is a number, different each time.
It became clear that each invocation of m is creating a new array that is not garbage collectable. Clearly, Array.range is copying the array and the returned RDD is live. But the returned value is stored inside a mutable variable, ary2, and should overwrite the reference to the previous array. Outside the REPL, that is exactly what happens; see the sketch below.
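For contrast, the same pattern compiled as a regular program (a minimal sketch reusing the Spark-free Min from above) does not leak:

object Main {
  def main(args: Array[String]): Unit = {
    val array = (0 until 25000000).toArray
    var ary2: Array[Int] = null
    // Each iteration allocates a fresh 100MB array; the previous one
    // becomes unreachable as soon as ary2 is overwritten, so this runs
    // happily in a heap far smaller than 100 x 100MB.
    for (_ <- 1 to 100) ary2 = Min.m(array)
    println(ary2.length)
  }
}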
So who holds on to the old RDD?
The Spark REPL allows some of the regular Scala REPL command-line arguments, so the next step was to run it with -Xprint:parse. I was particularly interested in what the assignment is compiled down to.
The Scala REPL is not really an interpreter: each line is actually compiled to a full Scala program by wrapping the line in a fairly involved mix of wrapper classes, objects and imports for the previously defined members.
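As a rough mental model (a simplified sketch, not the literal compiler output; the real names and nesting appear in the listing below), a line like var x = 1 becomes something along these lines:

package $line1 {
  class $read extends Serializable {
    class $iwC extends Serializable {
      var x = 1 // the user's line ends up as a member of the innermost wrapper
    }
    val $iw = new $iwC() // instantiated so the member gets initialized
  }
  object $read {
    val INSTANCE = new $read()
  }
}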
Here's a slightly cleaned-up version of the output (using a more complicated variant of the test case that still involves Spark types):
scala> ary2 = Min.m(sc, array, n)
[[syntax trees at end of parser]] // <console>
package $line25 {
  class $read extends Serializable {
    class $iwC extends Serializable {
      val $VAL21 = $line3.$read.INSTANCE;
      import $VAL21.$iw.$iw.sc;
      val $VAL22 = $line4.$read.INSTANCE;
      import $VAL22.$iw.$iw.sqlContext;
      class $iwC extends Serializable {
        import org.apache.spark.SparkContext._;
        class $iwC extends Serializable {
          class $iwC extends Serializable {
            class $iwC extends Serializable {
              import sqlContext.implicits._;
              class $iwC extends Serializable {
                import sqlContext.sql;
                class $iwC extends Serializable {
                  import org.apache.spark.sql.functions._;
                  val $VAL23 = $line15.$read.INSTANCE;
                  import $VAL23.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.n;
                  val $VAL24 = $line19.$read.INSTANCE;
                  import $VAL24.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.array;
                  import org.apache.spark.rdd.RDD;
                  val $VAL25 = $line21.$read.INSTANCE;
                  import $VAL25.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.ary2;
                  import org.apache.spark.SparkContext;
                  class $iwC extends Serializable {
                    import org.apache.spark.rdd.RDD;
                    val $VAL26 = $line24.$read.INSTANCE;
                    import $VAL26.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.Min;
                    class $iwC extends Serializable {
                      org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);
                      ary2 = Min.m(sc, array, n);
                      val $ires9 = ary2
                    };
                    val $iw = new $iwC()
                  };
                  val $iw = new $iwC()
                };
                val $iw = new $iwC()
              };
              val $iw = new $iwC()
            };
            val $iw = new $iwC()
          };
          val $iw = new $iwC()
        };
        val $iw = new $iwC()
      };
      val $iw = new $iwC()
    };
    val $iw = new $iwC()
  };
  object $read extends scala.AnyRef {
    val INSTANCE = new $read()
  }
}
[[syntax trees at end of parser]] // <console>
package $line25 {
  object $eval extends scala.AnyRef {
    lazy val $result = $line25.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$ires9;
    val $print: String = {
      $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
      "".$plus("ary2: org.apache.spark.rdd.RDD[Int] = ").$plus($line25.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$ires9).$plus("\n")
    }
  }
}
The important bit is around the ary2 assignment. And to my surprise, there's the insidious $ires9 field:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);
ary2 = Min.m(sc, array, n);
val $ires9 = ary2
$ires9 is a field that holds on to the same array as ary2, except that it's never re-assigned! And since each evaluation of the assignment creates a fresh $lineX wrapper with its own $iresX field, every repeat pins another 100MB array. That's where our memory goes!
Looking through the Spark sources, it looks like this assignment is added by the REPL in SparkMemberHandlers.scala (for Scala 2.10), more precisely inside AssignmentHandler. The same code appears in the vanilla Scala REPL.
Why, oh why?!
The answer is in the last lines of the listing: because the REPL prints out the toString of the last expression! However, assignments have type Unit, and that's not interesting enough, so someone decided to store the assigned value in a field and print that field instead of the uninteresting (). Assignments are special-cased. (Plain expressions also keep a reference to their result, but visibly, in the familiar resX values; here the retained field is invisible.)
A proper bugfix needs to be applied to the Scala REPL. I don't see an easy way to fix this issue while keeping the special treatment of assignments, other than eagerly evaluating toString on the variable and storing that in the synthetic result. However, printing is more involved than just a call to toString (the REPL does a deep print on arrays, truncates output after a certain number of elements, etc.). Maybe assignments should not be special-cased after all? The Worksheet doesn't do it, and that's never been a problem. A lie has short legs, as they say in some parts of the world!
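To make the first option concrete, here is one possible shape of such a fix (a sketch, not an actual patch): render the printed form eagerly and retain only the resulting String. replStringOf is the deep-printing, truncating helper the 2.10/2.11 REPL already uses internally.

ary2 = Min.m(sc, array, n)
// Keep the rendered String instead of a live reference to the value:
// the 100MB array is no longer pinned by the wrapper object.
val $ires9: String = scala.runtime.ScalaRunTime.replStringOf(ary2, 1000)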
Never assign a variable in the REPL. You can define a helper method that does it for you, but NEVER, ever, let an assignment be evaluated by the REPL.
def update(r1: RDD[Int]): Unit = {
  ary2 = r1
}
This can update ary2 as many times as you want, without causing an OOM.
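The call is an ordinary expression line, so AssignmentHandler never runs and no $iresX field is generated (and its Unit result is neither bound nor printed):

update(Min.m(sc, array, n)) // re-assigns ary2 without pinning the old value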
This is great. I forgot about the -Xprint:parse feature. I actually hacked the forked REPL in Spark to print the string before compilation! Would the $ires9 problem be avoided if you added another harmless expression after the assignment, as a workaround? For example, println(array.take(5))?
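One way to test the commenter's idea (an untested sketch): wrap the assignment in a block, so the evaluated line is a Unit-valued expression rather than a bare assignment, and the REPL's assignment special-casing shouldn't apply.

{ ary2 = Min.m(sc, array, n); () } // a block expression, not a top-level assignment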