document = sc.textFile("hdfs:///user/user172/5000-8.txt")
wordCounts = document.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda left, right: left + right)
wordCounts.saveAsTextFile("hdfs:///user/user172/tp/wordcount")
pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2), ("dog", 3) ])
pets.groupByKey().map(lambda x: (x[0], 1.*sum(x[1])/len(x[1]))).collect()
visits = sc.parallelize( [("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h", "1.3.3.1")] )
pageNames = sc.parallelize( [("h", "Home"), ("a", "About"), ("o", "Other")])
visits.cogroup(pageNames).map(lambda x: (x[0], ( list(x[1][0]), list(x[1][1]) ) ) ).collect()
g = sc.parallelize([(1,4), (4,2), (2,1), (3,2), (2,5), (5,3)])
reversed = g.map(lambda x: (x[1], x[0]))
countOutgoing = g.groupByKey().map(lambda x: (x[0], len(x[1])))
countIncomming = reversed.groupByKey().map(lambda x: (x[0], len(x[1])))
totalOutIn = countOutgoing.cogroup(countIncomming).map(lambda x: (x[0], ( sum(x[1][0]), sum(x[1][1]))))
numNodes = totalOutIn.count()
sinks = totalOutIn.filter(lambda x: x[1][0] == 0 and x[1][1] > 0).collect()
universalSinks = totalOutIn.filter(lambda x: x[1][0] == 0 and x[1][1] == numNodes).collect()