-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.nf
128 lines (99 loc) · 5.26 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env nextflow

/*
 * A nextflow wrapper for running FluViewer
 * -----------------------------------------
 */

nextflow.enable.dsl = 2

// Module imports, grouped by the module file they are declared in.
include { hash_files }                                from './modules/hash_files.nf'
include { pipeline_provenance ; collect_provenance }  from './modules/provenance.nf'
include { fastp }                                     from './modules/fastp.nf'
include { cutadapt }                                  from './modules/cutadapt.nf'
include { normalize_depth ; fluviewer }               from './modules/fluviewer.nf'
include { multiqc }                                   from './modules/multiqc.nf'
include { fastqc }                                    from './modules/fastqc.nf'
include { clade_calling }                             from './modules/clade_calling.nf'
include { snp_calling }                               from './modules/snp_calling.nf'
include { pull_genoflu ; checkout_genoflu ; genoflu } from './modules/genoflu.nf'
// Print the run configuration banner to the screen and to the Nextflow log.
// NOTE(review): this banner reports `params.primers`, but the workflow below
// builds its primer channel from `params.primer_path` (Channel.fromPath) —
// confirm whether these are meant to be the same parameter.
log.info """
FluViewer Pipeline
===================================
projectDir : ${projectDir}
launchDir : ${launchDir}
database : ${params.db}
primers : ${params.primers}
fastqInputDir : ${params.fastq_input}
outdir : ${params.outdir}
pipeline run name : ${workflow.runName}
pipeline version : ${workflow.manifest.version}
run_name : ${params.run_name}
user : ${workflow.userName}
Git repository : ${workflow.repository}
git commit id : ${workflow.commitId}
branch : ${workflow.revision}
""".stripIndent()
workflow {

    main:

    // Validate required inputs FIRST, before any channel factories touch the
    // corresponding params. (Previously these checks ran after
    // Channel.fromPath(params.db) etc., so a missing --db failed inside the
    // channel factory instead of with the intended error message.)
    if (params.fastq_input == 'NO_FILE' && params.samplesheet_input == 'NO_FILE') {
        error "ERROR: Missing required inputs. One of '--fastq_input' or '--samplesheet_input' must be provided."
    }
    if (params.db == 'NO_FILE') {
        error "ERROR: Missing required inputs. '--db' parameter is mandatory."
    }

    // Static metadata about this run, used to seed the provenance records.
    ch_workflow_metadata = Channel.value([
        workflow.sessionId,
        workflow.runName,
        workflow.manifest.name,
        workflow.manifest.version,
        workflow.start,
    ])
    ch_pipeline_provenance = pipeline_provenance(ch_workflow_metadata)

    ch_primers = Channel.fromPath(params.primer_path)
    ch_db = Channel.fromPath(params.db)

    // Build [sample_id, R1, R2] tuples, either from an explicit samplesheet
    // or by pairing fastq files found under the search path (sample ID taken
    // from the filename prefix before the first '_', de-duplicated by ID).
    if (params.samplesheet_input != 'NO_FILE') {
        ch_fastq_input = Channel.fromPath(params.samplesheet_input).splitCsv(header: true).map{ it -> [it['ID'], it['R1'], it['R2']] }
    } else {
        ch_fastq_input = Channel.fromFilePairs( params.fastq_search_path, flat: true ).map{ it -> [it[0].split('_')[0], it[1], it[2]] }.unique{ it -> it[0] }
    }

    // Value channel of [db_dir, db_filename] for the blastx subtype database.
    ch_reference_db = Channel.of([file(params.blastx_subtype_db).parent, file(params.blastx_subtype_db).name]).first()

    // Provenance channel starts with just the sample IDs
    // These will be joined to various provenance files as they are generated
    ch_provenance = ch_fastq_input.map{ it -> it[0] }

    // Generate hashes for input files
    hash_files(ch_fastq_input.map{ it -> [it[0], [it[1], it[2]]] }.combine(Channel.of("fastq_input")))

    // Clean up reads - remove adapters (fastp) and primers (cutadapt)
    fastp(ch_fastq_input)
    cutadapt(fastp.out.trimmed_reads.combine(ch_primers))
    fastqc(cutadapt.out.primer_trimmed_reads)
    normalize_depth(cutadapt.out.primer_trimmed_reads)

    // Run FluViewer
    fluviewer(normalize_depth.out.normalized_reads.combine(ch_db))

    // Collect all the relevant files for multiqc
    ch_fastqc_collected = fastqc.out.zip.map{ it -> [it[1], it[2]]}.collect()
    multiqc(fastp.out.json.mix( cutadapt.out.log, ch_fastqc_collected ).collect().ifEmpty([]) )

    // Call clades for H1 and H3 samples
    clade_calling(fluviewer.out.ha_consensus_seq)

    snp_calling(fluviewer.out.consensus_main, ch_reference_db)

    // Fetch the GenoFLU repo, pin it to the configured version, and genotype
    // the consensus sequences with it.
    pull_genoflu(params.genoflu_github_url)
    checkout_genoflu(pull_genoflu.out.repo, params.genoflu_version)
    genoflu(fluviewer.out.consensus_main.combine(pull_genoflu.out.repo))

    //
    // Provenance collection processes
    // The basic idea is to build up a channel with the following structure:
    // [sample_id, [provenance_file_1.yml, provenance_file_2.yml, provenance_file_3.yml...]]
    // ...and then concatenate them all together in the 'collect_provenance' process.
    ch_provenance = ch_provenance.combine(ch_pipeline_provenance).map{ it -> [it[0], [it[1]]] }
    ch_provenance = ch_provenance.join(hash_files.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(fastp.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(cutadapt.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(normalize_depth.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(fluviewer.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(clade_calling.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(snp_calling.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    ch_provenance = ch_provenance.join(genoflu.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
    collect_provenance(ch_provenance)
}