diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py
index b088bdf..1eafbe7 100644
--- a/bio2zarr/cli.py
+++ b/bio2zarr/cli.py
@@ -233,7 +233,7 @@ def dexplode_partition(icf_path, partition, verbose):
     from 0 (inclusive) to the number of partitions returned by dexplode_init
     (exclusive).
     """
     setup_logging(verbose)
-    vcf.explode_partition(icf_path, partition, show_progress=False)
+    vcf.explode_partition(icf_path, partition)

 @click.command
diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py
index 1961cc8..a2a090f 100644
--- a/bio2zarr/vcf.py
+++ b/bio2zarr/vcf.py
@@ -1,7 +1,6 @@
 import collections
 import contextlib
 import dataclasses
-import functools
 import json
 import logging
 import math
@@ -145,29 +144,41 @@ class VcfPartition:
     num_records: int = -1


-ICF_METADATA_FORMAT_VERSION = "0.2"
+ICF_METADATA_FORMAT_VERSION = "0.3"
 ICF_DEFAULT_COMPRESSOR = numcodecs.Blosc(
     cname="zstd", clevel=7, shuffle=numcodecs.Blosc.NOSHUFFLE
 )

-# TODO refactor this to have embedded Contig dataclass, Filters
-# and Samples dataclasses to allow for more information to be
-# retained and forward compatibility.
+
+@dataclasses.dataclass
+class Contig:
+    id: str
+    length: int = None
+
+
+@dataclasses.dataclass
+class Sample:
+    id: str
+
+
+@dataclasses.dataclass
+class Filter:
+    id: str
+    description: str = ""


 @dataclasses.dataclass
 class IcfMetadata:
     samples: list
-    contig_names: list
-    contig_record_counts: dict
+    contigs: list
     filters: list
     fields: list
     partitions: list = None
-    contig_lengths: list = None
     format_version: str = None
     compressor: dict = None
     column_chunk_size: int = None
     provenance: dict = None
+    num_records: int = -1

     @property
     def info_fields(self):
@@ -187,16 +198,12 @@ def format_fields(self):

     @property
     def num_contigs(self):
-        return len(self.contig_names)
+        return len(self.contigs)

     @property
     def num_filters(self):
         return len(self.filters)

-    @property
-    def num_records(self):
-        return sum(self.contig_record_counts.values())
-
     @staticmethod
     def fromdict(d):
         if d["format_version"] != ICF_METADATA_FORMAT_VERSION:
@@ -204,18 +211,23 @@ def fromdict(d):
                 "Intermediate columnar metadata format version mismatch: "
                 f"{d['format_version']} != {ICF_METADATA_FORMAT_VERSION}"
             )
-        fields = [VcfField.fromdict(fd) for fd in d["fields"]]
         partitions = [VcfPartition(**pd) for pd in d["partitions"]]
         for p in partitions:
             p.region = vcf_utils.Region(**p.region)
         d = d.copy()
-        d["fields"] = fields
         d["partitions"] = partitions
+        d["fields"] = [VcfField.fromdict(fd) for fd in d["fields"]]
+        d["samples"] = [Sample(**sd) for sd in d["samples"]]
+        d["filters"] = [Filter(**fd) for fd in d["filters"]]
+        d["contigs"] = [Contig(**cd) for cd in d["contigs"]]
         return IcfMetadata(**d)

     def asdict(self):
         return dataclasses.asdict(self)

+    def asjson(self):
+        return json.dumps(self.asdict(), indent=4)
+

 def fixed_vcf_field_definitions():
     def make_field_def(name, vcf_type, vcf_number):
@@ -243,15 +255,22 @@ def make_field_def(name, vcf_type, vcf_number):
 def scan_vcf(path, target_num_partitions):
     with vcf_utils.IndexedVcf(path) as indexed_vcf:
         vcf = indexed_vcf.vcf
-        filters = [
-            h["ID"]
-            for h in vcf.header_iter()
-            if h["HeaderType"] == "FILTER" and isinstance(h["ID"], str)
-        ]
+        filters = []
+        pass_index = -1
+        for h in vcf.header_iter():
+            if h["HeaderType"] == "FILTER" and isinstance(h["ID"], str):
+                try:
+                    description = h["Description"].strip('"')
+                except KeyError:
+                    description = ""
+                if h["ID"] == "PASS":
+                    pass_index = len(filters)
+                filters.append(Filter(h["ID"], description))
+
         # Ensure PASS is the first filter if present
-        if "PASS" in filters:
-            filters.remove("PASS")
-            filters.insert(0, "PASS")
+        if pass_index > 0:
+            pass_filter = filters.pop(pass_index)
+            filters.insert(0, pass_filter)

         fields = fixed_vcf_field_definitions()
         for h in vcf.header_iter():
@@ -262,18 +281,22 @@ def scan_vcf(path, target_num_partitions):
                 field.vcf_number = "."
             fields.append(field)

+        try:
+            contig_lengths = vcf.seqlens
+        except AttributeError:
+            contig_lengths = [None for _ in vcf.seqnames]
+
         metadata = IcfMetadata(
-            samples=vcf.samples,
-            contig_names=vcf.seqnames,
-            contig_record_counts=indexed_vcf.contig_record_counts(),
+            samples=[Sample(sample_id) for sample_id in vcf.samples],
+            contigs=[
+                Contig(contig_id, length)
+                for contig_id, length in zip(vcf.seqnames, contig_lengths)
+            ],
             filters=filters,
             fields=fields,
             partitions=[],
+            num_records=sum(indexed_vcf.contig_record_counts().values()),
         )
-        try:
-            metadata.contig_lengths = vcf.seqlens
-        except AttributeError:
-            pass

         regions = indexed_vcf.partition_into_regions(num_parts=target_num_partitions)
         logger.info(
@@ -292,22 +315,6 @@ def scan_vcf(path, target_num_partitions):
     return metadata, vcf.raw_header


-def check_overlap(partitions):
-    for i in range(1, len(partitions)):
-        prev_region = partitions[i - 1].region
-        current_region = partitions[i].region
-        if prev_region.contig == current_region.contig:
-            if prev_region.end is None:
-                logger.warning("Cannot check overlaps; issue #146")
-                continue
-            if prev_region.end > current_region.start:
-                raise ValueError(
-                    f"Multiple VCFs have the region "
-                    f"{prev_region.contig}:{prev_region.start}-"
-                    f"{current_region.end}"
-                )
-
-
 def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
     logger.info(
         f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions}"
@@ -336,27 +343,30 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
     # We just take the first header, assuming the others
     # are compatible.
     all_partitions = []
-    contig_record_counts = collections.Counter()
+    total_records = 0
     for metadata, _ in results:
-        all_partitions.extend(metadata.partitions)
-        metadata.partitions.clear()
-        contig_record_counts += metadata.contig_record_counts
-        metadata.contig_record_counts.clear()
+        for partition in metadata.partitions:
+            logger.debug(f"Scanned partition {partition}")
+            all_partitions.append(partition)
+        total_records += metadata.num_records
+        metadata.num_records = 0
+        metadata.partitions = []

     icf_metadata, header = results[0]
     for metadata, _ in results[1:]:
         if metadata != icf_metadata:
             raise ValueError("Incompatible VCF chunks")

-    icf_metadata.contig_record_counts = dict(contig_record_counts)
+    # Note: this will be infinity here if any of the chunks has an index
+    # that doesn't keep track of the number of records per-contig
+    icf_metadata.num_records = total_records

     # Sort by contig (in the order they appear in the header) first,
     # then by start coordinate
-    contig_index_map = {contig: j for j, contig in enumerate(metadata.contig_names)}
+    contig_index_map = {contig.id: j for j, contig in enumerate(metadata.contigs)}
     all_partitions.sort(
         key=lambda x: (contig_index_map[x.region.contig], x.region.start)
     )
-    check_overlap(all_partitions)
     icf_metadata.partitions = all_partitions
     logger.info(f"Scan complete, resulting in {len(all_partitions)} partitions.")
     return icf_metadata, header
@@ -853,19 +863,18 @@ def __init__(self, path):
             self.metadata = IcfMetadata.fromdict(json.load(f))
         with open(self.path / "header.txt") as f:
             self.vcf_header = f.read()

-        self.compressor = numcodecs.get_codec(self.metadata.compressor)
-        self.columns = {}
+        self.fields = {}
         partition_num_records = [
             partition.num_records for partition in self.metadata.partitions
         ]
         # Allow us to find which partition a given record is in
         self.partition_record_index = np.cumsum([0, *partition_num_records])
         for field in self.metadata.fields:
-            self.columns[field.full_name] = IntermediateColumnarFormatField(self, field)
+            self.fields[field.full_name] = IntermediateColumnarFormatField(self, field)
         logger.info(
             f"Loaded IntermediateColumnarFormat(partitions={self.num_partitions}, "
-            f"records={self.num_records}, columns={self.num_columns})"
+            f"records={self.num_records}, fields={self.num_fields})"
         )

     def __repr__(self):
@@ -876,17 +885,17 @@
         )

     def __getitem__(self, key):
-        return self.columns[key]
+        return self.fields[key]

     def __iter__(self):
-        return iter(self.columns)
+        return iter(self.fields)

     def __len__(self):
-        return len(self.columns)
+        return len(self.fields)

     def summary_table(self):
         data = []
-        for name, col in self.columns.items():
+        for name, col in self.fields.items():
             summary = col.vcf_field.summary
             d = {
                 "name": name,
@@ -902,9 +911,9 @@
             data.append(d)
         return data

-    @functools.cached_property
+    @property
     def num_records(self):
-        return sum(self.metadata.contig_record_counts.values())
+        return self.metadata.num_records

     @property
     def num_partitions(self):
@@ -915,8 +924,42 @@ def num_samples(self):
         return len(self.metadata.samples)

     @property
-    def num_columns(self):
-        return len(self.columns)
+    def num_fields(self):
+        return len(self.fields)
+
+
+@dataclasses.dataclass
+class IcfPartitionMetadata:
+    num_records: int
+    last_position: int
+    field_summaries: dict
+
+    def asdict(self):
+        return dataclasses.asdict(self)
+
+    def asjson(self):
+        return json.dumps(self.asdict(), indent=4)
+
+    @staticmethod
+    def fromdict(d):
+        md = IcfPartitionMetadata(**d)
+        for k, v in md.field_summaries.items():
+            md.field_summaries[k] = VcfFieldSummary.fromdict(v)
+        return md
+
+
+def check_overlapping_partitions(partitions):
+    for i in range(1, len(partitions)):
+        prev_region = partitions[i - 1].region
+        current_region = partitions[i].region
+        if prev_region.contig == current_region.contig:
+            assert prev_region.end is not None
+            # Regions are *inclusive*
+            if prev_region.end >= current_region.start:
+                raise ValueError(
+                    f"Overlapping VCF regions in partitions {i - 1} and {i}: "
+                    f"{prev_region} and {current_region}"
+                )


 class IntermediateColumnarFormatWriter:
@@ -990,11 +1033,8 @@ def load_partition_summaries(self):
         not_found = []
         for j in range(self.num_partitions):
             try:
-                with open(self.wip_path / f"p{j}_summary.json") as f:
-                    summary = json.load(f)
-                    for k, v in summary["field_summaries"].items():
-                        summary["field_summaries"][k] = VcfFieldSummary.fromdict(v)
-                    summaries.append(summary)
+                with open(self.wip_path / f"p{j}.json") as f:
+                    summaries.append(IcfPartitionMetadata.fromdict(json.load(f)))
             except FileNotFoundError:
                 not_found.append(j)
         if len(not_found) > 0:
@@ -1011,7 +1051,7 @@ def load_metadata(self):

     def process_partition(self, partition_index):
         self.load_metadata()
-        summary_path = self.wip_path / f"p{partition_index}_summary.json"
+        summary_path = self.wip_path / f"p{partition_index}.json"
         # If someone is rewriting a summary path (for whatever reason), make sure it
         # doesn't look like it's already been completed.
         # NOTE to do this properly we probably need to take a lock on this file - but
@@ -1032,6 +1072,7 @@
             else:
                 format_fields.append(field)

+        last_position = None
         with IcfPartitionWriter(
             self.metadata,
             self.path,
@@ -1041,6 +1082,7 @@
             num_records = 0
             for variant in ivcf.variants(partition.region):
                 num_records += 1
+                last_position = variant.POS
                 tcw.append("CHROM", variant.CHROM)
                 tcw.append("POS", variant.POS)
                 tcw.append("QUAL", variant.QUAL)
@@ -1065,37 +1107,32 @@
                     f"flushing buffers"
                 )

-        partition_metadata = {
-            "num_records": num_records,
-            "field_summaries": {k: v.asdict() for k, v in tcw.field_summaries.items()},
-        }
+        partition_metadata = IcfPartitionMetadata(
+            num_records=num_records,
+            last_position=last_position,
+            field_summaries=tcw.field_summaries,
+        )
         with open(summary_path, "w") as f:
-            json.dump(partition_metadata, f, indent=4)
+            f.write(partition_metadata.asjson())
         logger.info(
-            f"Finish p{partition_index} {partition.vcf_path}__{partition.region}="
-            f"{num_records} records"
+            f"Finish p{partition_index} {partition.vcf_path}__{partition.region} "
+            f"{num_records} records last_pos={last_position}"
         )

-    def process_partition_slice(
-        self,
-        start,
-        stop,
-        *,
-        worker_processes=1,
-        show_progress=False,
-    ):
+    def explode(self, *, worker_processes=1, show_progress=False):
         self.load_metadata()
-        if start == 0 and stop == self.num_partitions:
-            num_records = self.metadata.num_records
-        else:
-            # We only know the number of records if all partitions are done at once,
-            # and we signal this to tqdm by passing None as the total.
+        num_records = self.metadata.num_records
+        if np.isinf(num_records):
+            logger.warning(
+                "Total records unknown, cannot show progress; "
+                "reindex VCFs with bcftools index to fix"
+            )
             num_records = None

-        num_columns = len(self.metadata.fields)
+        num_fields = len(self.metadata.fields)
         num_samples = len(self.metadata.samples)
         logger.info(
-            f"Exploding columns={num_columns} samples={num_samples}; "
-            f"partitions={stop - start} "
+            f"Exploding fields={num_fields} samples={num_samples}; "
+            f"partitions={self.num_partitions} "
            f"variants={'unknown' if num_records is None else num_records}"
         )
         progress_config = core.ProgressConfig(
@@ -1105,48 +1142,43 @@
             show=show_progress,
         )
         with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-            for j in range(start, stop):
+            for j in range(self.num_partitions):
                 pwm.submit(self.process_partition, j)

-    def explode(self, *, worker_processes=1, show_progress=False):
-        self.load_metadata()
-        return self.process_partition_slice(
-            0,
-            self.num_partitions,
-            worker_processes=worker_processes,
-            show_progress=show_progress,
-        )
-
-    def explode_partition(self, partition, *, show_progress=False, worker_processes=1):
+    def explode_partition(self, partition):
         self.load_metadata()
         if partition < 0 or partition >= self.num_partitions:
             raise ValueError(
                 "Partition index must be in the range 0 <= index < num_partitions"
             )
-        return self.process_partition_slice(
-            partition,
-            partition + 1,
-            worker_processes=worker_processes,
-            show_progress=show_progress,
-        )
+        self.process_partition(partition)

     def finalise(self):
         self.load_metadata()
         partition_summaries = self.load_partition_summaries()
         total_records = 0
         for index, summary in enumerate(partition_summaries):
-            partition_records = summary["num_records"]
+            partition_records = summary.num_records
             self.metadata.partitions[index].num_records = partition_records
+            self.metadata.partitions[index].region.end = summary.last_position
             total_records += partition_records
-        assert total_records == self.metadata.num_records
+        if not np.isinf(self.metadata.num_records):
+            # Note: this is just telling us that there's a bug in the
+            # index based record counting code, but it doesn't actually
+            # matter much. We may want to just make this a warning if
+            # we hit regular problems.
+            assert total_records == self.metadata.num_records
+        self.metadata.num_records = total_records
+
+        check_overlapping_partitions(self.metadata.partitions)

         for field in self.metadata.fields:
             for summary in partition_summaries:
-                field.summary.update(summary["field_summaries"][field.full_name])
+                field.summary.update(summary.field_summaries[field.full_name])

         logger.info("Finalising metadata")
         with open(self.path / "metadata.json", "w") as f:
-            json.dump(self.metadata.asdict(), f, indent=4)
+            f.write(self.metadata.asjson())

         logger.debug("Removing WIP directory")
         shutil.rmtree(self.wip_path)
@@ -1197,14 +1229,9 @@ def explode_init(
     )


-# NOTE only including worker_processes here so we can use the 0 option to get the
-# work done synchronously and so we can get test coverage on it. Should find a
-# better way to do this.
-def explode_partition(icf_path, partition, *, show_progress=False, worker_processes=1):
+def explode_partition(icf_path, partition):
     writer = IntermediateColumnarFormatWriter(icf_path)
-    writer.explode_partition(
-        partition, show_progress=show_progress, worker_processes=worker_processes
-    )
+    writer.explode_partition(partition)


 def explode_finalise(icf_path):
@@ -1332,7 +1359,7 @@ def variant_chunk_nbytes(self):
         return chunk_items * dt.itemsize


-ZARR_SCHEMA_FORMAT_VERSION = "0.2"
+ZARR_SCHEMA_FORMAT_VERSION = "0.3"


 @dataclasses.dataclass
@@ -1341,11 +1368,10 @@ class VcfZarrSchema:
     samples_chunk_size: int
     variants_chunk_size: int
     dimensions: list
-    sample_id: list
-    contig_id: list
-    contig_length: list
-    filter_id: list
-    columns: dict
+    samples: list
+    contigs: list
+    filters: list
+    fields: dict

     def asdict(self):
         return dataclasses.asdict(self)
@@ -1361,8 +1387,11 @@ def fromdict(d):
                 f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}"
             )
         ret = VcfZarrSchema(**d)
-        ret.columns = {
-            key: ZarrColumnSpec(**value) for key, value in d["columns"].items()
+        ret.samples = [Sample(**sd) for sd in d["samples"]]
+        ret.contigs = [Contig(**sd) for sd in d["contigs"]]
+        ret.filters = [Filter(**sd) for sd in d["filters"]]
+        ret.fields = {
+            key: ZarrColumnSpec(**value) for key, value in d["fields"].items()
         }
         return ret
@@ -1406,7 +1435,7 @@ def fixed_field_spec(
             chunks=[variants_chunk_size],
         )

-    alt_col = icf.columns["ALT"]
+    alt_col = icf.fields["ALT"]
     max_alleles = alt_col.vcf_field.summary.max_number + 1

     colspecs = [
@@ -1498,12 +1527,11 @@ def fixed_field_spec(
         format_version=ZARR_SCHEMA_FORMAT_VERSION,
         samples_chunk_size=samples_chunk_size,
         variants_chunk_size=variants_chunk_size,
-        columns={col.name: col for col in colspecs},
+        fields={col.name: col for col in colspecs},
         dimensions=["variants", "samples", "ploidy", "alleles", "filters"],
-        sample_id=icf.metadata.samples,
-        contig_id=icf.metadata.contig_names,
-        contig_length=icf.metadata.contig_lengths,
-        filter_id=icf.metadata.filters,
+        samples=icf.metadata.samples,
+        contigs=icf.metadata.contigs,
+        filters=icf.metadata.filters,
     )
@@ -1671,7 +1699,7 @@ def init(
         store = zarr.DirectoryStore(self.arrays_path)
         root = zarr.group(store=store)

-        for column in self.schema.columns.values():
+        for column in self.schema.fields.values():
             self.init_array(root, column, partitions[-1].stop)

         logger.info("Writing WIP metadata")
@@ -1680,13 +1708,13 @@ def init(
         return len(partitions)

     def encode_samples(self, root):
-        if not np.array_equal(self.schema.sample_id, self.icf.metadata.samples):
+        if self.schema.samples != self.icf.metadata.samples:
             raise ValueError(
                 "Subsetting or reordering samples not supported currently"
             )  # NEEDS TEST
         array = root.array(
             "sample_id",
-            self.schema.sample_id,
+            [sample.id for sample in self.schema.samples],
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
             chunks=(self.schema.samples_chunk_size,),
@@ -1697,24 +1725,26 @@ def encode_contig_id(self, root):
         array = root.array(
             "contig_id",
-            self.schema.contig_id,
+            [contig.id for contig in self.schema.contigs],
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
-        if self.schema.contig_length is not None:
+        if all(contig.length is not None for contig in self.schema.contigs):
             array = root.array(
                 "contig_length",
-                self.schema.contig_length,
+                [contig.length for contig in self.schema.contigs],
                 dtype=np.int64,
                 compressor=DEFAULT_ZARR_COMPRESSOR,
             )
             array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

     def encode_filter_id(self, root):
+        # TODO need a way to store description also
+        # https://github.com/sgkit-dev/vcf-zarr-spec/issues/19
         array = root.array(
             "filter_id",
-            self.schema.filter_id,
+            [filt.id for filt in self.schema.filters],
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
         )
@@ -1782,16 +1812,16 @@ def encode_partition(self, partition_index):
         self.encode_filters_partition(partition_index)
         self.encode_contig_partition(partition_index)
         self.encode_alleles_partition(partition_index)
-        for col in self.schema.columns.values():
+        for col in self.schema.fields.values():
             if col.vcf_field is not None:
                 self.encode_array_partition(col, partition_index)
-        if "call_genotype" in self.schema.columns:
+        if "call_genotype" in self.schema.fields:
             self.encode_genotypes_partition(partition_index)

         final_path = self.partition_path(partition_index)
         logger.info(f"Finalising {partition_index} at {final_path}")
         if final_path.exists():
-            logger.warning("Removing existing partition at {final_path}")
+            logger.warning(f"Removing existing partition at {final_path}")
             shutil.rmtree(final_path)
         os.rename(partition_path, final_path)
@@ -1813,7 +1843,7 @@ def encode_array_partition(self, column, partition_index):
         partition = self.metadata.partitions[partition_index]
         ba = core.BufferedArray(array, partition.start)
-        source_col = self.icf.columns[column.vcf_field]
+        source_col = self.icf.fields[column.vcf_field]
         sanitiser = source_col.sanitiser_factory(ba.buff.shape)

         for value in source_col.iter_values(partition.start, partition.stop):
@@ -1836,7 +1866,7 @@ def encode_genotypes_partition(self, partition_index):
         gt_mask = core.BufferedArray(gt_mask_array, partition.start)
         gt_phased = core.BufferedArray(gt_phased_array, partition.start)

-        source_col = self.icf.columns["FORMAT/GT"]
+        source_col = self.icf.fields["FORMAT/GT"]
         for value in source_col.iter_values(partition.start, partition.stop):
             j = gt.next_buffer_row()
             sanitise_value_int_2d(gt.buff, j, value[:, :-1])
@@ -1859,8 +1889,8 @@ def encode_alleles_partition(self, partition_index):
         alleles_array = self.init_partition_array(partition_index, array_name)
         partition = self.metadata.partitions[partition_index]
         alleles = core.BufferedArray(alleles_array, partition.start)
-        ref_col = self.icf.columns["REF"]
-        alt_col = self.icf.columns["ALT"]
+        ref_col = self.icf.fields["REF"]
+        alt_col = self.icf.fields["ALT"]

         for ref, alt in zip(
             ref_col.iter_values(partition.start, partition.stop),
@@ -1880,7 +1910,7 @@ def encode_id_partition(self, partition_index):
         partition = self.metadata.partitions[partition_index]
         vid = core.BufferedArray(vid_array, partition.start)
         vid_mask = core.BufferedArray(vid_mask_array, partition.start)
-        col = self.icf.columns["ID"]
+        col = self.icf.fields["ID"]

         for value in col.iter_values(partition.start, partition.stop):
             j = vid.next_buffer_row()
@@ -1899,13 +1929,13 @@ def encode_id_partition(self, partition_index):
         self.finalise_partition_array(partition_index, "variant_id_mask")

     def encode_filters_partition(self, partition_index):
-        lookup = {filt: index for index, filt in enumerate(self.schema.filter_id)}
+        lookup = {filt.id: index for index, filt in enumerate(self.schema.filters)}
         array_name = "variant_filter"
         array = self.init_partition_array(partition_index, array_name)
         partition = self.metadata.partitions[partition_index]
         var_filter = core.BufferedArray(array, partition.start)
-        col = self.icf.columns["FILTERS"]
+        col = self.icf.fields["FILTERS"]

         for value in col.iter_values(partition.start, partition.stop):
             j = var_filter.next_buffer_row()
             var_filter.buff[j] = False
@@ -1921,12 +1951,12 @@ def encode_filters_partition(self, partition_index):
         self.finalise_partition_array(partition_index, array_name)

     def encode_contig_partition(self, partition_index):
-        lookup = {contig: index for index, contig in enumerate(self.schema.contig_id)}
+        lookup = {contig.id: index for index, contig in enumerate(self.schema.contigs)}
         array_name = "variant_contig"
         array = self.init_partition_array(partition_index, array_name)
         partition = self.metadata.partitions[partition_index]
         contig = core.BufferedArray(array, partition.start)
-        col = self.icf.columns["CHROM"]
+        col = self.icf.fields["CHROM"]

         for value in col.iter_values(partition.start, partition.stop):
             j = contig.next_buffer_row()
@@ -1986,7 +2016,7 @@ def finalise(self, show_progress=False):
             raise FileNotFoundError(f"Partitions not encoded: {missing}")

         progress_config = core.ProgressConfig(
-            total=len(self.schema.columns),
+            total=len(self.schema.fields),
             title="Finalise",
             units="array",
             show=show_progress,
@@ -2000,7 +2030,7 @@ def finalise(self, show_progress=False):
         # for multiple workers, or making a standard wrapper for tqdm
         # that allows us to have a consistent look and feel.
         with core.ParallelWorkManager(0, progress_config) as pwm:
-            for name in self.schema.columns:
+            for name in self.schema.fields:
                 pwm.submit(self.finalise_array, name)
         logger.debug(f"Removing {self.wip_path}")
         shutil.rmtree(self.wip_path)
@@ -2016,18 +2046,17 @@ def get_max_encoding_memory(self):
         Return the approximate maximum memory used to encode a variant chunk.
         """
         max_encoding_mem = max(
-            col.variant_chunk_nbytes for col in self.schema.columns.values()
+            col.variant_chunk_nbytes for col in self.schema.fields.values()
         )
         gt_mem = 0
-        if "call_genotype" in self.schema.columns:
+        if "call_genotype" in self.schema.fields:
             encoded_together = [
                 "call_genotype",
                 "call_genotype_phased",
                 "call_genotype_mask",
             ]
             gt_mem = sum(
-                self.schema.columns[col].variant_chunk_nbytes
-                for col in encoded_together
+                self.schema.fields[col].variant_chunk_nbytes for col in encoded_together
             )
         return max(max_encoding_mem, gt_mem)
@@ -2059,7 +2088,7 @@ def encode_all_partitions(
         num_workers = min(max_num_workers, worker_processes)

         total_bytes = 0
-        for col in self.schema.columns.values():
+        for col in self.schema.fields.values():
             # Open the array definition to get the total size
             total_bytes += zarr.open(self.arrays_path / col.name).nbytes
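The vcf.py changes above replace the parallel contig_names/contig_lengths/contig_record_counts collections with typed Contig, Sample, and Filter dataclasses that round-trip through JSON via asjson/fromdict. The following is a minimal standalone sketch of that round-trip, not part of the patch; the Contig dataclass is redefined locally (mirroring the one added above) so the snippet runs without a bio2zarr checkout.

import dataclasses
import json


@dataclasses.dataclass
class Contig:
    id: str
    length: int = None


contigs = [Contig("19", 1000), Contig("X", None)]

# Serialise the same way IcfMetadata.asjson serialises its nested dataclasses
payload = json.dumps([dataclasses.asdict(c) for c in contigs], indent=4)

# Rebuild the same way IcfMetadata.fromdict does, and check equality
assert [Contig(**cd) for cd in json.loads(payload)] == contigs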
diff --git a/bio2zarr/vcf_utils.py b/bio2zarr/vcf_utils.py
index 8b201b8..30fc9d3 100644
--- a/bio2zarr/vcf_utils.py
+++ b/bio2zarr/vcf_utils.py
@@ -76,6 +76,10 @@ def read_bytes_as_tuple(f: IO[Any], fmt: str) -> Sequence[Any]:

 @dataclass
 class Region:
+    """
+    An htslib-style region, where coordinates are 1-based and inclusive.
+    """
+
     contig: str
     start: Optional[int] = None
     end: Optional[int] = None
@@ -86,7 +90,7 @@ def __post_init__(self):
             assert self.start > 0
         if self.end is not None:
             self.end = int(self.end)
-            assert self.end > self.start
+            assert self.end >= self.start

     def __str__(self):
         s = f"{self.contig}"
@@ -113,6 +117,9 @@ class CSIBin:
     chunks: Sequence[Chunk]


+RECORD_COUNT_UNKNOWN = np.inf
+
+
 @dataclass
 class CSIIndex:
     min_shift: int
@@ -221,7 +228,9 @@ def read_csi(
         for _ in range(n_ref):
             n_bin = read_bytes_as_value(f, "= start:
                 yield var

-    def _filter_empty(self, regions):
+    def _filter_empty_and_refine(self, regions):
         """
-        Return all regions in the specified list that have one or more records.
-
-        Sometimes with Tabix indexes these seem to crop up:
-
-        - https://github.com/sgkit-dev/bio2zarr/issues/45
-        - https://github.com/sgkit-dev/bio2zarr/issues/120
+        Return all regions in the specified list that have one or more records,
+        and refine the start coordinate of each region to the position of its
+        first record.
         """
         ret = []
         for region in regions:
-            variants = self.variants(region)
-            if next(variants, None) is not None:
+            var = next(self.variants(region), None)
+            if var is not None:
+                region.start = var.POS
                 ret.append(region)
         return ret
@@ -528,4 +536,4 @@ def partition_into_regions(
         if self.index.record_counts[ri] > 0:
             regions.append(Region(self.sequence_names[ri]))

-        return self._filter_empty(regions)
+        return self._filter_empty_and_refine(regions)
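Because Region coordinates are now documented as 1-based and inclusive, two partitions on the same contig overlap as soon as the previous end equals the next start, which is exactly what the new check_overlapping_partitions rejects. A tiny sketch of that rule follows, for illustration only, using bare integers rather than Region objects.

def regions_overlap(prev_end, next_start):
    # With *inclusive* end coordinates, sharing a single position
    # (prev_end == next_start) already counts as an overlap.
    return prev_end >= next_start


assert regions_overlap(201, 201)      # the case added to test_check_overlap
assert not regions_overlap(200, 201)  # adjacent regions are fine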
diff --git a/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz b/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz
new file mode 100644
index 0000000..19c4701
Binary files /dev/null and b/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz differ
diff --git a/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi b/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi
new file mode 100644
index 0000000..b40852d
Binary files /dev/null and b/tests/data/vcf/CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi differ
diff --git a/tests/data/vcf/sample_extra_contig.bcf b/tests/data/vcf/sample_extra_contig.bcf
new file mode 100644
index 0000000..103eba8
Binary files /dev/null and b/tests/data/vcf/sample_extra_contig.bcf differ
diff --git a/tests/data/vcf/sample_extra_contig.bcf.csi b/tests/data/vcf/sample_extra_contig.bcf.csi
new file mode 100644
index 0000000..58dea24
Binary files /dev/null and b/tests/data/vcf/sample_extra_contig.bcf.csi differ
diff --git a/tests/data/vcf/sample_extra_contig.vcf.gz b/tests/data/vcf/sample_extra_contig.vcf.gz
new file mode 100644
index 0000000..c0bd87a
Binary files /dev/null and b/tests/data/vcf/sample_extra_contig.vcf.gz differ
diff --git a/tests/data/vcf/sample_extra_contig.vcf.gz.csi b/tests/data/vcf/sample_extra_contig.vcf.gz.csi
new file mode 100644
index 0000000..32aa0cc
Binary files /dev/null and b/tests/data/vcf/sample_extra_contig.vcf.gz.csi differ
diff --git a/tests/data/vcf/sample_old_tabix.vcf.gz b/tests/data/vcf/sample_old_tabix.vcf.gz
new file mode 100644
index 0000000..c0bd87a
Binary files /dev/null and b/tests/data/vcf/sample_old_tabix.vcf.gz differ
diff --git a/tests/data/vcf/sample_old_tabix.vcf.gz.tbi b/tests/data/vcf/sample_old_tabix.vcf.gz.tbi
new file mode 100644
index 0000000..75b755d
Binary files /dev/null and b/tests/data/vcf/sample_old_tabix.vcf.gz.tbi differ
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 8c22a38..e60c9a2 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -14,7 +14,7 @@
     show_progress=True,
 )

-DEFAULT_DEXPLODE_PARTITION_ARGS = dict(show_progress=False)
+DEFAULT_DEXPLODE_PARTITION_ARGS = dict()

 DEFAULT_DEXPLODE_INIT_ARGS = dict(
     worker_processes=1,
@@ -614,10 +614,10 @@ def test_num_parts(self):
             cli.vcf_partition, [path, "-n", "5"], catch_exceptions=False
         )
         assert list(result.stdout.splitlines()) == [
-            "20:1-278528",
+            "20:60001-278528",
             "20:278529-442368",
-            "20:442369-638976",
-            "20:638977-819200",
+            "20:442381-638976",
+            "20:638982-819200",
             "20:819201-",
         ]
diff --git a/tests/test_core.py b/tests/test_core.py
index bdb1e9f..312f145 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -188,8 +188,8 @@ def test_5_chunk_1(self, n, expected):
             # It *might* work in CI, but it may well not either, as it's
             # probably dependent on a whole bunch of things. Expect to fail
             # at some point.
-            ("tests/data", 4630726),
-            ("tests/data/vcf", 4618589),
+            ("tests/data", 4960266),
+            ("tests/data/vcf", 4948129),
             ("tests/data/vcf/sample.vcf.gz", 1089),
         ],
     )
diff --git a/tests/test_icf.py b/tests/test_icf.py
index 8cf9d87..4580000 100644
--- a/tests/test_icf.py
+++ b/tests/test_icf.py
@@ -13,7 +13,7 @@ class TestSmallExample:
     data_path = "tests/data/vcf/sample.vcf.gz"

     # fmt: off
-    columns = (
+    fields = (
         'ALT', 'CHROM', 'FILTERS', 'FORMAT/DP', 'FORMAT/GQ',
         'FORMAT/GT', 'FORMAT/HQ', 'ID', 'INFO/AA', 'INFO/AC',
         'INFO/AF', 'INFO/AN', 'INFO/DB', 'INFO/DP', 'INFO/H2',
@@ -46,14 +46,14 @@ def test_mkschema(self, tmp_path, icf):
     def test_summary_table(self, icf):
         data = icf.summary_table()
         cols = [d["name"] for d in data]
-        assert tuple(sorted(cols)) == self.columns
+        assert tuple(sorted(cols)) == self.fields

     def test_inspect(self, icf):
         assert icf.summary_table() == vcf.inspect(icf.path)

     def test_mapping_methods(self, icf):
-        assert len(icf) == len(self.columns)
-        assert icf["ALT"] is icf.columns["ALT"]
+        assert len(icf) == len(self.fields)
+        assert icf["ALT"] is icf.fields["ALT"]
         assert list(iter(icf)) == list(iter(icf))

     def test_num_partitions(self, icf):
@@ -94,7 +94,7 @@ class TestIcfWriterExample:
     data_path = "tests/data/vcf/sample.vcf.gz"

     # fmt: off
-    columns = (
+    fields = (
         'ALT', 'CHROM', 'FILTERS', 'FORMAT/DP', 'FORMAT/GQ',
         'FORMAT/GT', 'FORMAT/HQ', 'ID', 'INFO/AA', 'INFO/AC',
         'INFO/AF', 'INFO/AN', 'INFO/DB', 'INFO/DP', 'INFO/H2',
@@ -110,7 +110,7 @@ def test_init_paths(self, tmp_path):
         assert icf_path.exists()
         wip_path = icf_path / "wip"
         assert wip_path.exists()
-        for column in self.columns:
+        for column in self.fields:
             col_path = icf_path / column
             assert col_path.exists()
             assert col_path.is_dir()
@@ -147,7 +147,7 @@ def test_finalise_missing_partition_fails(self, tmp_path, partition):
     def test_explode_partition(self, tmp_path, partition):
         icf_path = tmp_path / "x.icf"
         vcf.explode_init(icf_path, [self.data_path])
-        summary_file = icf_path / "wip" / f"p{partition}_summary.json"
+        summary_file = icf_path / "wip" / f"p{partition}.json"
         assert not summary_file.exists()
         vcf.explode_partition(icf_path, partition)
         assert summary_file.exists()
@@ -156,12 +156,12 @@ def test_double_explode_partition(self, tmp_path):
         partition = 1
         icf_path = tmp_path / "x.icf"
         vcf.explode_init(icf_path, [self.data_path])
-        summary_file = icf_path / "wip" / f"p{partition}_summary.json"
+        summary_file = icf_path / "wip" / f"p{partition}.json"
         assert not summary_file.exists()
-        vcf.explode_partition(icf_path, partition, worker_processes=0)
+        vcf.explode_partition(icf_path, partition)
         with open(summary_file) as f:
             s1 = f.read()
-        vcf.explode_partition(icf_path, partition, worker_processes=0)
+        vcf.explode_partition(icf_path, partition)
         with open(summary_file) as f:
             s2 = f.read()
         assert s1 == s2
@@ -173,6 +173,16 @@ def test_explode_partition_out_of_range(self, tmp_path, partition):
         with pytest.raises(ValueError, match="Partition index must be in the range"):
             vcf.explode_partition(icf_path, partition)

+    def test_explode_same_file_twice(self, tmp_path):
+        icf_path = tmp_path / "x.icf"
+        with pytest.raises(ValueError, match="Duplicate path provided"):
+            vcf.explode(icf_path, [self.data_path, self.data_path])
+
+    def test_explode_same_data_twice(self, tmp_path):
+        icf_path = tmp_path / "x.icf"
+        with pytest.raises(ValueError, match="Overlapping VCF regions"):
+            vcf.explode(icf_path, [self.data_path, "tests/data/vcf/sample.bcf"])
+

 class TestGeneratedFieldsExample:
     data_path = "tests/data/vcf/field_type_combos.vcf.gz"
@@ -218,7 +228,7 @@ def schema(self, icf):
         ],
     )
     def test_info_schemas(self, schema, name, dtype, shape, dimensions):
-        v = schema.columns[name]
+        v = schema.fields[name]
         assert v.dtype == dtype
         assert tuple(v.shape) == shape
         assert v.dimensions == dimensions
diff --git a/tests/test_vcf.py b/tests/test_vcf.py
index 60fa1ae..40723dc 100644
--- a/tests/test_vcf.py
+++ b/tests/test_vcf.py
@@ -140,28 +140,28 @@ def test_generated_no_changes(self, icf_path):
         icf = vcf.IntermediateColumnarFormat(icf_path)
         self.assert_json_round_trip(vcf.VcfZarrSchema.generate(icf))

-    def test_generated_no_columns(self, icf_path):
+    def test_generated_no_fields(self, icf_path):
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        schema.columns.clear()
+        schema.fields.clear()
         self.assert_json_round_trip(schema)

     def test_generated_no_samples(self, icf_path):
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        schema.sample_id.clear()
+        schema.samples.clear()
         self.assert_json_round_trip(schema)

     def test_generated_change_dtype(self, icf_path):
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        schema.columns["variant_position"].dtype = "i8"
+        schema.fields["variant_position"].dtype = "i8"
         self.assert_json_round_trip(schema)

     def test_generated_change_compressor(self, icf_path):
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        schema.columns["variant_position"].compressor = {"cname": "FAKE"}
+        schema.fields["variant_position"].compressor = {"cname": "FAKE"}
         self.assert_json_round_trip(schema)
@@ -173,7 +173,7 @@ def test_codec(self, tmp_path, icf_path, cname, clevel, shuffle):
         zarr_path = tmp_path / "zarr"
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        for var in schema.columns.values():
+        for var in schema.fields.values():
             var.compressor["cname"] = cname
             var.compressor["clevel"] = clevel
             var.compressor["shuffle"] = shuffle
@@ -182,7 +182,7 @@ def test_codec(self, tmp_path, icf_path, cname, clevel, shuffle):
             f.write(schema.asjson())
         vcf.encode(icf_path, zarr_path, schema_path=schema_path)
         root = zarr.open(zarr_path)
-        for var in schema.columns.values():
+        for var in schema.fields.values():
             a = root[var.name]
             assert a.compressor.cname == cname
             assert a.compressor.clevel == clevel
@@ -193,7 +193,7 @@ def test_genotype_dtype(self, tmp_path, icf_path, dtype):
         zarr_path = tmp_path / "zarr"
         icf = vcf.IntermediateColumnarFormat(icf_path)
         schema = vcf.VcfZarrSchema.generate(icf)
-        schema.columns["call_genotype"].dtype = dtype
+        schema.fields["call_genotype"].dtype = dtype
         schema_path = tmp_path / "schema"
         with open(schema_path, "w") as f:
             f.write(schema.asjson())
@@ -219,20 +219,25 @@ def test_dimensions(self, schema):
             "filters",
         ]

-    def test_sample_id(self, schema):
-        assert schema["sample_id"] == ["NA00001", "NA00002", "NA00003"]
-
-    def test_contig_id(self, schema):
-        assert schema["contig_id"] == ["19", "20", "X"]
+    def test_samples(self, schema):
+        assert schema["samples"] == [
+            {"id": s} for s in ["NA00001", "NA00002", "NA00003"]
+        ]

-    def test_contig_length(self, schema):
-        assert schema["contig_length"] is None
+    def test_contigs(self, schema):
+        assert schema["contigs"] == [
+            {"id": s, "length": None} for s in ["19", "20", "X"]
+        ]

-    def test_filter_id(self, schema):
-        assert schema["filter_id"] == ["PASS", "s50", "q10"]
+    def test_filters(self, schema):
+        assert schema["filters"] == [
+            {"id": "PASS", "description": "All filters passed"},
+            {"id": "s50", "description": "Less than 50% of samples have data"},
+            {"id": "q10", "description": "Quality below 10"},
+        ]

     def test_variant_contig(self, schema):
-        assert schema["columns"]["variant_contig"] == {
+        assert schema["fields"]["variant_contig"] == {
             "name": "variant_contig",
             "dtype": "i1",
             "shape": [9],
@@ -251,7 +256,7 @@ def test_variant_contig(self, schema):
         }

     def test_call_genotype(self, schema):
-        assert schema["columns"]["call_genotype"] == {
+        assert schema["fields"]["call_genotype"] == {
             "name": "call_genotype",
             "dtype": "i1",
             "shape": [9, 3, 2],
@@ -270,7 +275,7 @@ def test_call_genotype(self, schema):
         }

     def test_call_genotype_mask(self, schema):
-        assert schema["columns"]["call_genotype_mask"] == {
+        assert schema["fields"]["call_genotype_mask"] == {
             "name": "call_genotype_mask",
             "dtype": "bool",
             "shape": [9, 3, 2],
@@ -289,7 +294,7 @@ def test_call_genotype_mask(self, schema):
         }

     def test_call_genotype_phased(self, schema):
-        assert schema["columns"]["call_genotype_mask"] == {
+        assert schema["fields"]["call_genotype_mask"] == {
             "name": "call_genotype_mask",
             "dtype": "bool",
             "shape": [9, 3, 2],
@@ -308,7 +313,7 @@ def test_call_genotype_phased(self, schema):
         }

     def test_call_GQ(self, schema):
-        assert schema["columns"]["call_GQ"] == {
+        assert schema["fields"]["call_GQ"] == {
             "name": "call_GQ",
             "dtype": "i1",
             "shape": [9, 3],
@@ -334,6 +339,8 @@ def test_call_GQ(self, schema):
         [("1", 100, 200), ("1", 150, 250)],
         # Overlap by one position
         [("1", 100, 201), ("1", 200, 300)],
+        # End coord is *inclusive*
+        [("1", 100, 201), ("1", 201, 300)],
         # Contained overlap
         [("1", 100, 300), ("1", 150, 250)],
         # Exactly equal
@@ -345,8 +352,8 @@ def test_check_overlap(regions):
         vcf.VcfPartition("", region=vcf_utils.Region(contig, start, end))
         for contig, start, end in regions
     ]
-    with pytest.raises(ValueError, match="Multiple VCFs have the region"):
-        vcf.check_overlap(partitions)
+    with pytest.raises(ValueError, match="Overlapping VCF regions"):
+        vcf.check_overlapping_partitions(partitions)


 class TestVcfDescriptions:
@@ -371,7 +378,7 @@ class TestVcfDescriptions:
         ],
     )
     def test_fields(self, schema, field, description):
-        assert schema["columns"][field]["description"] == description
+        assert schema["fields"][field]["description"] == description

     # This information is not in the schema yet,
     # https://github.com/sgkit-dev/bio2zarr/issues/123
diff --git a/tests/test_vcf_examples.py b/tests/test_vcf_examples.py
index ad06ea0..07545b1 100644
--- a/tests/test_vcf_examples.py
+++ b/tests/test_vcf_examples.py
@@ -832,6 +832,7 @@ def test_duplicate_paths(self, tmp_path):
     "name",
     [
         "sample.vcf.gz",
+        "sample_old_tabix.vcf.gz",
         "sample_no_genotypes.vcf.gz",
         "1kg_2020_chrM.vcf.gz",
         "field_type_combos.vcf.gz",
@@ -880,7 +881,7 @@ def test_split_explode(tmp_path):
         vcf.explode_partition(out, j)
     vcf.explode_finalise(out)
     pcvcf = vcf.IntermediateColumnarFormat(out)
-    assert pcvcf.columns["POS"].vcf_field.summary.asdict() == {
+    assert pcvcf.fields["POS"].vcf_field.summary.asdict() == {
         "num_chunks": 3,
         "compressed_size": 587,
         "uncompressed_size": 1008,
diff --git a/tests/test_vcf_utils.py b/tests/test_vcf_utils.py
index 476ce56..4800176 100644
--- a/tests/test_vcf_utils.py
+++ b/tests/test_vcf_utils.py
@@ -4,6 +4,7 @@
 import pytest

 from bio2zarr import vcf_utils
+from bio2zarr.vcf_utils import RECORD_COUNT_UNKNOWN

 data_path = pathlib.Path("tests/data/vcf/")
@@ -33,9 +34,23 @@ def test_context_manager_error(self):
         ("index_file", "expected"),
         [
             ("sample.vcf.gz.tbi", {"19": 2, "20": 6, "X": 1}),
+            (
+                "sample_old_tabix.vcf.gz.tbi",
+                {
+                    "19": RECORD_COUNT_UNKNOWN,
+                    "20": RECORD_COUNT_UNKNOWN,
+                    "X": RECORD_COUNT_UNKNOWN,
+                },
+            ),
             ("sample.bcf.csi", {"19": 2, "20": 6, "X": 1}),
+            ("sample_extra_contig.vcf.gz.csi", {"19": 2, "20": 6, "X": 1}),
+            ("sample_extra_contig.bcf.csi", {"19": 2, "20": 6, "X": 1}),
             ("sample_no_genotypes.vcf.gz.csi", {"19": 2, "20": 6, "X": 1}),
             ("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi", {"20": 3450, "21": 16460}),
+            (
+                "CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi",
+                {"20": RECORD_COUNT_UNKNOWN, "21": RECORD_COUNT_UNKNOWN},
+            ),
             ("CEUTrio.20.21.gatk3.4.g.bcf.csi", {"20": 3450, "21": 16460}),
             ("1kg_2020_chrM.vcf.gz.tbi", {"chrM": 23}),
             ("1kg_2020_chrM.vcf.gz.csi", {"chrM": 23}),
@@ -52,17 +67,21 @@ def test_contig_record_counts(self, index_file, expected):
     @pytest.mark.parametrize(
         ("index_file", "expected"),
         [
-            ("sample.vcf.gz.tbi", ["19:1-", "20", "X"]),
-            ("sample.bcf.csi", ["19:1-", "20", "X"]),
-            ("sample_no_genotypes.vcf.gz.csi", ["19:1-", "20", "X"]),
-            ("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi", ["20:1-", "21"]),
-            ("CEUTrio.20.21.gatk3.4.g.bcf.csi", ["20:1-", "21"]),
-            ("1kg_2020_chrM.vcf.gz.tbi", ["chrM:1-"]),
-            ("1kg_2020_chrM.vcf.gz.csi", ["chrM:1-"]),
-            ("1kg_2020_chrM.bcf.csi", ["chrM:1-"]),
-            ("1kg_2020_chr20_annotations.bcf.csi", ["chr20:49153-"]),
-            ("NA12878.prod.chr20snippet.g.vcf.gz.tbi", ["20:1-"]),
-            ("multi_contig.vcf.gz.tbi", ["0:1-"] + [str(j) for j in range(1, 5)]),
+            ("sample.vcf.gz.tbi", ["19:111-", "20:14370-", "X:10-"]),
+            ("sample_old_tabix.vcf.gz.tbi", ["19:111-", "20:14370-", "X:10-"]),
+            ("sample.bcf.csi", ["19:111-", "20:14370-", "X:10-"]),
+            ("sample_extra_contig.bcf.csi", ["19:111-", "20:14370-", "X:10-"]),
+            ("sample_extra_contig.vcf.gz.csi", ["19:111-", "20:14370-", "X:10-"]),
+            ("sample_no_genotypes.vcf.gz.csi", ["19:111-", "20:14370-", "X:10-"]),
+            ("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi", ["20:1-", "21:1-"]),
+            ("CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi", ["20:1-", "21:1-"]),
+            ("CEUTrio.20.21.gatk3.4.g.bcf.csi", ["20:1-", "21:1-"]),
+            ("1kg_2020_chrM.vcf.gz.tbi", ["chrM:26-"]),
+            ("1kg_2020_chrM.vcf.gz.csi", ["chrM:26-"]),
+            ("1kg_2020_chrM.bcf.csi", ["chrM:26-"]),
+            ("1kg_2020_chr20_annotations.bcf.csi", ["chr20:60070-"]),
+            ("NA12878.prod.chr20snippet.g.vcf.gz.tbi", ["20:60001-"]),
+            ("multi_contig.vcf.gz.tbi", [f"{j}:1-" for j in range(5)]),
         ],
     )
     def test_partition_into_one_part(self, index_file, expected):
@@ -75,9 +94,11 @@ def test_partition_into_one_part(self, index_file, expected):
         ("index_file", "num_expected", "total_records"),
         [
             ("sample.vcf.gz.tbi", 3, 9),
+            ("sample_old_tabix.vcf.gz.tbi", 3, 9),
             ("sample.bcf.csi", 3, 9),
             ("sample_no_genotypes.vcf.gz.csi", 3, 9),
             ("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi", 17, 19910),
+            ("CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi", 17, 19910),
             ("CEUTrio.20.21.gatk3.4.g.bcf.csi", 3, 19910),
             ("1kg_2020_chrM.vcf.gz.tbi", 1, 23),
             ("1kg_2020_chrM.vcf.gz.csi", 1, 23),
@@ -103,9 +124,11 @@ def test_partition_into_max_parts(self, index_file, num_expected, total_records)
         ("index_file", "total_records"),
         [
             ("sample.vcf.gz.tbi", 9),
+            ("sample_old_tabix.vcf.gz.tbi", 9),
             ("sample.bcf.csi", 9),
             ("sample_no_genotypes.vcf.gz.csi", 9),
             ("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi", 19910),
+            ("CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi", 19910),
             ("CEUTrio.20.21.gatk3.4.g.bcf.csi", 19910),
             ("1kg_2020_chrM.vcf.gz.tbi", 23),
             ("1kg_2020_chrM.vcf.gz.csi", 23),
@@ -132,7 +155,7 @@ def test_tabix_multi_chrom_bug(self):
         # An earlier version of the code returned this, i.e. with a duplicate
         # for 4 with end coord of 0
         # ["0:1-", "1", "2", "3", "4:1-0", "4:1-"]
-        expected = ["0:1-", "1", "2", "3", "4:1-"]
+        expected = ["0:1-", "1:1-", "2:1-", "3:1-", "4:1-"]
         assert [str(r) for r in regions] == expected

     @pytest.mark.parametrize(
@@ -143,8 +166,15 @@ def test_tabix_multi_chrom_bug(self):
             "100 kB",
         ],
     )
-    def test_target_part_size(self, target_part_size):
-        indexed_vcf = self.get_instance("CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi")
+    @pytest.mark.parametrize(
+        "filename",
+        [
+            "CEUTrio.20.21.gatk3.4.g.vcf.bgz.tbi",
+            "CEUTrio.20.21.gatk3.4.g.old_tabix.vcf.bgz.tbi",
+        ],
+    )
+    def test_target_part_size(self, target_part_size, filename):
+        indexed_vcf = self.get_instance(filename)
         regions = indexed_vcf.partition_into_regions(target_part_size=target_part_size)
         assert len(regions) == 5
         part_variant_counts = [indexed_vcf.count_variants(region) for region in regions]
diff --git a/validation-data/Makefile b/validation-data/Makefile
index 283f85a..283f56f 100644
--- a/validation-data/Makefile
+++ b/validation-data/Makefile
@@ -39,9 +39,14 @@ all: 1kg_2020_chr20.bcf.csi \

 # 1000 genomes phase 1
 1KG_P1_ALL_URL=http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/phase1/analysis_results/integrated_call_sets/ALL.chr6.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz

+old_tabix:
+	rm -fR tabix old_tabix
+	git clone https://github.com/samtools/tabix.git
+	cd tabix && make
+	cp tabix/tabix ./old_tabix
+
 %.vcf.gz.tbi: %.vcf.gz
-	tabix $<
+	./old_tabix $<

 %.2.split: %
 	./split.sh $< 2
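One behavioural note on the old-tabix handling above: indexes written by the pre-htslib tabix do not record per-contig record counts, so contig_record_counts() reports RECORD_COUNT_UNKNOWN (np.inf), and any sum involving it stays infinite. That is why scan_vcfs can end up storing an infinite num_records and why explode() falls back to an unknown progress total. A sketch of the arithmetic, for illustration only:

import numpy as np

RECORD_COUNT_UNKNOWN = np.inf  # as defined in bio2zarr/vcf_utils.py

counts = {"20": 3450, "21": RECORD_COUNT_UNKNOWN}
total = sum(counts.values())
assert np.isinf(total)  # total is unknowable, so tqdm is given total=None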