Skip to content

Instantly share code, notes, and snippets.

@jlhg
Last active November 26, 2015 14:22
Show Gist options
  • Save jlhg/56a3d833a7cc4ce3238c to your computer and use it in GitHub Desktop.
Example script for calculating differential expression with Spark
#!/usr/bin/env python
# Differential-expression calculator: compares mean FPKM between two
# conditions (three eXpress result files each) and keeps targets whose
# fold-change falls outside [0.5, 2].
from pyspark import SparkContext
# Must be set before the SparkContext is constructed to take effect.
SparkContext.setSystemProperty('spark.executor.memory', '2g')
sc = SparkContext('local', 'Differential Expression Calculator')
# Comma-separated paths: each condition's RDD is the line-wise union of
# three eXpress output files (assumed replicates — TODO confirm).
cond1 = sc.textFile('yeast/eXpress-66.txt,yeast/eXpress-67.txt,yeast/eXpress-68.txt')
cond2 = sc.textFile('yeast/eXpress-69.txt,yeast/eXpress-70.txt,yeast/eXpress-71.txt')
def title_filter(line):
    """Return True for data rows, False for the eXpress header line.

    The header is recognised by its leading 'bundle_id' column name.
    """
    return not line.startswith('bundle_id')
def map_fpkm(line):
    """Parse one tab-separated eXpress row into a (target_id, fpkm) pair.

    Column 1 is the target identifier; column 10 is the FPKM value.
    """
    fields = line.split('\t')
    return fields[1], float(fields[10])
def diff_fpkm(pair):
    """Return the fold-change value1/value2 for a joined FPKM pair.

    A zero denominator yields 0 rather than raising.
    NOTE(review): 0 is later treated as "down-regulated" by de_filter —
    confirm that this is the intended handling of a zero baseline.
    """
    numerator, denominator = pair
    if denominator == 0:
        return 0
    return numerator / denominator
def de_filter(pair):
    """Keep targets whose fold-change lies outside the [0.5, 2] band.

    Values of exactly 0.5 or 2 are NOT considered differentially
    expressed (strict inequalities).
    """
    _, ratio = pair
    return ratio > 2 or ratio < 0.5
def _mean_fpkm(rdd):
    """Per-target mean FPKM for one condition's raw text RDD.

    Drops the header, parses (target_id, fpkm) pairs, then accumulates
    (sum, count) per key with aggregateByKey and divides to get the mean.
    """
    sums_and_counts = rdd.filter(title_filter) \
        .map(map_fpkm) \
        .aggregateByKey(
            (0, 0),
            lambda acc, fpkm: (acc[0] + fpkm, acc[1] + 1),
            lambda a, b: (a[0] + b[0], a[1] + b[1]))
    return sums_and_counts.mapValues(lambda total_count: total_count[0] / total_count[1])

merged_cond1 = _mean_fpkm(cond1)
merged_cond2 = _mean_fpkm(cond2)
# Join the two per-target means, compute fold-change, keep the
# differentially expressed targets, and write the result out.
result = merged_cond1.join(merged_cond2).mapValues(diff_fpkm).filter(de_filter)
result.saveAsTextFile('file:///share/de-result')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment