From 854fa2598b2015ea3f8b7de43ba291891584ed37 Mon Sep 17 00:00:00 2001 From: Abhinav Sharma Date: Sun, 11 Sep 2022 18:13:32 +0200 Subject: [PATCH] add draft modules with pseudocode for formatdata subworkflow --- .../formatdata_processes/extract_chro_gwas.nf | 17 ++ .../extract_rs_id_chro.nf | 12 ++ .../formatdata_processes/get_list_echro.nf | 13 ++ .../local/formatdata_processes/merge_all.nf | 20 +++ .../merge_rs_gwas_chro.nf | 21 +++ nextflow.config | 155 +++++++++++++++--- subworkflows/local/formatdata.nf | 105 ++++++++++++ 7 files changed, 316 insertions(+), 27 deletions(-) create mode 100644 modules/local/formatdata_processes/extract_chro_gwas.nf create mode 100644 modules/local/formatdata_processes/extract_rs_id_chro.nf create mode 100644 modules/local/formatdata_processes/get_list_echro.nf create mode 100644 modules/local/formatdata_processes/merge_all.nf create mode 100644 modules/local/formatdata_processes/merge_rs_gwas_chro.nf create mode 100644 subworkflows/local/formatdata.nf diff --git a/modules/local/formatdata_processes/extract_chro_gwas.nf b/modules/local/formatdata_processes/extract_chro_gwas.nf new file mode 100644 index 0000000..f857109 --- /dev/null +++ b/modules/local/formatdata_processes/extract_chro_gwas.nf @@ -0,0 +1,17 @@ +process ExtractChroGWAS{ + memory params.mem_req + input : + file(gwas) from gwas_chrolist_ext + each chro from chrolist2 + output : + set val(chro), file(gwas_out) into gwas_format_chro + script : + gwas_out=gwas.baseName+"_"+chro+".gwas" + sep=(params.sep!="") ? "" : "" + infofile="Chro:${params.head_chr}:${params.headnew_chr},Pos:${params.head_bp}:${params.headnew_bp},A2:${params.head_A2}:${params.headnew_A2},A1:${params.head_A1}:${params.headnew_A1},af:${params.head_freq}:${params.headnew_freq},Beta:${params.head_beta}:${params.headnew_beta},Se:${params.head_se}:${params.headnew_se},Pval:${params.head_pval}:${params.headnew_pval},N:${params.head_N}:${params.headnew_N}" + """ + extractandformat_gwas.py --input_file $gwas --out_file ${gwas_out} --chr $chro --info_file $infofile + """ +} + + diff --git a/modules/local/formatdata_processes/extract_rs_id_chro.nf b/modules/local/formatdata_processes/extract_rs_id_chro.nf new file mode 100644 index 0000000..01a64c8 --- /dev/null +++ b/modules/local/formatdata_processes/extract_rs_id_chro.nf @@ -0,0 +1,12 @@ +process ExtractRsIDChro{ + memory params.mem_req + input : + set val(chro), file(gwas), file(rsinfo) from gwas_format_chro_rs + output : + set val(chro), file(gwas),file(outrs) into rsinfo_chro + script : + outrs="info_rs_"+chro+".rs" + """ + zcat $rsinfo | extractrsid_bypos.py --file_chrbp $gwas --out_file $outrs --ref_file stdin --chr $chro --chro_ps ${params.poshead_chro_inforef} --bp_ps ${params.poshead_bp_inforef} --rs_ps ${params.poshead_rs_inforef} --a1_ps ${params.poshead_a1_inforef} --a2_ps ${params.poshead_a2_inforef} + """ +} diff --git a/modules/local/formatdata_processes/get_list_echro.nf b/modules/local/formatdata_processes/get_list_echro.nf new file mode 100644 index 0000000..7d7c3e7 --- /dev/null +++ b/modules/local/formatdata_processes/get_list_echro.nf @@ -0,0 +1,13 @@ +process getListeChro{ + input : + file(gwas_res) from gwas_chrolist + output : + file("filechro") into chrolist + script: + sep=(params.sep!="") ? " --sep ${params.sep}" : "" + """ + extractlistchro.py --input_file $gwas_res --chro_header ${params.head_chr} $sep > filechro + """ +} + + diff --git a/modules/local/formatdata_processes/merge_all.nf b/modules/local/formatdata_processes/merge_all.nf new file mode 100644 index 0000000..6618013 --- /dev/null +++ b/modules/local/formatdata_processes/merge_all.nf @@ -0,0 +1,20 @@ +process MergeAll{ + memory params.mem_req + input : + file(allfile) from gwas_rsmerge_all + publishDir "${params.output_dir}/", overwrite:true, mode:'copy' + output : + file(fileout) + script : + file1=allfile[0] + listefiles=allfile.join(" ") + fileout=params.output + """ + head -1 $file1 > $fileout + ls $listefiles | xargs -n 1 tail -n +2 >> $fileout + """ + +} + + + diff --git a/modules/local/formatdata_processes/merge_rs_gwas_chro.nf b/modules/local/formatdata_processes/merge_rs_gwas_chro.nf new file mode 100644 index 0000000..68c4fe0 --- /dev/null +++ b/modules/local/formatdata_processes/merge_rs_gwas_chro.nf @@ -0,0 +1,21 @@ +process MergeRsGwasChro{ + memory params.mem_req + input : + set val(chro), file(gwas),file(chrors), file(bed), file(bim), file(fam) from rsinfo_chroplk + output : + file(outmerge) into gwas_rsmerge + script : + outmerge="merge_"+chro+".gwas" + bfileopt= (params.input_pat!="" || params.input_dir!="") ? " --bfile "+bed.baseName : "" + Nheadopt=(params.head_N!="") ? " --N_head ${params.head_N} " : "" + Freqheadopt=(params.head_freq!="") ? " --freq_head ${params.head_freq} " : "" + + NheadNewopt=(params.headnew_N!="") ? " --Nnew_head ${params.headnew_N} " : "" + FreqNewheadopt=(params.headnew_freq!="") ? " --freqnew_head ${params.headnew_freq} " : "" + """ + mergeforrs.py --input_gwas $gwas --input_rs $chrors --out_file $outmerge --chro_head ${params.headnew_chr} --bp_head ${params.headnew_bp} --rs_head ${params.headnew_rs} --chro $chro $bfileopt $Nheadopt $Freqheadopt $NheadNewopt $FreqNewheadopt --a1_head ${params.headnew_A1} --a2_head ${params.headnew_A2} + """ + +} + + diff --git a/nextflow.config b/nextflow.config index 89c4853..a4b45f7 100644 --- a/nextflow.config +++ b/nextflow.config @@ -1,15 +1,60 @@ /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - nf-core/gwas Nextflow config file +nf-core/gwas Nextflow config file ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Default config options for all compute environments +Default config options for all compute environments ---------------------------------------------------------------------------------------- */ // Global default params, used in configs params { + + //------------------------ - // TODO: h3abionet qc params + // NOTE: h3abionet formatdata params + //------------------------ + + + mem_req = '2GB' // how much plink needs for this + file_gwas="" + output_dir="out" + output="output" + input_dir="" + input_pat="" + + head_pval = "" + head_freq = "" + head_bp = "" + head_chr = "" + head_rs = "" + head_beta="" + head_se="" + head_A1="" + head_A2="" + head_N="" + sep="" + + headnew_pval = "" + headnew_freq = "" + headnew_bp = "" + headnew_chr = "" + headnew_rs = "rs" + headnew_beta="" + headnew_se="" + headnew_A1="" + headnew_A2="" + headnew_N="" + + poshead_chro_inforef=0 + poshead_bp_inforef=1 + poshead_rs_inforef=2 + poshead_a1_inforef=3 + poshead_a2_inforef=4 + + + + //------------------------ + // NOTE: h3abionet qc params //------------------------ // Input options @@ -286,34 +331,90 @@ manifest { includeConfig 'conf/modules.config' // Function to ensure that resource requirements don't go beyond -// a maximum limit -def check_max(obj, type) { - if (type == 'memory') { - try { - if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1) + // a maximum limit + def check_max(obj, type) { + if (type == 'memory') + { + try + { + if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1) return params.max_memory as nextflow.util.MemoryUnit - else + else return obj - } catch (all) { - println " ### ERROR ### Max memory '${params.max_memory}' is not valid! Using default value: $obj" - return obj - } - } else if (type == 'time') { - try { - if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1) + } catch (all) { + println " ### ERROR ### Max memory '${params.max_memory}' is not valid! Using default value: $obj" + return obj + } + } else if (type == 'time') + { + try + { + if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1) return params.max_time as nextflow.util.Duration - else + else return obj - } catch (all) { - println " ### ERROR ### Max time '${params.max_time}' is not valid! Using default value: $obj" - return obj - } - } else if (type == 'cpus') { - try { - return Math.min( obj, params.max_cpus as int ) - } catch (all) { - println " ### ERROR ### Max cpus '${params.max_cpus}' is not valid! Using default value: $obj" - return obj + } catch (all) { + println " ### ERROR ### Max time '${params.max_time}' is not valid! Using default value: $obj" + return obj + } + } else if (type == 'cpus') + { + try + { + return Math.min( obj, params.max_cpus as int ) + } catch (all) { + println " ### ERROR ### Max cpus '${params.max_cpus}' is not valid! Using default value: $obj" + return obj + } } } + name = 'nf-core/gwas' + author = 'Davi Josué Marcon (@Mxrcon), Abhinav Sharma (@abhi18av), Jean-Tristan Brandenburg (@jeantristanb), Lindsay Clark (@lvclark) and Scott Hazelhurst (@shaze).' + homePage = 'https://github.com/nf-core/gwas' + description = 'A pipeline for genome wide association studies.' + mainScript = 'main.nf' + nextflowVersion = '!>=21.10.3' + version = '1.0dev' } + +// Load modules.config for DSL2 module specific options +includeConfig 'conf/modules.config' + +// Function to ensure that resource requirements don't go beyond + // a maximum limit + def check_max(obj, type) { + if (type == 'memory') + { + try + { + if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1) + return params.max_memory as nextflow.util.MemoryUnit + else + return obj + } catch (all) { + println " ### ERROR ### Max memory '${params.max_memory}' is not valid! Using default value: $obj" + return obj + } + } else if (type == 'time') + { + try + { + if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1) + return params.max_time as nextflow.util.Duration + else + return obj + } catch (all) { + println " ### ERROR ### Max time '${params.max_time}' is not valid! Using default value: $obj" + return obj + } + } else if (type == 'cpus') + { + try + { + return Math.min( obj, params.max_cpus as int ) + } catch (all) { + println " ### ERROR ### Max cpus '${params.max_cpus}' is not valid! Using default value: $obj" + return obj + } + } + } diff --git a/subworkflows/local/formatdata.nf b/subworkflows/local/formatdata.nf new file mode 100644 index 0000000..4d68a73 --- /dev/null +++ b/subworkflows/local/formatdata.nf @@ -0,0 +1,105 @@ +#!/usr/bin/env nextflow + +import java.nio.file.Paths; +import sun.nio.fs.UnixPath; +import java.security.MessageDigest; + + +filescript=file(workflow.scriptFile) +projectdir="${filescript.getParent()}" +dummy_dir="${projectdir}/../../qc/input" + + +// Checks if the file exists +checker = { fn -> + if (fn.exists()) + return fn; + else + error("\n\n------\nError in your config\nFile $fn does not exist\n\n---\n") +} + +def helps = [ 'help' : 'help' ] +allowed_params = ['file_gwas', 'file_ref_gzip', "output_dir","output", "input_dir", "input_pat"] +allowed_header = ['head_pval', 'head_freq', 'head_bp', 'head_chr', 'head_rs', 'head_beta', 'head_se', 'head_A1', 'head_A2', 'sep'] +allowed_headnewernew = ['headnew_pval', 'headnew_freq', 'headnew_bp', 'headnew_chr', 'headnew_rs', 'headnew_beta', 'headnew_se', 'headnew_A1', 'headnew_A2'] +//chro_ps 0 --bp_ps 1 --rs_ps +allowed_posfilref=['poshead_chro_inforef', 'poshead_bp_inforef','poshead_rs_inforef'] +allowed_params+=allowed_header +allowed_params+=allowed_headnewernew + +if(params.file_gwas==""){ +error('params.file_gwas: file contains gwas not found') +} + +if(params.headnew_pval=="")params.headnew_pval=params.head_pval +if(params.headnew_freq=="")params.headnew_freq=params.head_freq +if(params.headnew_bp=="")params.headnew_bp=params.head_bp +if(params.headnew_chr=="")params.headnew_chr=params.head_chr +if(params.headnew_beta=="")params.headnew_beta=params.head_beta +if(params.headnew_se=="")params.headnew_se=params.head_se +if(params.headnew_A1=="")params.headnew_A1=params.head_A1 +if(params.headnew_A2=="")params.headnew_A2=params.head_A2 +//if(params.headnew_N=="")params.headnew_N=params.head_N + + +gwas_chrolist = Channel.fromPath(params.file_gwas) +gwas_chrolist_ext = Channel.fromPath(params.file_gwas) +gwas_chrolist_ext = Channel.fromPath(params.file_gwas) + + +getListeChro( + gwas_chrolist +).out.chrolist + +chrolist2=Channel.create() +chrolist.flatMap { list_str -> list_str.readLines()[0].split() }.set { chrolist2 } + +ExtractChroGWAS( + file(gwas) from gwas_chrolist_ext + each chro from chrolist2 +}.out.gwas_format_chro + +if(params.file_ref_gzip==""){ + error('params.file_ref_gzip : file contains information for rs notnot found') +} + +gwas_format_chro_rs=gwas_format_chro.combine(Channel.fromPath(params.file_ref_gzip)) + +ExtractRsIDChro( + gwas_format_chro_rs +).out.rsinfo_chro + +if(params.input_dir!="" || params.input_pat!=''){ +print("used plink file") +bed = Paths.get(params.input_dir,"${params.input_pat}.bed").toString().replaceFirst(/^az:/, "az:/").replaceFirst(/^s3:/, "s3:/") +bim = Paths.get(params.input_dir,"${params.input_pat}.bim").toString().replaceFirst(/^az:/, "az:/").replaceFirst(/^s3:/, "s3:/") +fam = Paths.get(params.input_dir,"${params.input_pat}.fam").toString().replaceFirst(/^az:/, "az:/").replaceFirst(/^s3:/, "s3:/") + + +}else{ + bed=file('${dummy_dir}/00') + bim=file('${dummy_dir}/01') + fam=file('${dummy_dir}/02') +} + +fileplk= Channel.create() +Channel + .from(file(bed),file(bim),file(fam)) + .buffer(size:3) + .map { a -> [a[0], a[1], a[2]] } + .set { fileplk } + +rsinfo_chroplk=rsinfo_chro.combine(fileplk) + +process MergeRsGwasChro{ + rsinfo_chroplk +).out.gwas_rsmerge + +gwas_rsmerge_all=gwas_rsmerge.collect() + +MergeAll( + gwas_rsmerge_all +}.out + + +