Merge pull request #32 from constantinpape/fix-ome-zarr
Fix downscaling with ome zarr metadata format
constantinpape authored Mar 17, 2022
2 parents 4500508 + 2983fe0 commit fe0adba
Showing 7 changed files with 148 additions and 110 deletions.
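The functional change in this file is a new dimension_separator parameter threaded through the copy task, so that outputs can be written with the nested "/" chunk layout that OME-Zarr expects; most of the remaining diff is a mechanical switch from single to double quotes. As a minimal, hypothetical sketch of using the new parameter (the import path and the surrounding luigi parameters such as tmp_folder, config_dir, and max_jobs are assumptions from the usual cluster_tools task interface, not shown in this diff):

import luigi
from cluster_tools.copy_volume import CopyVolumeLocal  # import path assumed

# All values are illustrative.
task = CopyVolumeLocal(
    tmp_folder="./tmp",        # scratch folder for job files (assumed parameter)
    config_dir="./configs",    # folder holding the task configs (assumed parameter)
    max_jobs=4,
    input_path="volume.n5", input_key="raw",
    output_path="volume.ome.zarr", output_key="s0",
    dimension_separator="/",   # nested chunk keys, as used by OME-Zarr
)
luigi.build([task], local_scheduler=True)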
92 changes: 47 additions & 45 deletions cluster_tools/copy_volume/copy_volume.py
@@ -17,14 +17,14 @@
 
 # TODO this should maybe be go to z5py, but first check if it actually works
 DTYPE_MAPPING = {
-    '>u2': 'uint16',
-    '>u4': 'uint32',
-    '>u8': 'uint64',
-    '>i2': 'int16',
-    '>i4': 'int32',
-    '>i8': 'int64',
-    '>f4': 'float32',
-    '>f8': 'float64'
+    ">u2": "uint16",
+    ">u4": "uint32",
+    ">u8": "uint64",
+    ">i2": "int16",
+    ">i4": "int32",
+    ">i8": "int64",
+    ">f4": "float32",
+    ">f8": "float64"
 }


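The keys of DTYPE_MAPPING are numpy dtype strings with an explicit big-endian marker (">"), as datasets converted from other tools may report them; the values are plain named dtypes in native byte order. A self-contained sketch of how the mapping is applied (mirroring the DTYPE_MAPPING.get(dtype, dtype) call further down in the diff):

import numpy as np

DTYPE_MAPPING = {">u2": "uint16", ">u4": "uint32"}  # excerpt of the table above

src_dtype = ">u2"                                # big-endian uint16
dtype = DTYPE_MAPPING.get(src_dtype, src_dtype)  # -> "uint16", native byte order
data = np.zeros((4, 4), dtype=dtype)             # accepted by downstream dataset creation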
@@ -36,7 +36,7 @@ class CopyVolumeBase(luigi.Task):
     """ copy_volume base class
     """
 
-    task_name = 'copy_volume'
+    task_name = "copy_volume"
     src_file = os.path.abspath(__file__)
 
     # input and output volumes
@@ -48,15 +48,16 @@ class CopyVolumeBase(luigi.Task):
     dtype = luigi.Parameter(default=None)
     fit_to_roi = luigi.BoolParameter(default=False)
     effective_scale_factor = luigi.ListParameter(default=[])
+    dimension_separator = luigi.Parameter(default=None)
     dependency = luigi.TaskParameter(default=DummyTask())
 
     @staticmethod
     def default_task_config():
         # we use this to get also get the common default config
         config = LocalTask.default_task_config()
-        config.update({'chunks': None, 'compression': 'gzip',
-                       'reduce_channels': None, 'map_uniform_blocks_to_background': False,
-                       'value_list': None, 'offset': None, 'insert_mode': False})
+        config.update({"chunks": None, "compression": "gzip",
+                       "reduce_channels": None, "map_uniform_blocks_to_background": False,
+                       "value_list": None, "offset": None, "insert_mode": False})
         return config
 
     def requires(self):
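default_task_config enumerates the keys a user may override. In the usual cluster_tools convention (an assumption here, not shown in this diff), the overrides are written as a JSON file named <task_name>.config inside config_dir before the task runs:

import json
import os

# hedged sketch: file name and location follow the assumed cluster_tools convention
config = CopyVolumeLocal.default_task_config()
config.update({"chunks": [32, 32, 32], "compression": "gzip"})
with open(os.path.join("./configs", "copy_volume.config"), "w") as f:
    json.dump(config, f)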
@@ -72,15 +73,15 @@ def run_impl(self):
         self.init(shebang)
 
         # get shape, dtype and make block config
-        with vu.file_reader(self.input_path, 'r') as f:
+        with vu.file_reader(self.input_path, "r") as f:
             ds = f[self.input_key]
             shape = ds.shape
             ds_chunks = ds.chunks
 
             # if this is a label multi-set, the dtypes needs to be changed
             # to be uint64
-            is_label_multiset = ds.attrs.get('isLabelMultiset', False)
-            ds_dtype = 'uint64' if is_label_multiset else ds.dtype
+            is_label_multiset = ds.attrs.get("isLabelMultiset", False)
+            ds_dtype = "uint64" if is_label_multiset else ds.dtype
 
         # load the config
         task_config = self.get_task_config()
@@ -103,37 +103,38 @@ def run_impl(self):
                 out_shape = tuple(roie - roib for roib, roie in zip(roi_begin, roi_end))
                 # if we fit to roi, the task config needs to be updated with the roi begin,
                 # because the output bounding boxes need to be offseted by roi_begin
-                task_config.update({'roi_begin': roi_begin})
+                task_config.update({"roi_begin": roi_begin})
             else:
                 out_shape = shape
         else:
             out_shape = shape
 
-        if task_config.get('reduce_channels', None) is not None and len(out_shape) == 4:
+        if task_config.get("reduce_channels", None) is not None and len(out_shape) == 4:
             out_shape = out_shape[1:]
 
-        compression = task_config.pop('compression', 'gzip')
+        compression = task_config.pop("compression", "gzip")
         dtype = str(ds_dtype) if self.dtype is None else self.dtype
         dtype = DTYPE_MAPPING.get(dtype, dtype)
 
-        chunks = task_config.pop('chunks', None)
+        chunks = task_config.pop("chunks", None)
         chunks = tuple(block_shape) if chunks is None else chunks
         if len(chunks) == 3 and ndim == 4:
             chunks = (ds_chunks[0],) + chunks
         assert all(bs % ch == 0 for bs, ch in zip(block_shape,
                                                   chunks[1:] if ndim == 4 else chunks))
 
         # require output dataset
-        with vu.file_reader(self.output_path) as f:
+        file_kwargs = {} if self.dimension_separator is None else dict(dimension_separator=self.dimension_separator)
+        with vu.file_reader(self.output_path, mode="a", **file_kwargs) as f:
             chunks = tuple(min(ch, sh) for ch, sh in zip(chunks, out_shape))
             f.require_dataset(self.output_key, shape=out_shape, chunks=chunks,
                               compression=compression, dtype=dtype)
 
         # update the config with input and output paths and keys
         # as well as block shape
-        task_config.update({'input_path': self.input_path, 'input_key': self.input_key,
-                            'output_path': self.output_path, 'output_key': self.output_key,
-                            'block_shape': block_shape, 'dtype': dtype})
+        task_config.update({"input_path": self.input_path, "input_key": self.input_key,
+                            "output_path": self.output_path, "output_key": self.output_key,
+                            "block_shape": block_shape, "dtype": dtype})
 
         if len(shape) == 4:
             shape = shape[1:]
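The two added lines above are the core of the fix: when dimension_separator is set, it is forwarded as a keyword to the file opened for the output. A minimal sketch of what this flag controls, written directly against the zarr-python v2 API (vu.file_reader is assumed to pass the keyword through to the underlying store):

import zarr

# "." stores chunks under flat keys such as s0/0.0.0, while "/" nests them
# as s0/0/0/0 -- the directory layout OME-Zarr readers expect.
store = zarr.DirectoryStore("example.zarr", dimension_separator="/")
group = zarr.group(store=store)
ds = group.require_dataset("s0", shape=(64, 64, 64), chunks=(32, 32, 32), dtype="uint8")
ds[:] = 1  # chunk files land at example.zarr/s0/0/0/0 etc.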
@@ -156,7 +158,7 @@ def run_impl(self):
 
     def output(self):
         return luigi.LocalTarget(os.path.join(self.tmp_folder,
-                                              self.task_name + '_%s.log' % self.prefix))
+                                              self.task_name + "_%s.log" % self.prefix))
 
 
 class CopyVolumeLocal(CopyVolumeBase, LocalTask):
@@ -189,10 +191,10 @@ def cast_type(data, dtype):
     if np.dtype(data.dtype) == np.dtype(dtype):
         return data
     # special casting for uint8
-    elif np.dtype(dtype) == 'uint8':
+    elif np.dtype(dtype) == "uint8":
         data = vu.normalize(data)
         data *= 255
-        return data.astype('uint8')
+        return data.astype("uint8")
     else:
         return data.astype(dtype)
 
@@ -265,44 +267,44 @@ def _copy_block(block_id):
 def copy_volume(job_id, config_path):
     fu.log("start processing job %i" % job_id)
     fu.log("reading config from %s" % config_path)
-    with open(config_path, 'r') as f:
+    with open(config_path, "r") as f:
         config = json.load(f)
 
     # read the input cofig
-    input_path = config['input_path']
-    input_key = config['input_key']
+    input_path = config["input_path"]
+    input_key = config["input_key"]
 
-    block_shape = list(config['block_shape'])
-    block_list = config['block_list']
+    block_shape = list(config["block_shape"])
+    block_list = config["block_list"]
 
     # read the output config
-    output_path = config['output_path']
-    output_key = config['output_key']
+    output_path = config["output_path"]
+    output_key = config["output_key"]
 
     # check if we offset by roi
-    roi_begin = config.get('roi_begin', None)
+    roi_begin = config.get("roi_begin", None)
 
     # check if we reduce channels
-    reduce_function = config.get('reduce_channels', None)
+    reduce_function = config.get("reduce_channels", None)
     if reduce_function is not None:
         reduce_function = getattr(np, reduce_function)
 
     # check if we copy only specified values
-    value_list = config.get('value_list', None)
+    value_list = config.get("value_list", None)
 
     # check if we have an offset value
-    offset = config.get('offset', None)
+    offset = config.get("offset", None)
 
     # check if we are in insert mode
-    insert_mode = config.get('insert_mode', False)
+    insert_mode = config.get("insert_mode", False)
 
-    map_uniform_blocks_to_background = config.get('map_uniform_blocks_to_background', False)
-    n_threads = config.get('threads_per_job', 1)
+    map_uniform_blocks_to_background = config.get("map_uniform_blocks_to_background", False)
+    n_threads = config.get("threads_per_job", 1)
 
     # submit blocks
-    with vu.file_reader(input_path, 'r') as f_in, vu.file_reader(output_path) as f_out:
+    with vu.file_reader(input_path, mode="r") as f_in, vu.file_reader(output_path, mode="a") as f_out:
         ds_in = f_in[input_key]
-        if ds_in.attrs.get('isLabelMultiset', False):
+        if ds_in.attrs.get("isLabelMultiset", False):
             ds_in = LabelMultisetWrapper(ds_in)
         ds_out = f_out[output_key]
 
@@ -317,7 +319,7 @@ def copy_volume(job_id, config_path):
                      value_list, offset, insert_mode)
 
         # copy the attributes with job 0
-        if job_id == 0 and hasattr(ds_in, 'attrs') and hasattr(ds_out, 'attrs'):
+        if job_id == 0 and hasattr(ds_in, "attrs") and hasattr(ds_out, "attrs"):
             attrs_in = ds_in.attrs
             for k, v in attrs_in.items():
                 ds_out.attrs[k] = v
@@ -326,8 +328,8 @@
     fu.log_job_success(job_id)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     path = sys.argv[1]
     assert os.path.exists(path), path
-    job_id = int(os.path.split(path)[1].split('.')[0].split('_')[-1])
+    job_id = int(os.path.split(path)[1].split(".")[0].split("_")[-1])
     copy_volume(job_id, path)
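copy_volume is the per-job entry point: each job reads a JSON config whose file name ends in the job id, which the split(".")[0].split("_")[-1] line above extracts. A sketch of writing such a config by hand, with only the keys the worker reads (all values illustrative):

import json

config = {
    "input_path": "volume.n5", "input_key": "raw",
    "output_path": "volume.ome.zarr", "output_key": "s0",
    "block_shape": [64, 64, 64], "block_list": [0, 1, 2, 3],
    "roi_begin": None, "reduce_channels": None, "value_list": None,
    "offset": None, "insert_mode": False,
    "map_uniform_blocks_to_background": False, "threads_per_job": 1,
}
with open("copy_volume_0.config", "w") as f:  # trailing "_0" encodes the job id
    json.dump(config, f)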