Skip to content

Commit

Permalink
Update and add new tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gsmalik committed May 12, 2021
1 parent bd1593d commit 9f6b553
Showing 1 changed file with 194 additions and 17 deletions.
211 changes: 194 additions & 17 deletions keras_lmu/tests/test_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import numpy as np
import pytest
import tensorflow as tf
from scipy.signal import cont2discrete

from keras_lmu import layers


def test_multivariate_lmu(rng):
@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
def test_multivariate_lmu(rng, discretizer):
memory_d = 4
order = 16
n_steps = 10
n_steps = 10 * order
input_d = 32

input_enc = rng.uniform(0, 1, size=(input_d, memory_d))
Expand All @@ -23,6 +25,7 @@ def test_multivariate_lmu(rng):
memory_d=memory_d,
order=order,
theta=n_steps,
discretizer=discretizer,
kernel_initializer=tf.initializers.constant(input_enc),
hidden_cell=tf.keras.layers.SimpleRNNCell(
units=memory_d * order,
Expand All @@ -39,6 +42,7 @@ def test_multivariate_lmu(rng):
memory_d=1,
order=order,
theta=n_steps,
discretizer=discretizer,
kernel_initializer=tf.initializers.constant(input_enc[:, [i]]),
hidden_cell=tf.keras.layers.SimpleRNNCell(
units=order,
Expand All @@ -58,11 +62,12 @@ def test_multivariate_lmu(rng):

for i in range(memory_d):
assert np.allclose(
results[0][..., i * order : (i + 1) * order], results[i + 1], atol=1e-6
results[0][..., i * order : (i + 1) * order], results[i + 1], atol=1e-5
)


def test_layer_vs_cell(rng):
@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
def test_layer_vs_cell(rng, discretizer):
memory_d = 4
order = 12
n_steps = 10
Expand All @@ -72,7 +77,11 @@ def test_layer_vs_cell(rng):

lmu_cell = tf.keras.layers.RNN(
layers.LMUCell(
memory_d, order, n_steps, tf.keras.layers.SimpleRNNCell(units=64)
memory_d,
order,
n_steps,
tf.keras.layers.SimpleRNNCell(units=64),
discretizer=discretizer,
),
return_sequences=True,
)
Expand All @@ -83,6 +92,7 @@ def test_layer_vs_cell(rng):
order,
n_steps,
tf.keras.layers.SimpleRNNCell(units=64),
discretizer=discretizer,
return_sequences=True,
)
lmu_layer.build(inp.shape)
Expand All @@ -99,7 +109,9 @@ def test_layer_vs_cell(rng):
assert np.allclose(cell_out, layer_out)


def test_save_load_weights(rng, tmp_path):
@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
@pytest.mark.parametrize("train_theta", (True, False))
def test_save_load_weights(rng, tmp_path, discretizer, train_theta):
memory_d = 4
order = 12
n_steps = 10
Expand All @@ -113,6 +125,8 @@ def test_save_load_weights(rng, tmp_path):
order,
n_steps,
tf.keras.layers.SimpleRNNCell(units=64),
discretizer=discretizer,
train_theta=train_theta,
return_sequences=True,
)(inp)
model0 = tf.keras.Model(inp, lmu0)
Expand All @@ -123,6 +137,8 @@ def test_save_load_weights(rng, tmp_path):
order,
n_steps,
tf.keras.layers.SimpleRNNCell(units=64),
discretizer=discretizer,
train_theta=train_theta,
return_sequences=True,
)(inp)
model1 = tf.keras.Model(inp, lmu1)
Expand All @@ -137,12 +153,21 @@ def test_save_load_weights(rng, tmp_path):
assert np.allclose(out0, out2)


@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
@pytest.mark.parametrize("train_theta", (True, False))
@pytest.mark.parametrize("mode", ("cell", "lmu", "fft"))
def test_save_load_serialization(mode, tmp_path):
def test_save_load_serialization(mode, tmp_path, train_theta, discretizer):
inp = tf.keras.Input((10 if mode == "fft" else None, 32))
if mode == "cell":
out = tf.keras.layers.RNN(
layers.LMUCell(1, 2, 3, tf.keras.layers.SimpleRNNCell(4)),
layers.LMUCell(
1,
2,
3,
tf.keras.layers.SimpleRNNCell(4),
train_theta=train_theta,
discretizer=discretizer,
),
return_sequences=True,
)(inp)
elif mode == "lmu":
Expand All @@ -153,13 +178,17 @@ def test_save_load_serialization(mode, tmp_path):
tf.keras.layers.SimpleRNNCell(4),
return_sequences=True,
memory_to_memory=True,
train_theta=train_theta,
discretizer=discretizer,
)(inp)
elif mode == "fft":
out = layers.LMUFFT(
1,
2,
3,
tf.keras.layers.SimpleRNNCell(4),
train_theta=train_theta,
discretizer=discretizer,
return_sequences=True,
)(inp)

Expand All @@ -185,19 +214,36 @@ def test_save_load_serialization(mode, tmp_path):
@pytest.mark.parametrize(
"hidden_cell", (None, tf.keras.layers.Dense(4), tf.keras.layers.SimpleRNNCell(4))
)
def test_fft(return_sequences, hidden_cell, rng):
@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
@pytest.mark.parametrize("train_theta", (True, False))
def test_fft(return_sequences, hidden_cell, rng, train_theta, discretizer):
x = rng.uniform(-1, 1, size=(2, 10, 32))

rnn_layer = tf.keras.layers.RNN(
layers.LMUCell(1, 2, 3, hidden_cell),
layers.LMUCell(
1,
2,
3,
hidden_cell,
discretizer=discretizer,
train_theta=train_theta,
),
return_sequences=return_sequences,
)
rnn_out = rnn_layer(x)

fft_layer = layers.LMUFFT(1, 2, 3, hidden_cell, return_sequences=return_sequences)
fft_layer = layers.LMUFFT(
1,
2,
3,
hidden_cell,
return_sequences=return_sequences,
discretizer=discretizer,
train_theta=train_theta,
)
fft_layer.build(x.shape)
fft_layer.kernel.assign(rnn_layer.cell.kernel)
fft_out = fft_layer(x)
fft_out = fft_layer(x, training=None)

assert np.allclose(rnn_out, fft_out, atol=2e-6)

Expand Down Expand Up @@ -385,16 +431,23 @@ def test_dropout(
assert np.allclose(y0, y1)


@pytest.mark.parametrize("train_theta", (True, False))
@pytest.mark.parametrize("discretizer", ("zoh", "euler"))
@pytest.mark.parametrize("fft", (True, False))
def test_fit(fft):
def test_fit(fft, discretizer, train_theta):
order = 256
theta_init = (1 if discretizer == "zoh" else 12) * order
lmu_layer = layers.LMU(
memory_d=1,
order=256,
theta=784,
hidden_cell=tf.keras.layers.SimpleRNNCell(units=10),
order=order,
theta=theta_init,
train_theta=train_theta,
hidden_cell=tf.keras.layers.SimpleRNNCell(units=30),
hidden_to_memory=not fft,
memory_to_memory=not fft,
input_to_hidden=not fft,
discretizer=discretizer,
kernel_initializer="zeros",
)

inputs = tf.keras.layers.Input((5 if fft else None, 10))
Expand All @@ -409,7 +462,7 @@ def test_fit(fft):
y_test = tf.ones((5, 1))
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.RMSprop(),
optimizer=tf.keras.optimizers.Adam(),
metrics=["accuracy"],
)

Expand All @@ -423,3 +476,127 @@ def test_fit(fft):
assert isinstance(lmu_layer.layer, tf.keras.layers.RNN)

assert acc == 1.0


def test_discretizer_types():
    """Constructing an LMUCell with an unknown discretizer name must fail.

    Only "zoh" and "euler" are accepted; anything else should raise a
    ValueError with a message naming the valid options.
    """
    with pytest.raises(ValueError, match="discretizer must be 'zoh' or 'euler'"):
        layers.LMUCell(
            memory_d=1,
            order=256,
            theta=784,
            hidden_cell=None,
            discretizer="test",
        )


def test_cont2discrete_zoh():
    """Check the cell's internal ZOH discretization against scipy's.

    Builds an LMUCell with ``discretizer="zoh"`` and compares its discretized
    state matrices (``_A``, ``_B``) to ``scipy.signal.cont2discrete`` applied
    to the cell's continuous-time system scaled by ``1/theta``.
    """
    order = 64
    lmu_cell = layers.LMUCell(
        memory_d=5, order=order, theta=784, hidden_cell=None, discretizer="zoh"
    )

    lmu_cell.build((2, 40))

    # The C and D matrices are passed through unchanged by the ZOH method, so
    # only their presence matters here. They were previously hard-coded as
    # np.ones(256), which did not match order=64; use `order` for consistency.
    A, B, _, _, _ = cont2discrete(
        (
            lmu_cell.CONST_A * lmu_cell.theta_inv,
            lmu_cell.CONST_B * lmu_cell.theta_inv,
            np.ones(order),
            np.ones(order),
        ),
        dt=1.0,
        method="zoh",
    )

    # slight numerical rounding differences between scipy and tf implementation
    assert np.allclose(A.T, lmu_cell._A, atol=1e-6)
    assert np.allclose(B.T, lmu_cell._B, atol=1e-6)


@pytest.mark.parametrize("hidden_cell", (None, tf.keras.layers.SimpleRNNCell(units=30)))
@pytest.mark.parametrize("discretizer", ("euler", "zoh"))
@pytest.mark.parametrize("train_theta", (True, False))
def test_theta_AB_updates(discretizer, hidden_cell, train_theta, tmp_path):
    """Verify which of theta / A / B are updated by training.

    Expectations checked below:
    - theta (stored as its inverse, ``theta_inv``) changes iff
      ``train_theta`` is True;
    - the discretized A and B matrices change iff theta is trainable AND the
      discretizer is "zoh" (with "euler", A and B are presumably recomputed
      from theta rather than stored as trained weights — TODO confirm against
      the layer implementation);
    - trained values survive a save/load round trip (only exercised when a
      hidden cell is present).
    """
    # typically you need at least 4*order as minimum value of theta with euler
    # but we increase it even further since we only have a single memory tape
    # and the network is smaller, which might cause even more numerical
    # instabilities.
    order = 64
    theta_init = 10 * order

    # create model
    lmu_layer = layers.LMU(
        memory_d=1,
        order=order,
        theta=theta_init,
        train_theta=train_theta,
        hidden_cell=hidden_cell,
        discretizer=discretizer,
        hidden_to_memory=hidden_cell is not None,
        memory_to_memory=hidden_cell is not None,
        input_to_hidden=hidden_cell is not None,
    )

    inputs = tf.keras.layers.Input((None, 20))
    lmu = lmu_layer(inputs)
    outputs = tf.keras.layers.Dense(10, activation="sigmoid")(lmu)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        loss=tf.keras.losses.MeanSquaredError(),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=["accuracy"],
    )

    # determine index of weights
    # NOTE(review): relies on substring matching against Keras weight names
    # ("A", "B", "theta_kernel"); assumes no other weight name in the LMU
    # layer contains a bare "A" or "B" — verify against the layer's weight
    # naming if this ever misidentifies an index.
    for index, weight in enumerate(model.layers[1].weights):
        if "A" in weight.name:
            index_A = index
        if "B" in weight.name:
            index_B = index
        if "theta_kernel" in weight.name:
            index_t = index

    # store initial value of A and B
    A_init, B_init = (
        model.layers[1].weights[index_A].numpy(),
        model.layers[1].weights[index_B].numpy(),
    )

    # make sure theta_inv is set correctly to initital value
    # (when theta is not trainable it is kept as a plain cell attribute
    # rather than a Keras weight, hence the two access paths)
    assert 1 / theta_init == (
        model.layers[1].weights[index_t].numpy()
        if train_theta
        else (model.layers[1]).layer.cell.theta_inv
    )

    # fit model on some data
    x_train = tf.random.uniform((100, 5, 20), dtype=tf.float32)
    y_train = tf.random.uniform((100, 10), dtype=tf.float32)

    model.fit(x_train, y_train, epochs=10, validation_split=0.2)

    # make sure A and B got updated if trained with zoh
    assert np.any(A_init != model.layers[1].weights[index_A].numpy()) == (
        train_theta and discretizer == "zoh"
    )
    assert np.any(B_init != model.layers[1].weights[index_B].numpy()) == (
        train_theta and discretizer == "zoh"
    )

    # make sure theta kernel got updated if trained
    if train_theta:
        assert model.layers[1].weights[index_t].numpy() != (1 / theta_init)

    # save model and make sure you get same outputs, that is, correct values of A
    # B and theta were stored after training
    if hidden_cell:
        model.save(str(tmp_path))

        model_load = tf.keras.models.load_model(
            str(tmp_path),
            custom_objects={
                "LMU": layers.LMU,
            },
        )

        assert np.allclose(
            model.predict(np.ones((32, 10, 20))),
            model_load.predict(np.ones((32, 10, 20))),
        )

0 comments on commit 9f6b553

Please sign in to comment.