diff --git a/.test/config/config.yaml b/.test/config/config.yaml
index 9744efa..d05047a 100644
--- a/.test/config/config.yaml
+++ b/.test/config/config.yaml
@@ -65,4 +65,4 @@ src_endpoint: ''
 dest_endpoint: ''
 
 # Globus path to transfer data to
-dest_path: ''
\ No newline at end of file
+dest_path: ''
diff --git a/test.sh b/test.sh
new file mode 100755
index 0000000..a74d19c
--- /dev/null
+++ b/test.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+rm -r .test/test_data/*/_transfer*
+
+snakemake --directory .test \
+    --use-conda \
+    --conda-frontend mamba \
+    --cores 2 \
+    --config data_dir="$PWD/.test/test_data"
diff --git a/workflow/rules/archive.smk b/workflow/rules/archive.smk
index 21093fc..b65d3e2 100644
--- a/workflow/rules/archive.smk
+++ b/workflow/rules/archive.smk
@@ -3,7 +3,7 @@ rule calculate_checksums:
         [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)],
     output:
         expand(
-            "{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_{{sample}}_checksums.sha1",
+            "{data_dir}/{{project}}/{transfer_dir}_{{sample}}/checksums/{{project}}_{{sample}}_checksums.sha1",
             data_dir=data_dir,
             transfer_dir=transfer_dir,
         ),
@@ -26,18 +26,18 @@ rule calculate_archive_checksums:
         get_outputs(file_types),
     output:
         expand(
-            "{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_archives.sha1",
+            "{data_dir}/{{project}}/{transfer_dir}_{{sample}}/checksums/{{project}}_{{sample}}_archives.sha1",
             data_dir=data_dir,
             transfer_dir=transfer_dir,
         ),
     log:
-        "logs/{project}_archive_checksums.log",
+        "logs/{project}_{sample}_archive_checksums.log",
     threads: 1
     params:
         data_dir=data_dir,
     shell:
         """
-        cd {params.data_dir}/{wildcards.project} &&
+        cd {params.data_dir}/{wildcards.project}/{wildcards.sample} &&
         find . -type f -iname "*tar*" | xargs shasum -a 1 > {output}
         """
 
@@ -54,8 +54,8 @@ for project, sample in zip(projects, samples):
             input:
                 f"{data_dir}/{project}/{sample}",
             output:
-                tar=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}.{ext}",
-                txt=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}_list.txt",
+                tar=f"{data_dir}/{project}/{transfer_dir}_{sample}/{file_type}/{project}_{sample}_{file_type}_{state}.{ext}",
+                txt=f"{data_dir}/{project}/{transfer_dir}_{sample}/{file_type}/{project}_{sample}_{file_type}_{state}_list.txt",
             log:
                 f"logs/{project}_{sample}_{file_type}_{state}_tar.log",
             conda:
@@ -89,12 +89,12 @@ rule tar_reports:
         [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)],
     output:
         tar=expand(
-            "{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports.tar.gz",
+            "{data_dir}/{{project}}/{transfer_dir}_{{sample}}/reports/{{project}}_{{sample}}_reports.tar.gz",
             data_dir=data_dir,
             transfer_dir=transfer_dir,
         ),
         txt=expand(
-            "{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports_list.txt",
+            "{data_dir}/{{project}}/{transfer_dir}_{{sample}}/reports/{{project}}_{{sample}}_reports_list.txt",
             data_dir=data_dir,
             transfer_dir=transfer_dir,
         ),
@@ -117,23 +117,20 @@ rule archive_complete:
     input:
         get_outputs(file_types),
     output:
-        f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
+        f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
     log:
-        "logs/{project}_archive_complete.txt",
+        "logs/{project}_{sample}_archive_complete.txt",
     threads: 1
     params:
         data_dir=data_dir,
         transfer_dir=transfer_dir,
     shell:
         """
-        transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir}
-        samples=`find {params.data_dir}/{wildcards.project} -maxdepth 1 -mindepth 1 -type d -exec basename {{}} \; | grep -v _transfer`
-        for sample in $samples; do
-            count_file_regex=`echo -e ".*\/{wildcards.project}_${{sample}}_[pod5|bam|fast|reports].*_list\.txt"`
-            count_files=`find $transfer_path -type f -regex $count_file_regex`
-            tar_count=`cat $count_files | grep -v "/$" | wc -l`
-            sys_file_count=`find {params.data_dir}/{wildcards.project}/$sample -type f | wc -l`
-            echo "$sample tar file counts: $tar_count" >> {output}
-            echo "$sample sys file counts: $sys_file_count" >> {output}
-        done
+        transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir}_{wildcards.sample}
+        count_file_regex=`echo -e ".*/{wildcards.project}_{wildcards.sample}_[pod5|bam|fast|reports].*_list.txt"`
+        count_files=`find $transfer_path -type f -regex $count_file_regex`
+        tar_count=`cat $count_files | grep -v "/$" | wc -l`
+        sys_file_count=`find {params.data_dir}/{wildcards.project}/{wildcards.sample} -type f | wc -l`
+        echo "{wildcards.sample} tar file counts: $tar_count" >> {output}
+        echo "{wildcards.sample} sys file counts: $sys_file_count" >> {output}
         """
diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk
index e710335..600fb4a 100644
--- a/workflow/rules/common.smk
+++ b/workflow/rules/common.smk
@@ -1,7 +1,6 @@
-import pandas as pd
-import numpy as np
 import os
 import re
+import sys
 from glob import iglob
 
 # variables
@@ -54,6 +53,7 @@ if ignore_proj_regex and not extra_dirs:
     )
     sys.exit()
 
+
 # functions
 def get_project_dirs(data_dir, proj_dir_regex):
     """
@@ -85,7 +85,16 @@ def is_run_complete(sample_dir):
     return any(run_complete)
 
 
-def is_processing_complete(project_dir_full):
+def is_run_processing_complete(sample_dir):
+    """
+    Checks whether sample has already been processed
+    through the presence of a processing.success file
+    """
+    processing_complete_file = os.path.join(sample_dir, "processing.success")
+    return os.path.exists(processing_complete_file)
+
+
+def is_project_processing_complete(project_dir_full):
     """
     Checks whether run has already been archived;
     this is indicated by the presence of a file under
@@ -139,8 +148,7 @@ project_dirs = list(
     filter(lambda project: project is not None and project != "", project_dirs)
 )
 for proj_dir in project_dirs:
-    print(f"Found project directory {proj_dir}.", file=sys.stderr)
-projects_with_incomplete_runs = []
+    print(f"Found project directory {proj_dir}.", file=sys.stdout)
 
 projects, samples = [], []
 
@@ -149,11 +157,11 @@ for project in project_dirs:
     if not os.path.exists(project_dir_full):
         print(
             f"Project directory {project} does not exist; skipping.",
-            file=sys.stderr,
+            file=sys.stdout,
         )
         continue
 
-    if is_processing_complete(project_dir_full):
+    if is_project_processing_complete(project_dir_full):
         print(
             f"Processing of project {project} already complete; skipping.",
             file=sys.stdout,
@@ -168,24 +176,29 @@ for project in project_dirs:
     # add both projects and sample to keep their association together
     for sample in samples_in_project:
         sample_dir = os.path.join(project_dir_full, sample)
-        if not check_if_complete or is_run_complete(sample_dir):
+        if is_run_processing_complete(sample_dir):
             print(
-                f"Found {sample} in project {project} for processing.", file=sys.stderr
+                f"Processing of {sample} in project {project} already complete.",
+                file=sys.stdout,
+            )
+        elif not check_if_complete or is_run_complete(sample_dir):
+            print(
+                f"Found {sample} in project {project} for processing.",
+                file=sys.stdout,
             )
             projects.append(project)
             samples.append(sample)
         elif check_if_complete and not is_run_complete(sample_dir):
             print(
                 f"Skipping {sample} in project {project} (run incomplete).",
-                file=sys.stderr,
+                file=sys.stdout,
             )
-            projects_with_incomplete_runs.append(project)
 
 
 # input/output functions
 def get_checksum_outputs():
     checksum_outputs = [
-        f"{data_dir}/{project}/{transfer_dir}/checksums/{project}_{sample}_checksums.sha1"
+        f"{data_dir}/{project}/{transfer_dir}_{sample}/checksums/{project}_{sample}_checksums.sha1"
         for project, sample in zip(projects, samples)
     ]
     return checksum_outputs
@@ -195,10 +208,10 @@ def get_report_outputs():
     report_outputs = []
     for project, sample in zip(projects, samples):
         report_outputs.append(
-            f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports.tar.gz"
+            f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports.tar.gz"
         )
         report_outputs.append(
-            f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt"
+            f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports_list.txt"
         )
     return report_outputs
 
@@ -211,7 +224,7 @@ def get_output_by_type(filetype):
         files_under_sample = [
             os.path.basename(f) for f in iglob(f"{data_dir}/{project}/{sample}/*/*")
         ]
-        out_prefix = f"{data_dir}/{project}/{transfer_dir}/{filetype}/{project}_{sample}_{filetype}"
+        out_prefix = f"{data_dir}/{project}/{transfer_dir}_{sample}/{filetype}/{project}_{sample}_{filetype}"
         for state in STATES:
             if f"{filetype}_{state}" in files_under_sample:
                 outputs.append(f"{out_prefix}_{state}.{file_extension}")
@@ -238,18 +251,16 @@ def get_outputs(file_types):
 
 
 def get_final_checksum_outputs():
-    final_checksum_outputs = expand(
-        "{data_dir}/{project}/{transfer_dir}/checksums/{project}_archives.sha1",
-        data_dir=data_dir,
-        project=np.unique(projects),
-        transfer_dir=transfer_dir,
-    )
+    final_checksum_outputs = [
+        f"{data_dir}/{project}/{transfer_dir}_{sample}/checksums/{project}_{sample}_archives.sha1"
+        for project, sample in zip(projects, samples)
+    ]
     return final_checksum_outputs
 
 
 def get_validate_reports_outputs():
     validate_reports_outputs = [
-        f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt"
+        f"{data_dir}/{project}/{transfer_dir}_{sample}/reports/{project}_{sample}_reports_list.txt"
         for project, sample in zip(projects, samples)
     ]
     return validate_reports_outputs
@@ -257,9 +268,8 @@ def get_validate_reports_outputs():
 
 def get_archive_complete_outputs():
     archive_complete_outputs = [
-        f"{data_dir}/{project}/{transfer_dir}/logs/{project}_file_counts.txt"
-        for project in np.unique(projects)
-        if project not in projects_with_incomplete_runs
+        f"{data_dir}/{project}/{transfer_dir}_{sample}/logs/{project}_{sample}_file_counts.txt"
+        for project, sample in zip(projects, samples)
     ]
     return archive_complete_outputs
 
@@ -267,9 +277,8 @@
 def get_transfer_outputs():
     if transfer:
         transfer_outputs = [
-            f"{data_dir}/{project}/{transfer_dir}/logs/{project}_transfer.txt"
-            for project in np.unique(projects)
-            if project not in projects_with_incomplete_runs
+            f"{data_dir}/{project}/{transfer_dir}_{sample}/logs/{project}_{sample}_transfer.txt"
+            for project, sample in zip(projects, samples)
         ]
         return transfer_outputs
     else:
diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk
index 555dfa4..b2fdafc 100644
--- a/workflow/rules/transfer.smk
+++ b/workflow/rules/transfer.smk
@@ -3,11 +3,11 @@ if delete_on_transfer:
     # NOTE: this step uses a Globus data flow, so first we need to create the json file
     rule create_globus_json_input:
         input:
-            f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
+            f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
         output:
-            f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
+            f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_globus_input.json",
         log:
-            "logs/{project}_create_globus_json.log",
+            "logs/{project}_{sample}_create_globus_json.log",
         conda:
             "../envs/python.yaml"
         threads: 1
@@ -18,12 +18,12 @@
     # will be successful. Check the Globus dashboard for the status of the transfer.
     rule transfer:
         input:
-            f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
+            f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_globus_input.json",
         output:
-            transfer_file=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
-            complete_file=f"{data_dir}/{{project}}/processing.success",
+            transfer_file=f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_transfer.txt",
+            complete_file=f"{data_dir}/{{project}}/{{sample}}/processing.success",
         log:
-            "logs/{project}_transfer.log",
+            "logs/{project}_{sample}_transfer.log",
         conda:
            "../envs/globus_automate.yaml"
         threads: 1
@@ -39,18 +39,17 @@
             touch {output.complete_file}
             """
 
-
 else:
 
     # NOTE: this step will only invoke the transfer but there is no guarantee that it
     # will be successful. Check the Globus dashboard for the status of the transfer.
     rule transfer:
         input:
-            f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
+            f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_file_counts.txt",
         output:
-            f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
+            f"{data_dir}/{{project}}/{transfer_dir}_{{sample}}/logs/{{project}}_{{sample}}_transfer.txt",
         log:
-            "logs/{project}_transfer.log",
+            "logs/{project}_{sample}_transfer.log",
         conda:
             "../envs/globus.yaml"
         threads: 1
diff --git a/workflow/scripts/create_globus_json_input.py b/workflow/scripts/create_globus_json_input.py
index ad5dfce..2831028 100644
--- a/workflow/scripts/create_globus_json_input.py
+++ b/workflow/scripts/create_globus_json_input.py
@@ -14,7 +14,8 @@
 data_dir = snakemake.config["data_dir"]
 transfer_dir = snakemake.config["transfer_dir"]
 project = snakemake.wildcards.project
-src_path = f"{data_dir}/{project}/{transfer_dir}"
+sample = snakemake.wildcards.sample
+src_path = f"{data_dir}/{project}/{transfer_dir}_{sample}"
 
 dest_path = os.path.join(snakemake.config["dest_path"], project)
 
@@ -32,4 +33,4 @@
 }
 
 with open(snakemake.output[0], "w") as f:
-    json.dump(input, f, indent=2)
\ No newline at end of file
+    json.dump(input, f, indent=2)
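
For reference, a minimal sketch of the per-sample layout this patch introduces: the helpers in common.smk now build one "{transfer_dir}_{sample}" directory per (project, sample) pair instead of a single shared transfer directory per project. The concrete data_dir, transfer_dir, project and sample values below are illustrative assumptions, not values taken from the repository or its config.

# Standalone sketch of the per-sample output paths produced by the updated
# helpers in common.smk. All concrete values here are illustrative assumptions.
data_dir = "/data"
transfer_dir = "_transfer"
projects = ["projA", "projA"]
samples = ["sample1", "sample2"]

# Mirrors get_checksum_outputs() after this change: one checksum file per
# sample, placed inside that sample's own transfer directory.
checksum_outputs = [
    f"{data_dir}/{project}/{transfer_dir}_{sample}/checksums/{project}_{sample}_checksums.sha1"
    for project, sample in zip(projects, samples)
]

for path in checksum_outputs:
    print(path)
# /data/projA/_transfer_sample1/checksums/projA_sample1_checksums.sha1
# /data/projA/_transfer_sample2/checksums/projA_sample2_checksums.sha1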