Similar to the Nextflow documentation example for plain values:
Channel
.from( 1, 2, 3, 4, 5 )
.filter { it % 2 == 1 }
.view()
but addressing the tuple elements as required:
Channel
.from( [1, 2], [1, 3], [1,4] )
.filter { (it[0] + it[1]) % 2 == 1 }
.view()
or
Channel
.from( ['a', 1], ['b', 2], ['c', 3] )
.filter { letter, number -> number % 2 == 1 }
.view()
This did not work well and sometimes caught an older file (if one was already available) before the file for the current run was generated.
// Copy trace, report, timeline and log into outdir.
// The folder called pipeline_report must be the one defined within the nextflow.config file.
// The DAG cannot be copied from here because it is generated later.
// Some of the copied files might not be directly available,
// so we try several times with a few seconds of sleep in between.
["execution_timeline.html", "execution_report.html", "execution_trace.txt"].each { name ->
    def source = file("pipeline_report/${name}")
    retry {
        source.copyTo("${params.outdir}/pipeline_report/${name}")
    }
}
def log_source = file(".nextflow.log")
retry {
    log_source.copyTo("${params.outdir}/pipeline_report/nextflow.log")
}
/*******************************************************************************
* CUSTOM FUNCTIONS *
*******************************************************************************/
// This function retries the given closure several times, sleeping between each attempt.
def retry(int sleep_time = 6000, int times = 5,
          Closure errorHandler = { e -> log.warn(e.message, e) },
          Closure body) {
int retries = 0
def exception
while(retries++ < times) {
try {
return body.call()
}
catch(e) {
exception = e
sleep(sleep_time)
}
}
def wrapped = new Exception("Failed to copy: " + exception.message)
errorHandler.call(wrapped)
}
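A usage sketch of this helper (the 2-second delay, 10 attempts, and the custom handler are illustrative values, not taken from the pipeline):

```nextflow
// 10 attempts, 2 s apart, with a custom error handler;
// the trailing closure is the body that gets retried
retry(2000, 10, { e -> log.error("giving up: " + e.message) }) {
    file("pipeline_report/execution_timeline.html")
        .copyTo("${params.outdir}/pipeline_report/execution_timeline.html")
}
```

Because `body` is the last parameter, Groovy's trailing-closure syntax lets the plain `retry { ... }` form use all the defaults.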
// Group files by sample
reads.view()
tuple_sample_fastq_after_trimming.view()
bowtie2.out.tuple_sample_fastq_matched.view()
bowtie2.out.tuple_sample_fastq.view()
write_output_tables.out.tuple_sample_metric_counts.view()
concat_canard(reads, tuple_sample_fastq_after_trimming, bowtie2.out.tuple_sample_fastq_matched, bowtie2.out.tuple_sample_fastq, write_output_tables.out.tuple_sample_metric_counts)
At this step, here is what I send into the concat_canard process. reads:
[MPL3_S19_L001.R2, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL3_S19_L001.R2.fastq.gz]]
[MPL12_S23_L001.R2, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL12_S23_L001.R2.fastq.gz]]
[MPL3_S19_L001.R1, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL3_S19_L001.R1.fastq.gz]]
[MPL12_S23_L001.R1, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL12_S23_L001.R1.fastq.gz]]
tuple_sample_fastq_after_trimming:
[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/ef/76b58f3ac415d4251531398774de85/MPL3_S19_L001.R1.trimmed.fq.gz]
[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/66/d722d2fc9d3de492553dea4e937486/MPL12_S23_L001.R2.trimmed.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/86/09e10652d9b9f583f0c38f746a9904/MPL12_S23_L001.R1.trimmed.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/76/e9a24f8c76438866db434dc43b822c/MPL3_S19_L001.R2.trimmed.fq.gz]
tuple_sample_fastq_matched:
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_matched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_matched.fq.gz]
tuple_sample_fastq:
[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/df/de21aae2b02b2ab95d3a21a3d88f28/MPL3_S19_L001.R1_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_unmatched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_unmatched.fq.gz]
[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/fc/846fb2bae0243611d3289c87b09bde/MPL12_S23_L001.R2_unmatched.fq.gz]
tuple_sample_metric_counts:
[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/fc/846fb2bae0243611d3289c87b09bde/MPL12_S23_L001.R2_unmatched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/22/4670fcbc76d2693fe3c98d9f04c4f1/MPL3_S19_L001.R2_filterin.counts.txt]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/a4/2e2390e04e6c98f9a91cbc579c41e6/MPL12_S23_L001.R1_filterin.counts.txt]
[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/5e/a5d3bf44d3dcff511c4c94dc7b2ffc/MPL3_S19_L001.R1_filterin.counts.txt]
process concat_canard{
label 'bash'
publishDir "${params.outdir}/metrics", pattern: "*", mode: 'copy' // I want all sub-directories
input:
tuple val(sample), path (raw_reads)
tuple val(sample), path (trimmed_reads)
tuple val(sample), path (unmatched)
tuple val(sample), path (matched)
tuple val(sample), path (in_counts)
output:
path("${sample}_final_table.txt"), emit: sample_final_table
script:
log.info"""
${sample} ${raw_reads}
${sample} ${trimmed_reads}
${sample} ${unmatched}
${sample} ${matched}
${sample} ${in_counts}
"""
"""
# work with raw fastq files input
nb_read=\$(zcat ${raw_reads} | wc -l)
READ_TOTAL=\$((\$nb_read/4))
# work with fastq files trimmed
nb_read=\$(zcat ${trimmed_reads} | wc -l)
READ_TOTAL_TRIMMED=\$((\$nb_read/4))
# Work with fastq files output
nb_read=\$(zcat ${unmatched} | wc -l)
FILTER_OUT_UNMATCH=\$((\$nb_read/4))
nb_read=\$(zcat ${matched} | wc -l)
FILTER_OUT_MATCH=\$((\$nb_read/4))
# Work with table from breseq output
NB_READS_TO_ALIGN=\$(awk '{print \$2}' ${in_counts})
NB_READS_ALIGNED=\$(awk '{print \$3}' ${in_counts})
echo -e "$sample\t\$READ_TOTAL\t\$READ_TOTAL_TRIMMED\t\$FILTER_OUT_UNMATCH\t\$FILTER_OUT_MATCH\t\$NB_READS_TO_ALIGN\t\$NB_READS_ALIGNED" >> ${sample}_final_table.txt
"""
And here is the log output of that part:
MPL12_S23_L001.R1 MPL3_S19_L001.R2.fastq.gz
MPL12_S23_L001.R1 MPL3_S19_L001.R2.trimmed.fq.gz
MPL12_S23_L001.R1 MPL3_S19_L001.R2_matched.fq.gz
MPL12_S23_L001.R1 MPL3_S19_L001.R2_unmatched.fq.gz
MPL12_S23_L001.R1 MPL12_S23_L001.R1_filterin.counts.txt
MPL12_S23_L001.R2 MPL12_S23_L001.R2.fastq.gz
MPL12_S23_L001.R2 MPL12_S23_L001.R1.trimmed.fq.gz
MPL12_S23_L001.R2 MPL12_S23_L001.R1_matched.fq.gz
MPL12_S23_L001.R2 MPL12_S23_L001.R1_unmatched.fq.gz
MPL12_S23_L001.R2 MPL12_S23_L001.R2_filterin.counts.txt
MPL3_S19_L001.R1 MPL3_S19_L001.R1.fastq.gz
MPL3_S19_L001.R1 MPL12_S23_L001.R2.trimmed.fq.gz
MPL3_S19_L001.R1 MPL12_S23_L001.R2_matched.fq.gz
MPL3_S19_L001.R1 MPL12_S23_L001.R2_unmatched.fq.gz
MPL3_S19_L001.R1 MPL3_S19_L001.R1_filterin.counts.txt
MPL3_S19_L001.R2 MPL12_S23_L001.R1.fastq.gz
MPL3_S19_L001.R2 MPL3_S19_L001.R1.trimmed.fq.gz
MPL3_S19_L001.R2 MPL3_S19_L001.R1_matched.fq.gz
MPL3_S19_L001.R2 MPL3_S19_L001.R1_unmatched.fq.gz
MPL3_S19_L001.R2 MPL3_S19_L001.R2_filterin.counts.txt
We try to synchronize several channels (5 in total) using the same sample name as the first value of each tuple.
The .view() output in the main workflow shows that every channel emits correctly paired tuples, i.e. the sample name and the fastq file sent by each channel match, e.g. (MPL12_S23 MPL12_S23.fastq).
The channels are asynchronous, so each channel sends its 4 samples in its own order. When the tuples reach the process, we can see that the second values of the tuples differ while the first values are all the same.
Since we declared a different variable name for each second value, those values are correctly assigned to their respective variables.
For the first values, however, we reused the same variable name (sample) across all five input declarations; each time a tuple arrives, the variable takes its first value, so the final value is actually the first value of the last tuple received.
There is no magic at the input of the process that synchronizes the different channels by sample value, so in this case we are mixing data from different samples. To avoid that, we must use a join() statement in the main workflow to provide a single channel containing all the values for each sample.
P.S.: For this example I used R1 and R2 as separate samples.
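A minimal sketch of the join()-based fix (channel names are taken from the snippets above; chaining the joins in this exact order is an assumption about how the five channels would be combined):

```nextflow
// join the five channels on their first element (the sample name),
// so each process invocation receives files from a single sample
ch_by_sample = reads
    .join( tuple_sample_fastq_after_trimming )
    .join( bowtie2.out.tuple_sample_fastq_matched )
    .join( bowtie2.out.tuple_sample_fastq )
    .join( write_output_tables.out.tuple_sample_metric_counts )

concat_canard( ch_by_sample )
```

The process would then declare a single input, e.g. `tuple val(sample), path(raw_reads), path(trimmed_reads), path(matched), path(unmatched), path(in_counts)`, instead of five separate tuples.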