
@Juke34
Last active September 16, 2024 08:10
Notes related to Nextflow

How to sort a tuple

Similar to the Nextflow documentation example for plain values:

Channel
    .from( 1, 2, 3, 4, 5 )
    .filter { it % 2 == 1 }
    .view()

but addressing the tuple elements as required:

Channel
    .from( [1, 2], [1, 3], [1, 4] )
    .filter { (it[0] + it[1]) % 2 == 1 }
    .view()

(note the parentheses: % binds tighter than + in Groovy, so it[0] + it[1] % 2 would evaluate it[0] + (it[1] % 2))

or

Channel
    .from( ['a', 1], ['b', 2], ['c', 3] )
    .filter { letter, number -> number % 2 == 1 }
    .view()
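For actually sorting tuples rather than filtering them, the toSortedList operator can be used. A minimal sketch, assuming its optional closure is applied to each item to produce the sort key, and using flatMap to re-emit the sorted tuples one by one:

```groovy
Channel
    .from( ['b', 2], ['c', 3], ['a', 1] )
    .toSortedList { it[0] }   // sort the tuples by their first element
    .flatMap()                // emit the sorted tuples one by one
    .view()
```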

Trying to catch the trace, report, timeline and log files in outdir from within pipeline.nf

This did not work well: it sometimes copies an older file, if one already exists, before the file for the current run has been generated.

    // copy trace, report, timeline and log into outdir
    // the folder called pipeline_report must be the one defined in the nextflow.config file
    // the dag cannot be copied from here because it is generated later.
    // some of the copied files might not be directly available,
    // so we try several times with a few seconds of sleep in between
    source = file("pipeline_report/execution_timeline.html")
    retry {
      source.copyTo("${params.outdir}/pipeline_report/execution_timeline.html")
    }
    source = file("pipeline_report/execution_report.html")
    retry {
      source.copyTo("${params.outdir}/pipeline_report/execution_report.html")
    }
    source = file("pipeline_report/execution_trace.txt")
    retry {
      source.copyTo("${params.outdir}/pipeline_report/execution_trace.txt")
    }
    source = file(".nextflow.log")
    retry {
      source.copyTo("${params.outdir}/pipeline_report/nextflow.log")
    }


/*******************************************************************************
 *                          CUSTOM FUNCTIONS                                   *
 *******************************************************************************/

// This function retries the given closure several times, sleeping between attempts
def retry(int sleep_time = 6000, int times = 5,
          Closure errorHandler = { e -> log.warn(e.message, e) },
          Closure body) {
  int retries = 0
  def exception
  while(retries++ < times) {
    try {
      return body.call()
    }
    catch(e) {
      exception = e
      sleep(sleep_time)
    }
  }
  def wrapped = new Exception("Failed to copy: " + exception.message)
  errorHandler.call(wrapped)
}
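The defaults can also be overridden when calling retry. A sketch (the copied file name is only an illustration): sleep 2 seconds between attempts, try at most 3 times, and rethrow instead of warning if all attempts fail:

```groovy
// override the defaults: 2 s sleep, 3 attempts, rethrow on final failure
// (the copied file name is hypothetical)
retry(2000, 3, { e -> throw e }) {
    file("pipeline_report/execution_trace.txt")
        .copyTo("${params.outdir}/pipeline_report/execution_trace.txt")
}
```

The trailing closure is passed as the body argument, so all three defaulted parameters can be given positionally before it.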

Synchronisation of inputs by sample name: the common mistake. Here is a proof that it does not work.

In the main workflow:

        // Group files by sample 
        reads.view()
        tuple_sample_fastq_after_trimming.view()
        bowtie2.out.tuple_sample_fastq_matched.view()
        bowtie2.out.tuple_sample_fastq.view()
        write_output_tables.out.tuple_sample_metric_counts.view()
        
        concat_canard(reads,
                      tuple_sample_fastq_after_trimming,
                      bowtie2.out.tuple_sample_fastq_matched,
                      bowtie2.out.tuple_sample_fastq,
                      write_output_tables.out.tuple_sample_metric_counts)

At this step, here is what is sent into the concat_canard process. reads:

[MPL3_S19_L001.R2, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL3_S19_L001.R2.fastq.gz]]
[MPL12_S23_L001.R2, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL12_S23_L001.R2.fastq.gz]]
[MPL3_S19_L001.R1, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL3_S19_L001.R1.fastq.gz]]
[MPL12_S23_L001.R1, [/Users/jacquesdainat/git/IRD/ViroScan-nf/test/MPL12_S23_L001.R1.fastq.gz]]

tuple_sample_fastq_after_trimming:

[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/ef/76b58f3ac415d4251531398774de85/MPL3_S19_L001.R1.trimmed.fq.gz]
[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/66/d722d2fc9d3de492553dea4e937486/MPL12_S23_L001.R2.trimmed.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/86/09e10652d9b9f583f0c38f746a9904/MPL12_S23_L001.R1.trimmed.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/76/e9a24f8c76438866db434dc43b822c/MPL3_S19_L001.R2.trimmed.fq.gz]

tuple_sample_fastq_matched:

[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_matched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_matched.fq.gz]

tuple_sample_fastq:

[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/df/de21aae2b02b2ab95d3a21a3d88f28/MPL3_S19_L001.R1_unmatched.fq.gz]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/35/89ab8e25e412e1cad6c4526b5643c0/MPL12_S23_L001.R1_unmatched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/b2/50eee75f90da5b03f4d474d9d9a51f/MPL3_S19_L001.R2_unmatched.fq.gz]
[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/fc/846fb2bae0243611d3289c87b09bde/MPL12_S23_L001.R2_unmatched.fq.gz]

tuple_sample_metric_counts:

[MPL12_S23_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/fc/846fb2bae0243611d3289c87b09bde/MPL12_S23_L001.R2_unmatched.fq.gz]
[MPL3_S19_L001.R2, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/22/4670fcbc76d2693fe3c98d9f04c4f1/MPL3_S19_L001.R2_filterin.counts.txt]
[MPL12_S23_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/a4/2e2390e04e6c98f9a91cbc579c41e6/MPL12_S23_L001.R1_filterin.counts.txt]
[MPL3_S19_L001.R1, /Users/jacquesdainat/git/IRD/ViroScan-nf/work/5e/a5d3bf44d3dcff511c4c94dc7b2ffc/MPL3_S19_L001.R1_filterin.counts.txt]

Here is the process, used to see what comes in:

process concat_canard{
    label 'bash'  
    publishDir "${params.outdir}/metrics", pattern: "*", mode: 'copy' // I want all sub-directories

    input:
        tuple val(sample), path (raw_reads)
        tuple val(sample), path (trimmed_reads)
        tuple val(sample), path (unmatched)
        tuple val(sample), path (matched)
        tuple val(sample), path (in_counts)

    output:
        path("${sample}_final_table.txt"), emit: sample_final_table

    script:
    log.info"""
        ${sample} ${raw_reads}
        ${sample} ${trimmed_reads}
        ${sample} ${unmatched}
        ${sample} ${matched}
        ${sample} ${in_counts}
    """
    """
    # work with raw fastq files input
    nb_read=\$(zcat ${raw_reads} | wc -l)
    READ_TOTAL=\$((\$nb_read/4))

    # work with fastq files trimmed
    nb_read=\$(zcat ${trimmed_reads} | wc -l)
    READ_TOTAL_TRIMMED=\$((\$nb_read/4))

    # Work with fastq files output
    nb_read=\$(zcat ${unmatched} | wc -l)
    FILTER_OUT_UNMATCH=\$((\$nb_read/4)) 
    nb_read=\$(zcat ${matched} | wc -l)
    FILTER_OUT_MATCH=\$((\$nb_read/4)) 

    # Work with table from breseq output
    NB_READS_TO_ALIGN=\$(awk '{print \$2}'  ${in_counts})
    NB_READS_ALIGNED=\$(awk '{print \$3}'  ${in_counts})
    echo -e "$sample\t\$READ_TOTAL\t\$READ_TOTAL_TRIMMED\t\$FILTER_OUT_UNMATCH\t\$FILTER_OUT_MATCH\t\$NB_READS_TO_ALIGN\t\$NB_READS_ALIGNED" >> ${sample}_final_table.txt

    """
    

And here is the log output of that part:

        MPL12_S23_L001.R1 MPL3_S19_L001.R2.fastq.gz
        MPL12_S23_L001.R1 MPL3_S19_L001.R2.trimmed.fq.gz
        MPL12_S23_L001.R1 MPL3_S19_L001.R2_matched.fq.gz
        MPL12_S23_L001.R1 MPL3_S19_L001.R2_unmatched.fq.gz
        MPL12_S23_L001.R1 MPL12_S23_L001.R1_filterin.counts.txt
    

        MPL12_S23_L001.R2 MPL12_S23_L001.R2.fastq.gz
        MPL12_S23_L001.R2 MPL12_S23_L001.R1.trimmed.fq.gz
        MPL12_S23_L001.R2 MPL12_S23_L001.R1_matched.fq.gz
        MPL12_S23_L001.R2 MPL12_S23_L001.R1_unmatched.fq.gz
        MPL12_S23_L001.R2 MPL12_S23_L001.R2_filterin.counts.txt
    

        MPL3_S19_L001.R1 MPL3_S19_L001.R1.fastq.gz
        MPL3_S19_L001.R1 MPL12_S23_L001.R2.trimmed.fq.gz
        MPL3_S19_L001.R1 MPL12_S23_L001.R2_matched.fq.gz
        MPL3_S19_L001.R1 MPL12_S23_L001.R2_unmatched.fq.gz
        MPL3_S19_L001.R1 MPL3_S19_L001.R1_filterin.counts.txt
    

        MPL3_S19_L001.R2 MPL12_S23_L001.R1.fastq.gz
        MPL3_S19_L001.R2 MPL3_S19_L001.R1.trimmed.fq.gz
        MPL3_S19_L001.R2 MPL3_S19_L001.R1_matched.fq.gz
        MPL3_S19_L001.R2 MPL3_S19_L001.R1_unmatched.fq.gz
        MPL3_S19_L001.R2 MPL3_S19_L001.R2_filterin.counts.txt

Explanation:

We try to synchronise several channels (5 in total) using the same sample name as the first value of each tuple. The view() calls in the main workflow show that everything sent into the process is internally consistent: the sample name and the fastq file emitted by each channel match, e.g. (MPL12_S23 MPL12_S23.fastq).
However, the channels are asynchronous, so each channel emits the 4 samples in its own order. Once inside the process, we can see that the second values of the tuples differ while the first value is always the same. Because we gave each second value its own variable name, those are correctly assigned. For the first values we reused a single variable name (sample) for every incoming tuple, so each arriving value overwrites it; the final value is simply the first value of the last tuple received. There is no magic at the input of a process that synchronises different channels by sample value, so in this case we are mixing data from different samples. To avoid that, we must use a join() in the main workflow to build a single channel that carries all the values for a given sample.
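As a sketch of the join() fix, using the channel names from above (the exact shapes of the joined tuples would need checking, in particular tuple_sample_fastq_matched, which emits two files per sample):

```groovy
// join the five channels on the sample name (the first tuple element),
// so each emitted item carries all the files of one sample
synced = reads
    .join(tuple_sample_fastq_after_trimming)
    .join(bowtie2.out.tuple_sample_fastq_matched)
    .join(bowtie2.out.tuple_sample_fastq)
    .join(write_output_tables.out.tuple_sample_metric_counts)

concat_canard(synced)
```

The process input would then become a single tuple, e.g. `tuple val(sample), path(raw_reads), path(trimmed_reads), ...`, instead of five separate tuple inputs.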

P.S.: For this example I treated R1 and R2 as separate samples.
