PySpark hello world scripts
# A SchemaRDD (a DataFrame in Spark 1.3+) can be created for a JSON dataset
# represented by an RDD[String] storing one JSON object per string.
anotherPeopleRDD = sc.parallelize(['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}',
                                   '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
# show() and printSchema() print their output themselves and return None,
# so they should not be wrapped in a print statement.
anotherPeople.show()
anotherPeople.printSchema()
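For comparison, roughly the same result under Spark 2.x, where jsonRDD was removed; this is a minimal sketch assuming a SparkSession named spark is available (as it is in most Spark 2.x shells and notebooks):

# Minimal Spark 2.x sketch (assumes a SparkSession named `spark`, e.g. from
# SparkSession.builder.getOrCreate()); read.json also accepts an RDD[String].
peopleDF = spark.read.json(anotherPeopleRDD)
peopleDF.show()
peopleDF.printSchema()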
""" | |
{"created_at": 122, "id": 748, "text": "Hello Im posting a tweet!"} | |
{"created_at": 1222, "id": 748, "text": "Im posting a tweet!"} | |
"""" | |
from pyspark.sql.types import * | |
fields = [StructField("text", StringType(), True), StructField("id", IntegerType(), True), StructField("created_at", IntegerType(), True) ] | |
jsonSchema = StructType(fields) | |
#df = sqlContext.read.load("/FileStore/tables/gorke4nr1467407735399/citylots.json", format="json", schema=jsonSchema) | |
df = sqlContext.read.json("/FileStore/tables/gorke4nr1467407735399/citylots.json", schema=jsonSchema) | |
print df.show() |
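With the schema applied, the DataFrame can also be queried through SQL after registering it as a temporary table; a minimal sketch using the Spark 1.x API from this gist (the table name tweets is just illustrative):

# Register under an arbitrary name so the SQL engine can see it; "tweets"
# is only an illustrative name, not anything required by the data.
df.registerTempTable("tweets")
sqlContext.sql("SELECT text FROM tweets WHERE created_at > 500").show()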
#{"id": 748238620146839552, "text": "Hello Im posting a tweet!"} | |
#{"id": 748237961842483200, "text": "Im posting a tweet!"} | |
#JSON must have double quotes inside them for the below code to work fine. | |
#path = /tmp/data.json | |
#One JSON object per line | |
#df = sqlContext.jsonFile(path) (deprecated) | |
#df = sqlContext.read.load("FileStore/tables/36op5ogr1467404497005/citylots.json", format="json") | |
df = sqlContext.json.read(path) | |
print df.printSchema() | |
print df.show() |
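One thing worth knowing when reading JSON this way: in Spark's default PERMISSIVE mode, lines that fail to parse are not dropped or raised as errors but collected into a _corrupt_record column. A quick sanity check:

# Malformed lines end up in _corrupt_record (the default name set by
# spark.sql.columnNameOfCorruptRecord) rather than failing the read.
if "_corrupt_record" in df.columns:
    df.where(df["_corrupt_record"].isNotNull()).show()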
#{"id": 748238620146839552, "text": "Hello Im posting a tweet!"} | |
#{"id": 748237961842483200, "text": "Im posting a tweet!"} | |
#JSON must have double quotes inside them for the below code to work fine. | |
#path = /tmp/data.json | |
#One JSON object per line | |
rdd_strings = sc.textFile(path) | |
rdd_dicts = rds.map(json.loads) | |
print rdd_dicts.collect() |
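If a DataFrame is wanted after the manual parse, the dictionaries can be converted via Row, letting createDataFrame infer the schema; a minimal sketch:

from pyspark.sql import Row
# Turn each parsed dict into a Row so createDataFrame can infer the schema
# from the keys and value types.
df = sqlContext.createDataFrame(rdd_dicts.map(lambda d: Row(**d)))
df.show()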
# https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/06%20FileStream%20Word%20Count%20-%20Python.html
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10

def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # Set each DStream in this context to remember RDDs it generated in the last given duration.
    # DStreams remember RDDs only for a limited duration of time and release them for garbage
    # collection. This method allows the developer to specify how long to remember the RDDs
    # (if the developer wishes to query old data outside the DStream computation).
    ssc.remember(60)

    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()
    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        df = sqlContext.createDataFrame(rdd)
        df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)
    return ssc

checkpointDir = "dbfs:/SOME_CHECKPOINT_DIRECTORY"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)

# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
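While the stream runs, the temp table registered inside process() can be queried from another cell; a minimal sketch (myCounts is the name used in the code above), followed by a shutdown that keeps the SparkContext usable:

# Inspect the latest word counts produced by the stream.
sqlContext.sql("SELECT * FROM myCounts").show()
# Stop only the streaming context; keep sc / sqlContext alive for the notebook.
ssc.stop(stopSparkContext=False)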
from pyspark.streaming import StreamingContext

# A small queueStream example: each inner list becomes one RDD, delivered
# as one batch every 2 seconds.
dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.75, 0.75], [0.9, 0.9]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)

def get_output(rdd):
    rdd_data = rdd.collect()
    if 0.75 in rdd_data:
        print "Ending marker found", rdd_data
        ssc.stop()
    else:
        print "Not found ending marker. Continuing"
        print rdd_data

input_stream.foreachRDD(get_output)
ssc.start()
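Note that ssc.start() returns immediately; outside a notebook the driver would need a blocking call so the process does not exit before the five batches run. A minimal sketch with a safety timeout:

# Block until get_output calls ssc.stop(), or give up after 30 seconds
# in case the 0.75 ending marker is never seen.
ssc.awaitTerminationOrTimeout(30)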