forked from zarr-developers/numcodecs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
fixedscaleoffset.py
140 lines (112 loc) · 4.19 KB
/
fixedscaleoffset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import numpy as np
from .abc import Codec
from .compat import ensure_ndarray, ndarray_copy
class FixedScaleOffset(Codec):
    """Fixed scale/offset filter, a simplified take on the HDF5 scale-offset filter.

    Encoding maps every value `x` of a chunk to `(x - offset) * scale`,
    rounds the result to the nearest integer and casts it to `astype`.
    Decoding computes `(y / scale) + offset` and casts back to `dtype`.
    Encoded values are not bit-packed; they occupy the full width of
    `astype`.

    Parameters
    ----------
    offset : float
        Value subtracted from the data before scaling.
    scale : int or float
        Factor the offset-subtracted data is multiplied by.
    dtype : dtype
        Data type of the decoded (original) data.
    astype : dtype, optional
        Data type of the encoded data. Defaults to `dtype` when omitted.

    Raises
    ------
    ValueError
        If `dtype` or `astype` is the object dtype.

    Notes
    -----
    When `astype` is an integer data type it must be wide enough to hold
    every encoded value. Nothing is checked here, so an `astype` that is
    too small lets integer overflow silently corrupt the data.

    Examples
    --------
    >>> import numcodecs
    >>> import numpy as np
    >>> x = np.linspace(1000, 1001, 10, dtype='f8')
    >>> x
    array([1000.        , 1000.11111111, 1000.22222222, 1000.33333333,
           1000.44444444, 1000.55555556, 1000.66666667, 1000.77777778,
           1000.88888889, 1001.        ])
    >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10, dtype='f8', astype='u1')
    >>> y1 = codec.encode(x)
    >>> y1
    array([ 0,  1,  2,  3,  4,  6,  7,  8,  9, 10], dtype=uint8)
    >>> z1 = codec.decode(y1)
    >>> z1
    array([1000. , 1000.1, 1000.2, 1000.3, 1000.4, 1000.6, 1000.7,
           1000.8, 1000.9, 1001. ])
    >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10**2, dtype='f8', astype='u1')
    >>> y2 = codec.encode(x)
    >>> y2
    array([  0,  11,  22,  33,  44,  56,  67,  78,  89, 100], dtype=uint8)
    >>> z2 = codec.decode(y2)
    >>> z2
    array([1000.  , 1000.11, 1000.22, 1000.33, 1000.44, 1000.56,
           1000.67, 1000.78, 1000.89, 1001.  ])
    >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10**3, dtype='f8', astype='u2')
    >>> y3 = codec.encode(x)
    >>> y3
    array([   0,  111,  222,  333,  444,  556,  667,  778,  889, 1000], dtype=uint16)
    >>> z3 = codec.decode(y3)
    >>> z3
    array([1000.   , 1000.111, 1000.222, 1000.333, 1000.444, 1000.556,
           1000.667, 1000.778, 1000.889, 1001.   ])

    See Also
    --------
    numcodecs.quantize.Quantize

    """

    codec_id = 'fixedscaleoffset'

    def __init__(self, offset, scale, dtype, astype=None):
        """Store the transform parameters and normalise both dtypes."""
        self.offset = offset
        self.scale = scale
        self.dtype = np.dtype(dtype)
        # Without an explicit encoded type, encode into the decoded type.
        self.astype = self.dtype if astype is None else np.dtype(astype)
        for candidate in (self.dtype, self.astype):
            if candidate == object:
                raise ValueError('object arrays are not supported')

    def encode(self, buf):
        """Return `buf` transformed by `(x - offset) * scale` as a flat array.

        The input is viewed as `self.dtype`, flattened, scaled, rounded
        with `np.around` and finally cast to `self.astype`.
        """
        # View the raw buffer as the decoded dtype and work on a 1-D array.
        values = ensure_ndarray(buf).view(self.dtype).reshape(-1, order='A')
        # Shift, scale, then snap to the nearest integer value.
        rounded = np.around((values - self.offset) * self.scale)
        # copy=False skips the extra copy when the dtype already matches.
        return rounded.astype(self.astype, copy=False)

    def decode(self, buf, out=None):
        """Invert `encode`: compute `(y / scale) + offset` as `self.dtype`.

        The decoded array and `out` are handed to `ndarray_copy`, whose
        result is returned (presumably it fills `out` when one is given --
        confirm against `numcodecs.compat`).
        """
        # View the raw buffer as the encoded dtype and work on a 1-D array.
        encoded = ensure_ndarray(buf).view(self.astype).reshape(-1, order='A')
        # Undo the scaling first, then the offset.
        restored = (encoded / self.scale) + self.offset
        restored = restored.astype(self.dtype, copy=False)
        return ndarray_copy(restored, out)

    def get_config(self):
        """Return the codec configuration as a dict, with dtypes as strings."""
        # Overridden so both dtypes are emitted through their `.str` form.
        return {
            'id': self.codec_id,
            'scale': self.scale,
            'offset': self.offset,
            'dtype': self.dtype.str,
            'astype': self.astype.str,
        }

    def __repr__(self):
        """Return a constructor-like string; `astype` is shown only if it differs."""
        astype_part = ''
        if self.astype != self.dtype:
            astype_part = ', astype=%r' % self.astype.str
        return '%s(scale=%s, offset=%s, dtype=%r%s)' % (
            type(self).__name__,
            self.scale,
            self.offset,
            self.dtype.str,
            astype_part,
        )