Process runs on per-sample basis
Skip sample if processing.success present
mcmero committed Apr 22, 2024
1 parent b37f1b5 commit 1eaf159
Showing 6 changed files with 77 additions and 62 deletions.
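For context, a minimal sketch of the per-sample skip logic this commit introduces: before a sample is queued, its directory is checked for a processing.success marker. Directory names and the sample list below are hypothetical, for illustration only; the real logic lives in workflow/rules/common.smk further down.

import os

def is_run_processing_complete(sample_dir):
    # The workflow skips a sample when this marker file exists.
    return os.path.exists(os.path.join(sample_dir, "processing.success"))

# Hypothetical layout for illustration only.
data_dir = "/data"
runs = [("projA", "sample01"), ("projA", "sample02")]

queued = []
for project, sample in runs:
    sample_dir = os.path.join(data_dir, project, sample)
    if is_run_processing_complete(sample_dir):
        print(f"Processing of {sample} in project {project} already complete; skipping.")
        continue
    queued.append((project, sample))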
2 changes: 1 addition & 1 deletion .test/config/config.yaml
@@ -65,4 +65,4 @@ src_endpoint: ''
dest_endpoint: ''

# Globus path to transfer data to
dest_path: ''
dest_path: ''
9 changes: 9 additions & 0 deletions test.sh
@@ -0,0 +1,9 @@
#!/bin/bash

rm -r .test/test_data/*/_transfer*

snakemake --directory .test \
--use-conda \
--conda-frontend mamba \
--cores 2 \
--config data_dir="$PWD/.test/test_data"
37 changes: 17 additions & 20 deletions workflow/rules/archive.smk
@@ -3,7 +3,7 @@ rule calculate_checksums:
[f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)],
output:
expand(
"{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_{{sample}}_checksums.sha1",
"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/checksums/{{project}}_{{sample}}_checksums.sha1",
data_dir=data_dir,
transfer_dir=transfer_dir,
),
@@ -26,18 +26,18 @@ rule calculate_archive_checksums:
get_outputs(file_types),
output:
expand(
"{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_archives.sha1",
"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/checksums/{{project}}_{{sample}}_archives.sha1",
data_dir=data_dir,
transfer_dir=transfer_dir,
),
log:
"logs/{project}_archive_checksums.log",
"logs/{project}_{sample}_archive_checksums.log",
threads: 1
params:
data_dir=data_dir,
shell:
"""
cd {params.data_dir}/{wildcards.project} &&
cd {params.data_dir}/{wildcards.project}/{wildcards.sample} &&
find . -type f -iname "*tar*" | xargs shasum -a 1 > {output}
"""

@@ -54,8 +54,8 @@ for project, sample in zip(projects, samples):
input:
f"{data_dir}/{project}/{sample}",
output:
tar=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}.{ext}",
txt=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}_list.txt",
tar=f"{data_dir}/{project}/{transfer_dir}_{sample}/{file_type}/{project}_{sample}_{file_type}_{state}.{ext}",
txt=f"{data_dir}/{project}/{transfer_dir}_{sample}/{file_type}/{project}_{sample}_{file_type}_{state}_list.txt",
log:
f"logs/{project}_{sample}_{file_type}_{state}_tar.log",
conda:
@@ -89,12 +89,12 @@ rule tar_reports:
[f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)],
output:
tar=expand(
"{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports.tar.gz",
"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/reports/{{project}}_{{sample}}_reports.tar.gz",
data_dir=data_dir,
transfer_dir=transfer_dir,
),
txt=expand(
"{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports_list.txt",
"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/reports/{{project}}_{{sample}}_reports_list.txt",
data_dir=data_dir,
transfer_dir=transfer_dir,
),
@@ -117,23 +117,20 @@ rule archive_complete:
input:
get_outputs(file_types),
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
log:
"logs/{project}_archive_complete.txt",
"logs/{project}_{sample}_archive_complete.txt",
threads: 1
params:
data_dir=data_dir,
transfer_dir=transfer_dir,
shell:
"""
transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir}
samples=`find {params.data_dir}/{wildcards.project} -maxdepth 1 -mindepth 1 -type d -exec basename {{}} \; | grep -v _transfer`
for sample in $samples; do
count_file_regex=`echo -e ".*\/{wildcards.project}_${{sample}}_[pod5|bam|fast|reports].*_list\.txt"`
count_files=`find $transfer_path -type f -regex $count_file_regex`
tar_count=`cat $count_files | grep -v "/$" | wc -l`
sys_file_count=`find {params.data_dir}/{wildcards.project}/$sample -type f | wc -l`
echo "$sample tar file counts: $tar_count" >> {output}
echo "$sample sys file counts: $sys_file_count" >> {output}
done
transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir}_{wildcards.sample}
count_file_regex=`echo -e ".*/{wildcards.project}_{wildcards.sample}_[pod5|bam|fast|reports].*_list.txt"`
count_files=`find $transfer_path -type f -regex $count_file_regex`
tar_count=`cat $count_files | grep -v "/$" | wc -l`
sys_file_count=`find {params.data_dir}/{wildcards.project}/{wildcards.sample} -type f | wc -l`
echo "{sample} tar file counts: $tar_count" >> {output}
echo "{sample} sys file counts: $sys_file_count" >> {output}
"""
65 changes: 37 additions & 28 deletions workflow/rules/common.smk
@@ -1,7 +1,6 @@
import pandas as pd
import numpy as np
import os
import re
import sys
from glob import iglob

# variables
@@ -54,6 +54,7 @@ if ignore_proj_regex and not extra_dirs:
)
sys.exit()


# functions
def get_project_dirs(data_dir, proj_dir_regex):
"""
@@ -85,7 +85,16 @@ def is_run_complete(sample_dir):
return any(run_complete)


def is_processing_complete(project_dir_full):
def is_run_processing_complete(sample_dir):
"""
Checks whether the sample has already been processed,
indicated by the presence of a processing.success file
"""
processing_complete_file = os.path.join(sample_dir, "processing.success")
return os.path.exists(processing_complete_file)
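For example (illustrative only, reusing the is_run_processing_complete function defined just above): creating the marker file makes the check true, which is what lets a re-run of the workflow skip the sample.

import os
import tempfile

with tempfile.TemporaryDirectory() as sample_dir:
    assert not is_run_processing_complete(sample_dir)
    open(os.path.join(sample_dir, "processing.success"), "w").close()
    assert is_run_processing_complete(sample_dir)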


def is_project_processing_complete(project_dir_full):
"""
Checks whether run has already been archived;
this is indicated by the presence of a file under
@@ -139,8 +148,7 @@ project_dirs = list(
filter(lambda project: project is not None and project != "", project_dirs)
)
for proj_dir in project_dirs:
print(f"Found project directory {proj_dir}.", file=sys.stderr)
projects_with_incomplete_runs = []
print(f"Found project directory {proj_dir}.", file=sys.stdout)


projects, samples = [], []
@@ -149,11 +157,11 @@ for project in project_dirs:
if not os.path.exists(project_dir_full):
print(
f"Project directory {project} does not exist; skipping.",
file=sys.stderr,
file=sys.stdout,
)
continue

if is_processing_complete(project_dir_full):
if is_project_processing_complete(project_dir_full):
print(
f"Processing of project {project} already complete; skipping.",
file=sys.stdout,
@@ -168,24 +176,29 @@ for project in project_dirs:
# add both projects and sample to keep their association together
for sample in samples_in_project:
sample_dir = os.path.join(project_dir_full, sample)
if not check_if_complete or is_run_complete(sample_dir):
if is_run_processing_complete(sample_dir):
print(
f"Found {sample} in project {project} for processing.", file=sys.stderr
f"Processing of {sample} in project {project} already complete.",
file=sys.stdout,
)
elif not check_if_complete or is_run_complete(sample_dir):
print(
f"Found {sample} in project {project} for processing.",
file=sys.stdout,
)
projects.append(project)
samples.append(sample)
elif check_if_complete and not is_run_complete(sample_dir):
print(
f"Skipping {sample} in project {project} (run incomplete).",
file=sys.stderr,
file=sys.stdout,
)
projects_with_incomplete_runs.append(project)


# input/output functions
def get_checksum_outputs():
checksum_outputs = [
f"{data_dir}/{project}/{transfer_dir}/checksums/{project}_{sample}_checksums.sha1"
f"{data_dir}/{project}/{transfer_dir}_{sample}/checksums/{project}_{sample}_checksums.sha1"
for project, sample in zip(projects, samples)
]
return checksum_outputs
@@ -195,10 +208,10 @@ def get_report_outputs():
report_outputs = []
for project, sample in zip(projects, samples):
report_outputs.append(
f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports.tar.gz"
f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports.tar.gz"
)
report_outputs.append(
f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt"
f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports_list.txt"
)
return report_outputs

@@ -211,7 +224,7 @@ def get_output_by_type(filetype):
files_under_sample = [
os.path.basename(f) for f in iglob(f"{data_dir}/{project}/{sample}/*/*")
]
out_prefix = f"{data_dir}/{project}/{transfer_dir}/{filetype}/{project}_{sample}_{filetype}"
out_prefix = f"{data_dir}/{project}/{transfer_dir}_{sample}/{filetype}/{project}_{sample}_{filetype}"
for state in STATES:
if f"{filetype}_{state}" in files_under_sample:
outputs.append(f"{out_prefix}_{state}.{file_extension}")
@@ -238,38 +251,34 @@ def get_outputs(file_types):


def get_final_checksum_outputs():
final_checksum_outputs = expand(
"{data_dir}/{project}/{transfer_dir}/checksums/{project}_archives.sha1",
data_dir=data_dir,
project=np.unique(projects),
transfer_dir=transfer_dir,
)
final_checksum_outputs = [
f"{data_dir}/{project}/{transfer_dir}_{sample}/checksums/{project}_{sample}_archives.sha1"
for project, sample in zip(projects, samples)
]
return final_checksum_outputs


def get_validate_reports_outputs():
validate_reports_outputs = [
f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt"
f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports_list.txt"
for project, sample in zip(projects, samples)
]
return validate_reports_outputs


def get_archive_complete_outputs():
archive_complete_outputs = [
f"{data_dir}/{project}/{transfer_dir}/logs/{project}_file_counts.txt"
for project in np.unique(projects)
if project not in projects_with_incomplete_runs
f"{data_dir}/{project}/{transfer_dir}_{sample}/logs/{project}_{sample}_file_counts.txt"
for project, sample in zip(projects, samples)
]
return archive_complete_outputs


def get_transfer_outputs():
if transfer:
transfer_outputs = [
f"{data_dir}/{project}/{transfer_dir}/logs/{project}_transfer.txt"
for project in np.unique(projects)
if project not in projects_with_incomplete_runs
f"{data_dir}/{project}/{transfer_dir}_{sample}/logs/{project}_{sample}_transfer.txt"
for project, sample in zip(projects, samples)
]
return transfer_outputs
else:
21 changes: 10 additions & 11 deletions workflow/rules/transfer.smk
@@ -3,11 +3,11 @@ if delete_on_transfer:
# NOTE: this step uses a Globus data flow, so first we need to create the json file
rule create_globus_json_input:
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_globus_input.json",
log:
"logs/{project}_create_globus_json.log",
"logs/{project}_{sample}_create_globus_json.log",
conda:
"../envs/python.yaml"
threads: 1
@@ -18,12 +18,12 @@ if delete_on_transfer:
# will be successful. Check the Globus dashboard for the status of the transfer.
rule transfer:
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_globus_input.json",
output:
transfer_file=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
complete_file=f"{data_dir}/{{project}}/processing.success",
transfer_file=f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_transfer.txt",
complete_file=f"{data_dir}/{{project}}/{{sample}}/processing.success",
log:
"logs/{project}_transfer.log",
"logs/{project}_{sample}_transfer.log",
conda:
"../envs/globus_automate.yaml"
threads: 1
@@ -39,18 +39,17 @@ if delete_on_transfer:
touch {output.complete_file}
"""


else:

# NOTE: this step will only invoke the transfer but there is no guarantee that it
# will be successful. Check the Globus dashboard for the status of the transfer.
rule transfer:
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_transfer.txt",
log:
"logs/{project}_transfer.log",
"logs/{project}_{sample}_transfer.log",
conda:
"../envs/globus.yaml"
threads: 1
5 changes: 3 additions & 2 deletions workflow/scripts/create_globus_json_input.py
@@ -14,7 +14,8 @@
data_dir = snakemake.config["data_dir"]
transfer_dir = snakemake.config["transfer_dir"]
project = snakemake.wildcards.project
src_path = f"{data_dir}/{project}/{transfer_dir}"
sample = snakemake.wildcards.sample
src_path = f"{data_dir}/{project}/{transfer_dir}_{sample}"

dest_path = os.path.join(snakemake.config["dest_path"], project)

@@ -32,4 +33,4 @@
}

with open(snakemake.output[0], "w") as f:
json.dump(input, f, indent=2)
json.dump(input, f, indent=2)
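To make the path change concrete, a small hedged example of how src_path now resolves per sample (all values below are hypothetical; the real ones come from the Snakemake config and wildcards):

data_dir = "/data"            # hypothetical config value
transfer_dir = "_transfer"    # hypothetical config value
project = "projA"             # hypothetical wildcard
sample = "sample01"           # hypothetical wildcard

src_path = f"{data_dir}/{project}/{transfer_dir}_{sample}"
print(src_path)  # /data/projA/_transfer_sample01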
