// Forked from agaszmurlo/coverage_bases_bed.scala by @mwiewior (last active May 11, 2019)
spark-shell -v --master=local[$cores] --driver-memory=12g --conf "spark.sql.catalogImplementation=in-memory" --packages org.biodatageeks:bdg-sequila_2.11:0.5.3-spark-2.4.0-SNAPSHOT --repositories http://repo.hortonworks.com/content/repositories/releases/,http://zsibio.ii.pw.edu.pl/nexus/repository/maven-snapshots/
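// The spark-shell invocation above pins the SeQuiLa package from the listed repositories and
// forces an in-memory catalog; $cores and --driver-memory are benchmark knobs (placeholders),
// not fixed requirements.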
import org.apache.spark.sql.SequilaSession
import org.biodatageeks.utils.{SequilaRegister, UDFRegister,BDGInternalParams}
val ss = SequilaSession(spark)
SequilaRegister.register(ss)
ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate","true")
ss.sqlContext.setConf("spark.biodatageeks.bam.useSparkBAM","false")
/* create table */
ss.sql("""
CREATE TABLE IF NOT EXISTS reads_exome
USING org.biodatageeks.datasources.BAM.BAMDataSource
OPTIONS(path '/data/granges/exome/NA12878.proper.wes.bam')""")
ss.sql("""
CREATE TABLE IF NOT EXISTS reads_wgs
USING org.biodatageeks.datasources.BAM.BAMDataSource
OPTIONS(path '/data/granges/exome/NA12878.proper.wgs.bam')""")
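// Optional check (plain Spark SQL, nothing SeQuiLa-specific): verify both BAM-backed tables
// registered correctly and inspect their schema.
ss.sql("SHOW TABLES").show
ss.sql("DESCRIBE reads_exome").show(false)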
/* bases - repeat for cores = 1 ONLY, since we only compare against samtools here */
spark.time{ ss.sql(s"SELECT * FROM bdg_coverage('reads_exome','NA12878', 'bases')").count }
spark.time{ ss.sql(s"SELECT * FROM bdg_coverage('reads_wgs','NA12878', 'bases')").count }
/* store as BED file - repeat for cores = 1, 5, 10 */
spark.time { ss.sql(s"SELECT * FROM bdg_coverage('reads_exome','NA12878', 'blocks')")
.coalesce(1)
.write.mode("overwrite").option("delimiter", "\t").csv("/data/granges/exome/coverage.bed")}
spark.time { ss.sql(s"SELECT * FROM bdg_coverage('reads_wgs','NA12878', 'blocks')")
.coalesce(1)
.write.mode("overwrite").option("delimiter", "\t").csv("/data/granges/exome/coverage.bed")}
/*** NANOPORE ***/
import org.apache.spark.sql.SequilaSession
import org.biodatageeks.utils.{SequilaRegister, UDFRegister,BDGInternalParams}
val ss = SequilaSession(spark)
SequilaRegister.register(ss)
ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate","true")
ss.sqlContext.setConf("spark.biodatageeks.bam.useSparkBAM","false")
//ss.sqlContext.setConf("spark.biodatageeks.readAligment.method", "disq")
// ss.sql("""
// CREATE TABLE IF NOT EXISTS qreads
// USING org.biodatageeks.datasources.BAM.BAMDataSource
// OPTIONS(path '/data/granges/nanopore/NA12878.chr21.quality.bam')""")
ss.sql("""
CREATE TABLE IF NOT EXISTS qreads
USING org.biodatageeks.datasources.BAM.BAMDataSource
OPTIONS(path 'file:///data/work/nanopore_bam/NA12878.chr21.quality.bam')""")
ss.sql("select * from qreads limit 10").show
spark.time{ ss.sql(s"SELECT * FROM bdg_coverage('qreads','NA12878', 'blocks')").count }
/*** ING ***/
import org.apache.spark.sql.SequilaSession
import org.biodatageeks.utils.{SequilaRegister, UDFRegister,BDGInternalParams}
val ss = SequilaSession(spark)
SequilaRegister.register(ss)
ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate","true")
ss.sqlContext.setConf("spark.biodatageeks.bam.useSparkBAM","false")
ss.sql("""CREATE TABLE IF NOT EXISTS reads_exome USING org.biodatageeks.datasources.BAM.BAMDataSource OPTIONS(path '/tmp/fp16yq/data/exome/NA12878.*.bam')""")
ss.sql("""CREATE TABLE IF NOT EXISTS reads_wgs USING org.biodatageeks.datasources.BAM.BAMDataSource OPTIONS(path '/tmp/fp16yq/data/wgs/NA12878.*.bam')""")
/* bases - repeat for cores = 1 ONLY, since we only compare against samtools here */
spark.time{ ss.sql(s"SELECT * FROM bdg_coverage('reads_exome','NA12878', 'bases')").write.format("parquet").save("/tmp/fp16yq/data/wes_1_base_3.parquet") }
spark.time{ ss.sql(s"SELECT * FROM bdg_coverage('reads_wgs','NA12878', 'bases')").count }
/* store as BED file - repeat for cores = 1, 5, 10 */
spark.time { ss.sql(s"SELECT * FROM bdg_coverage('reads_exome','NA12878', 'blocks')")
.coalesce(1)
.write.mode("overwrite").option("delimiter", "\t").csv("/tmp/fp16yq/data/wes_10_blocks_3.bed")}
spark.time { ss.sql(s"SELECT * FROM bdg_coverage('reads_wgs','NA12878', 'blocks')")
.coalesce(1)
.write.mode("overwrite").option("delimiter", "\t").csv("/tmp/fp16yq/data/wgs_1_blocks_1.bed")}