- Connect to Spark by creating a SparkContext.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('somename').setMaster('local')
sc = SparkContext(conf=conf)
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special "local" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local" to run Spark in-process.
- Creating an RDD. RDDs are distributed objects that contain data, mostly used for list-like collections of records. Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system.
Local collection:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b)
External collection:
PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here is an example invocation:
distFile = sc.textFile("data.txt")
- Entire directories of text files can be read by using wholeTextFiles.
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
files = sc.wholeTextFiles('*.txt')
files.collect()
- Using sample() to take a part of the dataset.
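A minimal sketch of sample(), reusing the SparkContext sc from above; the fraction and seed values are purely illustrative:

```python
# Take roughly 10% of the elements, without replacement; the seed makes the
# sample reproducible across runs.
rdd = sc.parallelize(range(100))
subset = rdd.sample(False, 0.1, seed=42)
subset.collect()
```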
Things I learned
Using wildcards! All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
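A small sketch of the distinction, reusing the data.txt example from above: map is a transformation, reduce is an action.

```python
# map() is a transformation: it is lazy and just returns a new RDD.
lines = sc.textFile("data.txt")
line_lengths = lines.map(lambda s: len(s))
# reduce() is an action: it actually runs the computation and returns a value
# to the driver program.
total_length = line_lengths.reduce(lambda a, b: a + b)
```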
SequenceFile is a flat Hadoop file format of key/value pairs that can pack many small files into one larger file, which reduces the metadata the HDFS NameNode has to keep in memory. The combined file may take somewhat more disk space, but MapReduce jobs run much faster because they don't have to open and schedule a separate task for each small file.
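A hedged sketch of writing and reading a SequenceFile from PySpark; the path "pairs.seq" is just an illustrative name:

```python
# Save a pair RDD as a SequenceFile and read it back; PySpark converts the
# basic Python types to/from Hadoop Writables.
pairs = sc.parallelize([("a", 1), ("b", 2)])
pairs.saveAsSequenceFile("pairs.seq")
sorted(sc.sequenceFile("pairs.seq").collect())
# [('a', 1), ('b', 2)]
```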
You can't expect Spark to retain knowledge about global variables or scope. The executor nodes (the nodes running Spark tasks) receive copies of scoped variables and won't update the global state on the driver.
The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
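A small sketch of this pitfall, and the accumulator-based fix, assuming the SparkContext sc from above:

```python
counter = 0
rdd = sc.parallelize(range(10))

def increment(x):
    global counter
    counter += x  # updates a serialized copy on the executor, not the driver's counter

rdd.foreach(increment)
print(counter)  # still 0 on the driver

# For a global aggregation like this, use an Accumulator instead:
acc = sc.accumulator(0)
rdd.foreach(lambda x: acc.add(x))
print(acc.value)  # 45
```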
The same applies to using a print statement on the driver node. The executors run their code and print output to their own stdout, not to the driver node's:
Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(print) or rdd.map(print). On a single machine, this will generate the expected output and print all the RDD's elements. However, in cluster mode the executors write to their own stdout, not the one on the driver, so stdout on the driver won't show these! To print all elements on the driver, one can use collect() to first bring the RDD to the driver node, e.g. for x in rdd.collect(): print(x). collect() can cause resource problems on the driver node:
This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine. Key/value (tuple) RDDs can be operated on with groupByKey and mapValues.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
Use .join() to get matching key/value pairs as a new dataset: when called on datasets of type (K, V) and (K, W), it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
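A brief sketch of join() on pair RDDs (the keys and values are illustrative):

```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
# Only keys present in both RDDs appear; each value pairing becomes one record.
sorted(x.join(y).collect())
# [('a', (1, 2)), ('a', (1, 3))]
```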
Garbage Collection: The main point to remember here is that the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost.
Memory issues: Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.
Dynamically loading Spark properties. In some cases, you may want to avoid hard-coding certain configurations in a SparkConf, for instance if you'd like to run the same application with different masters or different amounts of memory. Spark allows you to simply create an empty conf.
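In PySpark this looks roughly like the following (a sketch; nothing is hardcoded because the configuration is supplied at launch time):

```python
from pyspark import SparkConf, SparkContext

# No app name or master set here; spark-submit provides them at runtime.
sc = SparkContext(conf=SparkConf())
```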
Then, you can supply configuration values at runtime:
./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  myApp.jar
DataFrames are basically a Dataset (a derivative of the RDD) organized into rows and named columns.
Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
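A minimal sketch of creating a DataFrame, assuming Spark 2.x+ with a SparkSession; the data and column names are illustrative and set up the df used in the UDF example below:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('somename').getOrCreate()
# A tiny DataFrame with two named columns.
df = spark.createDataFrame([("Alice", 30), ("Bob", 25)], ["name", "age"])
df.show()
```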
UDFs (user-defined functions) can be used as SQL-like functions that run on values in DataFrames:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
df.select(slen(df.name).alias('slen')).collect()
# [Row(slen=5), Row(slen=3)]
This can be handy for creating your own transformations and filtering on the DataFrame.
Things I didn't understand
- Why is groupByKey slower than reduceByKey?
- What is an associative and commutative reduce function?
- What does "Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified" mean? (The default partitioner is hash partitioning.)
- What does "Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations" mean?
Things I want to do
- Check what other built-in methods the operator module has.
- Run a Spark cluster in Docker.
- Run a Spark cluster in Azure.