Skip to content

Commit

Permalink
Feature/gffquant stream 20230628 (#4)
Browse files Browse the repository at this point in the history
* version 0.8.1
* feature: added gffquant streaming
  • Loading branch information
cschu authored Oct 18, 2023
1 parent 7367036 commit 5fd3be8
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 10 deletions.
21 changes: 20 additions & 1 deletion main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,26 @@ workflow {

if (params.run_gffquant) {

gffquant_flow(nevermore_main.out.alignments)
if (params.gq_stream) {
gq_input_ch = nevermore_main.out.fastqs
.map { sample, fastqs ->
sample_id = sample.id.replaceAll(/.(orphans|singles|chimeras)$/, "")
return tuple(sample_id, [fastqs].flatten())
}
.groupTuple()
.map { sample_id, fastqs -> return tuple(sample_id, fastqs.flatten()) }
gq_input_ch.view()
//.groupTuple(sort: true)

} else {

gq_input_ch = nevermore_main.out.alignments

}

// gq_input_ch = ((params.gq_stream) ? nevermore_main.out.fastqs : never_main.out.alignments)
// .map { sample, files -> return tuple(sample.id, files) }
gffquant_flow(gq_input_ch)

}

Expand Down
67 changes: 67 additions & 0 deletions nevermore/modules/profilers/gffquant.nf
Original file line number Diff line number Diff line change
@@ -1,3 +1,70 @@
params.gq_aligner = "bwa_mem"

process stream_gffquant {
label "gffquant"
tag "gffquant.${sample}"

input:
tuple val(sample), path(fastqs)
path(gq_db)
path(reference)
output:
tuple val(sample), path("profiles/${sample}/*.txt.gz"), emit: results
tuple val(sample), path("logs/${sample}.log")

script:
def gq_output = "-o profiles/${sample}/${sample}"

def gq_params = "-m ${params.gq_mode} --ambig_mode ${params.gq_ambig_mode}"
gq_params += (params.gq_strand_specific) ? " --strand_specific" : ""
gq_params += (params.gq_min_seqlen) ? (" --min_seqlen " + params.gq_min_seqlen) : ""
gq_params += (params.gq_min_identity) ? (" --min_identity " + params.gq_min_identity) : ""
gq_params += (params.gq_restrict_metrics) ? " --restrict_metrics ${params.gq_restrict_metrics}" : ""
gq_params += (params.gq_keep_alignments) ? " --keep_alignment_file ${sample}.sam" : ""
gq_params += " -t ${task.cpus}"

if (params.gq_mode == "domain") {
gq_params += " --db_separator , --db_coordinates hmmer"
}

def input_files = ""
// we cannot auto-detect SE vs. PE-orphan!
if (params.gq_single_end_library) {
//input_files += "--singles \$(find . -maxdepth 1 -type l -name '*_R1.fastq.gz')"
input_files += "--fastq-singles ${fastqs}"
} else {
r1_files = fastqs.findAll( { it.name.endsWith("_R1.fastq.gz") && !it.name.matches("(.*)singles(.*)") } )
r2_files = fastqs.findAll( { it.name.endsWith("_R2.fastq.gz") } )
orphans = fastqs.findAll( { it.name.matches("(.*)singles(.*)") } )

if (r1_files.size() != 0) {
input_files += "--fastq-r1 ${r1_files.join(' ')}"
}
if (r2_files.size() != 0) {
input_files += " --fastq-r2 ${r2_files.join(' ')}"
}
if (orphans.size() != 0) {
input_files += " --fastq-orphans ${orphans.join(' ')}"
}

// input_files += "--fastq-r1 \$(find . -maxdepth 1 -type l -name '*_R1.fastq.gz' | grep -v singles)"
// input_files += " --fastq-r2 \$(find . -maxdepth 1 -type l -name '*_R2.fastq.gz')"
// input_files += " --fastq-orphans \$(find . -maxdepth 1 -type l -name '*singles*.fastq.gz')"
}

def gq_cmd = "gffquant ${gq_output} ${gq_params} --db GQ_DATABASE --reference \$(readlink ${reference}) --aligner ${params.gq_aligner} ${input_files}"

"""
set -e -o pipefail
mkdir -p logs/ tmp/ profiles/
echo 'Copying database...'
cp -v ${gq_db} GQ_DATABASE
${gq_cmd} &> logs/${sample}.log
rm -rfv GQ_DATABASE* tmp/
"""

}

process run_gffquant {
label "gffquant"

Expand Down
34 changes: 28 additions & 6 deletions nevermore/workflows/gffquant.nf
Original file line number Diff line number Diff line change
@@ -1,19 +1,41 @@
include { run_gffquant; collate_feature_counts } from "../modules/profilers/gffquant"
include { stream_gffquant; run_gffquant; collate_feature_counts } from "../modules/profilers/gffquant"

params.gq_collate_columns = "uniq_scaled,combined_scaled"


// workflow gffquant_stream {
// take:
// fastq_ch
// main:
// gq_stream_ch = fastq_ch
// .map {
// sample, files -> return tuple(sample.id, files)
// }
// stream_gffquant(gq_stream_ch, params.gffquant_db)
// emit:

// }


workflow gffquant_flow {

take:

bam_ch
input_ch

main:

run_gffquant(bam_ch, params.gffquant_db)

feature_count_ch = run_gffquant.out.results
if (params.gq_stream) {
stream_gffquant(input_ch, params.gffquant_db, params.reference)
feature_count_ch = stream_gffquant.out.results
counts = stream_gffquant.out.results
} else {
run_gffquant(input_ch, params.gffquant_db)
feature_count_ch = run_gffquant.out.results
counts = run_gffquant.out.results
}

feature_count_ch = feature_count_ch
.map { sample, files -> return files }
.flatten()
.filter { !it.name.endsWith("Counter.txt.gz") }
Expand All @@ -33,7 +55,7 @@ workflow gffquant_flow {

emit:

counts = run_gffquant.out.results
counts // = run_gffquant.out.results
collated = collate_feature_counts.out.collated

}
6 changes: 4 additions & 2 deletions nevermore/workflows/nevermore.nf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ include { nevermore_align; nevermore_prep_align } from "./align"

def do_preprocessing = (!params.skip_preprocessing || params.run_preprocessing)
def do_alignment = params.run_gffquant || !params.skip_alignment
def do_stream = params.gq_stream

workflow nevermore_main {

Expand Down Expand Up @@ -60,6 +61,7 @@ workflow nevermore_main {

nevermore_prep_align(preprocessed_ch)
align_ch = Channel.empty()
collate_ch = Channel.empty()

if (do_preprocessing) {
collate_ch = nevermore_simple_preprocessing.out.raw_counts
Expand All @@ -70,9 +72,9 @@ workflow nevermore_main {
.map { sample, file -> return file }
.collect()
)
}
}

if (do_alignment) {
if (!do_stream && do_alignment) {
nevermore_align(nevermore_prep_align.out.fastqs)
align_ch = nevermore_align.out.alignments

Expand Down
2 changes: 1 addition & 1 deletion nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ manifest {
description = "Metagenomic functional profiling pipeline"
name = "nevermore"
nextflowVersion = ">=21.10.4"
version = "0.8"
version = "0.8.1"
}

0 comments on commit 5fd3be8

Please sign in to comment.