-
-
Save joshlk/871d58e01417478176e7 to your computer and use it in GitHub Desktop.
import pandas as pd | |
def _map_to_pandas(rdds): | |
""" Needs to be here due to pickling issues """ | |
return [pd.DataFrame(list(rdds))] | |
def toPandas(df, n_partitions=None): | |
""" | |
Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is | |
repartitioned if `n_partitions` is passed. | |
:param df: pyspark.sql.DataFrame | |
:param n_partitions: int or None | |
:return: pandas.DataFrame | |
""" | |
if n_partitions is not None: df = df.repartition(n_partitions) | |
df_pand = df.rdd.mapPartitions(_map_to_pandas).collect() | |
df_pand = pd.concat(df_pand) | |
df_pand.columns = df.columns | |
return df_pand |
I am running into the memory problem. This works on about 500,000 rows, but runs out of memory with anything larger. I am partitioning the spark data frame by two columns, and then converting 'toPandas(df)' using above. Any ideas on best way to use this? I want each individual partition to be a pandas data frame.
Its likely that your partitions are too big for each executor. You probably need to increase the number of partitions before using toPandas
.
Hey Josh,
I was looking to use the code to create a pandas data frame from a pyspark data frame of 10mil+ records. I saved the above code to a file (faster_toPandas.py) and attempted to import this into my main program. While attempting to call the toPandas() function on my Pyspark dataframe, I kept receiving an Import Error: Module "faster_toPandas" not found. It appears that pickle.loads() was the last call made before an error was thrown. Do you have any ideas on what may be the cause of this error?
Any help would be greatly appreciated!
Why isn't this the default implementation? Are there any drawbacks here that I'm not seeing?
Great gist btw, really appreciate it.
thanx , it's a great way of using toPandas() func.
I was using this great code with pandas 0.18.1, but now with 0.21.1 it fails this way:
<ipython-input-5-63082a0c1bfc> in toPandas(df, n_partitions)
30 """
31 if n_partitions is not None: df = df.repartition(n_partitions)
---> 32 df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
33 df_pand = pd.concat(df_pand)
34 df_pand.columns = df.columns
/local/adhoc/spark/python/pyspark/rdd.py in collect(self)
770 with SCCallSiteSync(self.context) as css:
771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
--> 772 return list(_load_from_socket(port, self._jrdd_deserializer))
773
774 def reduce(self, f):
/local/adhoc/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
140 try:
141 rf = sock.makefile("rb", 65536)
--> 142 for item in serializer.load_stream(rf):
143 yield item
144 finally:
/local/adhoc/spark/python/pyspark/serializers.py in load_stream(self, stream)
137 while True:
138 try:
--> 139 yield self._read_with_length(stream)
140 except EOFError:
141 return
/local/adhoc/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
162 if len(obj) < length:
163 raise EOFError
--> 164 return self.loads(obj)
165
166 def dumps(self, obj):
/local/adhoc/spark/python/pyspark/serializers.py in loads(self, obj, encoding)
417 if sys.version >= '3':
418 def loads(self, obj, encoding="bytes"):
--> 419 return pickle.loads(obj, encoding=encoding)
420 else:
421 def loads(self, obj, encoding=None):
ImportError: No module named 'pandas.indexes'
Any workaround as I cannot change any other code / lib version? I saw this https://stackoverflow.com/questions/37371451/importerror-no-module-named-pandas-indexes but I cannot apply it.
Thanks!
Hi, I get this error when I try to convert a large spark dataframe to Pandas dataframe-
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
traceback:
[
"Traceback (most recent call last):\n",
"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:933)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n\n"
]
I am getting the below error:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 367, in
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 360, in
exec(code, _zcUserQueryNameSpace)
File "", line 1, in
File "", line 14, in toPandas
File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 11.0 failed 4 times, most recent failure: Lost task 10.3 in stage 11.0 (TID 870, ip-172-31-33-241.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
import(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
import(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
@maddurup seems you do not have pandas installed in your environment
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
I am getting this mismatch on length
ValueError Traceback (most recent call last)
in ()
----> 1 testPrePeriodDataWeekPandas=toPandas(testPrePeriodDataWeek, n_partitions=1000)
in toPandas(df, n_partitions)
16 df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
17 df_pand = pd.concat(df_pand)
---> 18 df_pand.columns = df.columns
19 return df_pand
~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in setattr(self, name, value)
4387 try:
4388 object.getattribute(self, name)
-> 4389 return object.setattr(self, name, value)
4390 except AttributeError:
4391 pass
pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.set()
~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in _set_axis(self, axis, labels)
644
645 def _set_axis(self, axis, labels):
--> 646 self._data.set_axis(axis, labels)
647 self._clear_item_cache()
648
~/.anaconda/lib/python3.7/site-packages/pandas/core/internals.py in set_axis(self, axis, new_labels)
3321 raise ValueError(
3322 'Length mismatch: Expected axis has {old} elements, new '
-> 3323 'values have {new} elements'.format(old=old_len, new=new_len))
3324
3325 self.axes[axis] = new_labels
ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements
Do we have solution available for ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements?
Hi, I get this error when I try to convert a large spark dataframe to Pandas dataframe-
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
traceback:
[
"Traceback (most recent call last):\n",
"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:933)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n\n"
]
I had the same problem but no idea how to solve it.
If you use Spark 2.3.* just turn on Arrow spark.conf.set("spark.sql.execution.arrow.enabled", "true")
and it will significantly speed up toPandas method.
don't forget reset_index(drop=True)
of your pandas dataframe
thank you very much for this solution!
I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
You can increase your spark.driver.maxResultSize in your Spark config. It seems like you df is larger than the four gigabyte. Set it to 15g or something similar and try to run it again.
Not only faster but fixed a job-crashing out-of-memory issue for me! Much appreciated!