diff --git a/bin/generate_cram_csv.sh b/bin/generate_cram_csv.sh index 89920fd..9e56831 100755 --- a/bin/generate_cram_csv.sh +++ b/bin/generate_cram_csv.sh @@ -7,36 +7,42 @@ # Author = yy5 # ><((((°> Y ><((((°> U ><((((°> M ><((((°> I ><((((°> +# NOTE: chunk_size is the number of containers per chunk (not records/reads) + # Function to process chunking of a CRAM file + chunk_cram() { local cram=$1 local chunkn=$2 local outcsv=$3 local crai=$4 - realcram=$(readlink -f ${cram}) - # realcrai=$(readlink -f ${cram}.crai) - realcrai=$(readlink -f ${crai}) + local chunk_size=$5 + local rgline=$(samtools view -H "${realcram}" | grep "@RG" | sed 's/\t/\\t/g' | sed "s/'//g") local ncontainers=$(zcat "${realcrai}" | wc -l) - # local ncontainers=$(cat "${realcram}" | wc -l) local base=$(basename "${realcram}" .cram) - local from=0 - local to=10000 + if [ $chunk_size -gt $ncontainers ]; then + chunk_size=$((ncontainers - 1)) + fi + local from=0 + local to=$((chunk_size - 1)) while [ $to -lt $ncontainers ]; do + #echo "chunk $chunkn: $from - $to" echo "${realcram},${realcrai},${from},${to},${base},${chunkn},${rgline}" >> $outcsv from=$((to + 1)) - ((to += 10000)) + to=$((to + chunk_size)) ((chunkn++)) done - if [ $from -le $ncontainers ]; then - echo "${realcram},${realcrai},${from},${ncontainers},${base},${chunkn},${rgline}" >> $outcsv + # Catch any remainder + if [ $from -lt $ncontainers ]; then + to=$((ncontainers - 1)) + #echo "chunk $chunkn: $from - $to" + echo "${realcram},${realcrai},${from},${to},${base},${chunkn},${rgline}" >> $outcsv ((chunkn++)) fi - - echo $chunkn } # Function to process a CRAM file @@ -45,24 +51,23 @@ process_cram_file() { local chunkn=$2 local outcsv=$3 local crai=$4 + local chunk_size=$5 local read_groups=$(samtools view -H "$cram" | grep '@RG' | awk '{for(i=1;i<=NF;i++){if($i ~ /^ID:/){print substr($i,4)}}}') local num_read_groups=$(echo "$read_groups" | wc -w) - if [ "$num_read_groups" -gt 1 ]; then # Multiple read groups: process each 
separately for rg in $read_groups; do local output_cram="$(basename "${cram%.cram}")_output_${rg}.cram" samtools view -h -r "$rg" -o "$output_cram" "$cram" - samtools index "$output_cram" - chunkn=$(chunk_cram "$output_cram" "$chunkn" "$outcsv" "$crai") + #chunkn=$(chunk_cram "$output_cram" "$chunkn" "$outcsv" "$crai" "$chunk_size") + chunk_cram "$output_cram" "$chunkn" "$outcsv" "$crai" "$chunk_size" done else # Single read group or no read groups - chunkn=$(chunk_cram "$cram" "$chunkn" "$outcsv" "$crai") + #chunkn=$(chunk_cram "$cram" "$chunkn" "$outcsv" "$crai" "$chunk_size") + chunk_cram "$cram" "$chunkn" "$outcsv" "$crai" "$chunk_size" fi - - echo $chunkn } # /\_/\ /\_/\ @@ -70,22 +75,25 @@ process_cram_file() { # > ^ < > ^ < # Check if cram_path is provided -if [ -z "$1" ]; then - echo "Usage: $0 " +if [ $# -lt 3 ]; then + echo "Usage: $0 <cram_file> <output_csv> <crai_file> [chunk_size]" exit 1 fi -# cram_path=$1 cram=$1 -chunkn=0 outcsv=$2 crai=$3 +if [ -z "$4" ]; then + chunk_size=10000 +else + chunk_size=$4 +fi +chunkn=0 -# # Loop through each CRAM file in the specified directory. 
cram cannot be the synlinked cram -# for cram in ${cram_path}/*.cram; do -# realcram=$(readlink -f $cram) -# chunkn=$(process_cram_file $realcram $chunkn $outcsv) -# done +# Operates on a single CRAM file realcram=$(readlink -f $cram) realcrai=$(readlink -f $crai) -chunkn=$(process_cram_file $realcram $chunkn $outcsv $realcrai) +if [ -f "$outcsv" ]; then + rm "$outcsv" +fi +process_cram_file $realcram $chunkn $outcsv $realcrai $chunk_size diff --git a/conf/modules.config b/conf/modules.config index a7a59cb..ebf3fb0 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -15,14 +15,6 @@ process { ext.args = '-F 0x200 -nt' } - withName: '.*:.*:ALIGN_HIC:BWAMEM2_MEM' { - ext.args = { "-5SPCp -R ${meta.read_group}" } - } - - withName: '.*:.*:ALIGN_ILLUMINA:BWAMEM2_MEM' { - ext.args = { "-R ${meta.read_group}" } - } - withName: SAMTOOLS_MERGE { ext.args = { "-c -p" } ext.prefix = { "${meta.id}.merge" } @@ -50,23 +42,47 @@ process { ext.args = "--output-fmt cram" } - withName: ".*:.*:HIC_BWAMEM2:CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT" { + withName: '.*:.*:ALIGN_HIC:BWAMEM2_MEM' { + ext.args = { "-5SPCp -R ${meta.read_group}" } + } + + withName: '.*:.*:ALIGN_ILLUMINA:BWAMEM2_MEM' { + ext.args = { "-p -R ${meta.read_group}" } + } + + withName: ".*:ALIGN_ILLUMINA:.*:CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT" { ext.args = "" - ext.args1 = { "-F 0x200 -nt -1 '${base}_${chunkid}_1.fastq.gz'" } + ext.args1 = { "-F 0x200 -nt" } + ext.args2 = { "-p -R '${rglines}'" } + ext.args3 = "-mpu" + ext.args4 = { "--write-index -l1" } + } + + withName: ".*:ALIGN_ILLUMINA:.*:CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT" { + ext.args = "" + ext.args1 = { "-F 0x200 -nt" } + ext.args2 = { "-ax sr" } + ext.args3 = "-mpu" + ext.args4 = { "--write-index -l1" } + } + + withName: ".*:ALIGN_HIC:.*:CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT" { + ext.args = "" + ext.args1 = { "-F 0x200 -nt" } ext.args2 = { "-5SPCp -R '${rglines}'" } ext.args3 = "-mpu" ext.args4 = { "--write-index -l1" } } - withName: 
".*:.*:HIC_MINIMAP2:CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT" { + withName: ".*:ALIGN_HIC:.*:CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT" { ext.args = "" - ext.args1 = { "-F 0x200 -nt -1 '${base}_${chunkid}_1.fastq.gz'" } + ext.args1 = { "" } ext.args2 = { "-ax sr" } ext.args3 = "-mpu" ext.args4 = { "--write-index -l1" } } - withName: ".*:.*:HIC_MINIMAP2:MINIMAP2_INDEX" { + withName: "MINIMAP2_INDEX" { ext.args = { "${fasta.size() > 2.5e9 ? (" -I " + Math.ceil(fasta.size()/1e9)+"G") : ""} "} } @@ -105,6 +121,11 @@ process { ext.prefix = { "${bam.baseName}" } } + withName: SAMTOOLS_ADDREPLACERG { + ext.prefix = { "${input.baseName}_addRG" } + ext.args = { "-r ${meta.read_group} --no-PG" } + } + withName: SAMTOOLS_STATS { ext.prefix = { "${input.baseName}" } } diff --git a/conf/test.config b/conf/test.config index e59b389..add9178 100644 --- a/conf/test.config +++ b/conf/test.config @@ -26,7 +26,7 @@ params { outdir = "${projectDir}/results" // Aligner - hic_aligner = "minimap2" + short_aligner = "minimap2" // Fasta references fasta = "https://tolit.cog.sanger.ac.uk/test-data/Meles_meles/assembly/release/mMelMel3.1_paternal_haplotype/GCA_922984935.2.subset.fasta.gz" diff --git a/modules/local/cram_filter_align_bwamem2_fixmate_sort.nf b/modules/local/cram_filter_align_bwamem2_fixmate_sort.nf index 9b5ca29..667e49b 100644 --- a/modules/local/cram_filter_align_bwamem2_fixmate_sort.nf +++ b/modules/local/cram_filter_align_bwamem2_fixmate_sort.nf @@ -1,5 +1,5 @@ process CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT { - tag "$meta.id" + tag "$meta.chunk_id" label "process_high" container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? 
diff --git a/modules/local/cram_filter_minimap2_filter5end_fixmate_sort.nf b/modules/local/cram_filter_minimap2_filter5end_fixmate_sort.nf index 8d8d69e..ceafb86 100644 --- a/modules/local/cram_filter_minimap2_filter5end_fixmate_sort.nf +++ b/modules/local/cram_filter_minimap2_filter5end_fixmate_sort.nf @@ -1,5 +1,5 @@ process CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT { - tag "$meta.id" + tag "$meta.chunk_id" label "process_high" container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? diff --git a/modules/local/generate_cram_csv.nf b/modules/local/generate_cram_csv.nf index 6a3b120..f3cfe96 100644 --- a/modules/local/generate_cram_csv.nf +++ b/modules/local/generate_cram_csv.nf @@ -1,8 +1,10 @@ process GENERATE_CRAM_CSV { tag "${meta.id}" - // label 'process_tiny' + label 'process_single' - container 'quay.io/sanger-tol/cramfilter_bwamem2_minimap2_samtools_perl:0.001-c1' + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://quay.io/sanger-tol/cramfilter_bwamem2_minimap2_samtools_perl:0.001-c1' : + 'sanger-tol/cramfilter_bwamem2_minimap2_samtools_perl:0.001-c1' }" input: tuple val(meta), path(crampath), path(crai) @@ -15,7 +17,7 @@ process GENERATE_CRAM_CSV { script: def prefix = task.ext.prefix ?: "${meta.id}" """ - generate_cram_csv.sh $crampath ${prefix}_cram.csv $crai + generate_cram_csv.sh $crampath ${prefix}_cram.csv $crai ${params.chunk_size} cat <<-END_VERSIONS > versions.yml "${task.process}": diff --git a/modules/local/samtools_addreplacerg.nf b/modules/local/samtools_addreplacerg.nf new file mode 100644 index 0000000..6515c3e --- /dev/null +++ b/modules/local/samtools_addreplacerg.nf @@ -0,0 +1,56 @@ +process SAMTOOLS_ADDREPLACERG { + tag "$meta.id" + label 'process_low' + + conda "bioconda::samtools=1.20" + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? 
+ 'https://depot.galaxyproject.org/singularity/samtools:1.20--h50ea8bc_0' : + 'biocontainers/samtools:1.20--h50ea8bc_0' }" + + input: + tuple val(meta), path(input) + + output: + tuple val(meta), path("${prefix}.bam"), emit: bam, optional: true + tuple val(meta), path("${prefix}.cram"), emit: cram, optional: true + tuple val(meta), path("${prefix}.sam"), emit: sam, optional: true + path "versions.yml", emit: versions + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + prefix = task.ext.prefix ?: "${meta.id}" + file_type = input.getExtension() + if ("$input" == "${prefix}.${file_type}") error "Input and output names are the same, use \"task.ext.prefix\" to disambiguate!" + """ + samtools \\ + addreplacerg \\ + --threads $task.cpus \\ + $args \\ + -o ${prefix}.${file_type} \\ + $input + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//') + END_VERSIONS + """ + + stub: + def args = task.ext.args ?: '' + prefix = task.ext.prefix ?: "${meta.id}" + file_type = input.getExtension() + if ("$input" == "${prefix}.${file_type}") error "Input and output names are the same, use \"task.ext.prefix\" to disambiguate!" 
+ + """ + touch ${prefix}.${file_type} + ${index} + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//') + END_VERSIONS + """ +} diff --git a/nextflow.config b/nextflow.config index 8c3c4d0..d143247 100644 --- a/nextflow.config +++ b/nextflow.config @@ -15,8 +15,11 @@ params { bwamem2_index = null fasta = null header = null + // Aligner option - hic_aligner = "minimap2" // Can choose minimap2 and bwamem2 + short_aligner = "minimap2" // Can choose minimap2 and bwamem2 + chunk_size = 10000 + // Output format options outfmt = "cram" compression = "crumble" diff --git a/nextflow_schema.json b/nextflow_schema.json index 63a6999..91eb9f6 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -54,13 +54,6 @@ "fa_icon": "fas fa-envelope", "help_text": "Set this parameter to your e-mail address to get a summary e-mail with details of the run sent to you when the workflow exits. If set in your user config file (`~/.nextflow/config`) then you don't need to specify this on the command line for every run.", "pattern": "^([a-zA-Z0-9_\\-\\.]+)@([a-zA-Z0-9_\\-\\.]+)\\.([a-zA-Z]{2,5})$" - }, - "hic_aligner": { - "type": "string", - "description": "The aligner to be used for HiC reads.", - "enum": ["bwamem2", "minimap2"], - "default": "minimap2", - "fa_icon": "fas fa-code" } } }, @@ -93,6 +86,26 @@ } } }, + "aligner_options": { + "title": "Aligner options", + "type": "object", + "description": "", + "default": "", + "properties": { + "short_aligner": { + "type": "string", + "default": "minimap2", + "description": "Alignment method to use for short-reads (options are: \"minimap2\" or \"bwamem2\"", + "enum": ["minimap2", "bwamem2"] + }, + "chunk_size": { + "type": "integer", + "default": 10000, + "description": "Chunk size for parallel processing of CRAM files, in number of containers", + "minimum": 1 + } + } + }, "execution": { "title": "Execution", "type": "object", @@ -292,6 
+305,9 @@ { "$ref": "#/definitions/reference_genome_options" }, + { + "$ref": "#/definitions/aligner_options" + }, { "$ref": "#/definitions/execution" }, diff --git a/subworkflows/local/align_short.nf b/subworkflows/local/align_short.nf index cebc194..4f49cdc 100644 --- a/subworkflows/local/align_short.nf +++ b/subworkflows/local/align_short.nf @@ -2,21 +2,26 @@ // Align short read (HiC and Illumina) data against the genome // -include { SAMTOOLS_FASTQ } from '../../modules/nf-core/samtools/fastq/main' -include { BWAMEM2_MEM } from '../../modules/nf-core/bwamem2/mem/main' -include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main' -include { SAMTOOLS_SORMADUP } from '../../modules/local/samtools_sormadup' - +include { SAMTOOLS_FASTQ } from '../../modules/nf-core/samtools/fastq/main' +include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main' +include { SAMTOOLS_SORMADUP } from '../../modules/local/samtools_sormadup' +include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main' +include { GENERATE_CRAM_CSV } from '../../modules/local/generate_cram_csv' +include { SAMTOOLS_SORMADUP as CONVERT_CRAM } from '../../modules/local/samtools_sormadup' +include { SAMTOOLS_ADDREPLACERG } from '../../modules/local/samtools_addreplacerg' +include { MINIMAP2_MAPREDUCE } from '../../subworkflows/local/minimap2_mapreduce' +include { BWAMEM2_MAPREDUCE } from '../../subworkflows/local/bwamem2_mapreduce' workflow ALIGN_SHORT { take: - fasta // channel: [ val(meta), /path/to/fasta ] + fasta // channel: [ val(meta), /path/to/fasta ] reference_tuple index // channel: [ val(meta), /path/to/bwamem2/ ] - reads // channel: [ val(meta), /path/to/datafile ] + reads // channel: [ val(meta), /path/to/datafile ] hic_reads_path main: ch_versions = Channel.empty() + ch_merged_bam = Channel.empty() // Check file types and branch reads @@ -28,23 +33,60 @@ workflow ALIGN_SHORT { | set { ch_reads } - // Convert from CRAM to FASTQ only if CRAM files were 
provided as input - SAMTOOLS_FASTQ ( ch_reads.cram, false ) - ch_versions = ch_versions.mix ( SAMTOOLS_FASTQ.out.versions.first() ) - - - SAMTOOLS_FASTQ.out.fastq - | mix ( ch_reads.fastq ) - | set { ch_reads_fastq } - + // Convert FASTQ to CRAM only if FASTQ were provided as input + CONVERT_CRAM ( ch_reads.fastq, fasta ) + ch_versions = ch_versions.mix ( CONVERT_CRAM.out.versions ) + + SAMTOOLS_ADDREPLACERG ( CONVERT_CRAM.out.bam ) + ch_versions = ch_versions.mix ( SAMTOOLS_ADDREPLACERG.out.versions ) + + SAMTOOLS_ADDREPLACERG.out.cram + | mix ( ch_reads.cram ) + | set { ch_reads_cram } + + // Index the CRAM file + SAMTOOLS_INDEX ( ch_reads_cram ) + ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions ) + + ch_reads_cram + | join ( SAMTOOLS_INDEX.out.crai ) + | set { ch_reads_cram_crai } + + + // + // MODULE: generate a CRAM CSV file containing the required parameters for CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT + // + GENERATE_CRAM_CSV( ch_reads_cram_crai ) + ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions ) + + // + // SUBWORKFLOW: mapping hic reads using minimap2 or bwamem2 + // + if (params.short_aligner.startsWith("minimap")) { + MINIMAP2_MAPREDUCE ( + fasta, + GENERATE_CRAM_CSV.out.csv + ) + ch_versions = ch_versions.mix( MINIMAP2_MAPREDUCE.out.versions ) + ch_merged_bam = ch_merged_bam.mix(MINIMAP2_MAPREDUCE.out.mergedbam) + } else { + BWAMEM2_MAPREDUCE ( + fasta, + GENERATE_CRAM_CSV.out.csv, + index + ) + ch_versions = ch_versions.mix( BWAMEM2_MAPREDUCE.out.versions ) + ch_merged_bam = ch_merged_bam.mix(BWAMEM2_MAPREDUCE.out.mergedbam) + } - // Align Fastq to Genome and output sorted BAM - BWAMEM2_MEM ( ch_reads_fastq, index, fasta, true ) - ch_versions = ch_versions.mix ( BWAMEM2_MEM.out.versions.first() ) + ch_merged_bam + | combine( ch_reads_cram_crai ) + | map { meta_bam, bam, meta_cram, cram, crai -> [ meta_cram, bam ] } + | set { ch_merged_bam } - // Collect all BWAMEM2 output by sample name - BWAMEM2_MEM.out.bam + // Collect all 
BAM output by sample name + ch_merged_bam | map { meta, bam -> [['id': meta.id.split('_')[0..-2].join('_'), 'datatype': meta.datatype], meta.read_count, bam] } | groupTuple( by: [0] ) | map { meta, read_counts, bams -> [meta + [read_count: read_counts.sum()], bams] } @@ -58,7 +100,7 @@ workflow ALIGN_SHORT { // Merge, but only if there is more than 1 file SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) - ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() ) + ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions ) SAMTOOLS_MERGE.out.bam diff --git a/subworkflows/local/align_short_hic.nf b/subworkflows/local/align_short_hic.nf deleted file mode 100644 index 39e0cab..0000000 --- a/subworkflows/local/align_short_hic.nf +++ /dev/null @@ -1,116 +0,0 @@ -// -// Align short read (HiC and Illumina) data against the genome -// - -include { SAMTOOLS_FASTQ } from '../../modules/nf-core/samtools/fastq/main' -include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main' -include { SAMTOOLS_SORMADUP } from '../../modules/local/samtools_sormadup' -include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main' -include { GENERATE_CRAM_CSV } from '../../modules/local/generate_cram_csv' -include { SAMTOOLS_SORMADUP as CONVERT_CRAM } from '../../modules/local/samtools_sormadup' -include { HIC_MINIMAP2 } from '../../subworkflows/local/hic_minimap2' -include { HIC_BWAMEM2 } from '../../subworkflows/local/hic_bwamem2' - -workflow ALIGN_SHORT_HIC { - take: - fasta // channel: [ val(meta), /path/to/fasta ] reference_tuple - index // channel: [ val(meta), /path/to/bwamem2/ ] - reads // channel: [ val(meta), /path/to/datafile ] hic_reads_path - - - main: - ch_versions = Channel.empty() - ch_merged_bam = Channel.empty() - - // Check file types and branch - reads - | branch { - meta, reads -> - fastq : reads.findAll { it.getName().toLowerCase() =~ /.*f.*\.gz/ } - cram : true - } - | set { ch_reads } - - - // Convert FASTQ to CRAM 
only if FASTQ were provided as input - CONVERT_CRAM ( ch_reads.fastq, fasta ) - ch_versions = ch_versions.mix ( CONVERT_CRAM.out.versions ) - - CONVERT_CRAM.out.bam - | mix ( ch_reads.cram ) - | set { ch_reads_cram } - - - // Index the CRAM file - SAMTOOLS_INDEX ( ch_reads_cram ) - ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions ) - - ch_reads_cram - | join ( SAMTOOLS_INDEX.out.crai ) - | set { ch_reads_cram_crai } - - - // - // MODULE: generate a CRAM CSV file containing the required parametres for CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT - // - GENERATE_CRAM_CSV( ch_reads_cram_crai ) - ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions ) - - - // - // SUBWORKFLOW: mapping hic reads using minimap2 or bwamem2 - // - if (params.hic_aligner == 'minimap2') { - HIC_MINIMAP2 ( - fasta, - GENERATE_CRAM_CSV.out.csv - ) - ch_versions = ch_versions.mix( HIC_MINIMAP2.out.versions ) - ch_merged_bam = ch_merged_bam.mix(HIC_MINIMAP2.out.mergedbam) - } else { - HIC_BWAMEM2 ( - fasta, - GENERATE_CRAM_CSV.out.csv, - index - ) - ch_versions = ch_versions.mix( HIC_BWAMEM2.out.versions ) - ch_merged_bam = ch_merged_bam.mix(HIC_BWAMEM2.out.mergedbam) - } - - ch_merged_bam - | combine( ch_reads_cram_crai ) - | map { meta_bam, bam, meta_cram, cram, crai -> [ meta_cram, bam ] } - | set { ch_merged_bam } - - - // Collect all BAM output by sample name - ch_merged_bam - | map { meta, bam -> [['id': meta.id.split('_')[0..-2].join('_'), 'datatype': meta.datatype], meta.read_count, bam] } - | groupTuple( by: [0] ) - | map { meta, read_counts, bams -> [meta + [read_count: read_counts.sum()], bams] } - | branch { - meta, bams -> - single_bam: bams.size() == 1 - multi_bams: true - } - | set { ch_bams } - - - // Merge, but only if there is more than 1 file - SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) - ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() ) - - - SAMTOOLS_MERGE.out.bam - | mix ( ch_bams.single_bam ) - | set { ch_bam } - - - // 
Mark duplicates - SAMTOOLS_SORMADUP ( ch_bam, fasta ) - ch_versions = ch_versions.mix ( SAMTOOLS_SORMADUP.out.versions ) - - emit: - bam = SAMTOOLS_SORMADUP.out.bam // channel: [ val(meta), /path/to/bam ] - versions = ch_versions // channel: [ versions.yml ] -} diff --git a/subworkflows/local/hic_bwamem2.nf b/subworkflows/local/bwamem2_mapreduce.nf similarity index 91% rename from subworkflows/local/hic_bwamem2.nf rename to subworkflows/local/bwamem2_mapreduce.nf index edf4691..13711fb 100644 --- a/subworkflows/local/hic_bwamem2.nf +++ b/subworkflows/local/bwamem2_mapreduce.nf @@ -6,7 +6,7 @@ include { CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT } from '../../modules/local/cram_filter_align_bwamem2_fixmate_sort' include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main' -workflow HIC_BWAMEM2 { +workflow BWAMEM2_MAPREDUCE { take: fasta // Channel: tuple [ val(meta), path( file ) ] csv_ch @@ -22,7 +22,8 @@ workflow HIC_BWAMEM2 { .splitCsv() .map{ cram_id, cram_info -> tuple([ - id: cram_id.id + id: cram_id.id, + chunk_id: cram_id.id + "_" + cram_info[5] ], file(cram_info[0]), cram_info[1], @@ -35,8 +36,9 @@ workflow HIC_BWAMEM2 { } .set { ch_filtering_input } + // - // MODULE: map hic reads by 10,000 container per time using bwamem2 + // MODULE: map hic reads in each chunk using bwamem2 // CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT ( ch_filtering_input, diff --git a/subworkflows/local/hic_minimap2.nf b/subworkflows/local/minimap2_mapreduce.nf similarity index 95% rename from subworkflows/local/hic_minimap2.nf rename to subworkflows/local/minimap2_mapreduce.nf index 23ab866..35b5aae 100644 --- a/subworkflows/local/hic_minimap2.nf +++ b/subworkflows/local/minimap2_mapreduce.nf @@ -8,7 +8,7 @@ include { SAMTOOLS_MERGE } from '../../modules/ include { MINIMAP2_INDEX } from '../../modules/nf-core/minimap2/index/main' -workflow HIC_MINIMAP2 { +workflow MINIMAP2_MAPREDUCE { take: fasta // Channel: tuple [ val(meta), path( file ) ] csv_ch @@ -37,7 +37,8 @@ workflow 
HIC_MINIMAP2 { .combine ( MINIMAP2_INDEX.out.index ) .map{ cram_id, cram_info, ref_id, ref_dir, mmi_id, mmi_path-> tuple([ - id: cram_id.id + id: cram_id.id, + chunk_id: cram_id.id + "_" + cram_info[5] ], file(cram_info[0]), cram_info[1], diff --git a/subworkflows/local/prepare_genome.nf b/subworkflows/local/prepare_genome.nf index aa951b6..f7a4754 100644 --- a/subworkflows/local/prepare_genome.nf +++ b/subworkflows/local/prepare_genome.nf @@ -30,7 +30,7 @@ workflow PREPARE_GENOME { // Unmask genome fasta UNMASK ( ch_fasta ) - ch_versions = ch_versions.mix ( UNMASK.out.versions.first() ) + ch_versions = ch_versions.mix ( UNMASK.out.versions ) // Generate BWA index if ( checkShortReads( params.input ) ) { @@ -42,14 +42,14 @@ workflow PREPARE_GENOME { if ( params.bwamem2_index.endsWith('.tar.gz') ) { ch_bwamem2_index = UNTAR ( ch_bwamem ).untar - ch_versions = ch_versions.mix ( UNTAR.out.versions.first() ) + ch_versions = ch_versions.mix ( UNTAR.out.versions ) } else { ch_bwamem2_index = ch_bwamem } } else { ch_bwamem2_index = BWAMEM2_INDEX ( UNMASK.out.fasta ).index - ch_versions = ch_versions.mix ( BWAMEM2_INDEX.out.versions.first() ) + ch_versions = ch_versions.mix ( BWAMEM2_INDEX.out.versions ) } } else { ch_bwamem2_index = Channel.empty() diff --git a/workflows/readmapping.nf b/workflows/readmapping.nf index a5e9575..a71c77e 100644 --- a/workflows/readmapping.nf +++ b/workflows/readmapping.nf @@ -18,7 +18,7 @@ include { SAMTOOLS_REHEADER } from '../modules/local/samtools_replaceh include { INPUT_CHECK } from '../subworkflows/local/input_check' include { PREPARE_GENOME } from '../subworkflows/local/prepare_genome' -include { ALIGN_SHORT_HIC as ALIGN_HIC } from '../subworkflows/local/align_short_hic' +include { ALIGN_SHORT as ALIGN_HIC } from '../subworkflows/local/align_short' include { ALIGN_SHORT as ALIGN_ILLUMINA } from '../subworkflows/local/align_short' include { ALIGN_PACBIO as ALIGN_HIFI } from '../subworkflows/local/align_pacbio' include { ALIGN_PACBIO as 
ALIGN_CLR } from '../subworkflows/local/align_pacbio'