Tensorflow 2.11 support (#1016)
* Tensorflow 2.11 support

* lint

* Use only legacy optimizers in multi-optimizers

* update notebook to use legacy optimizer

* use legacy optimizers in horovod

* skip testing incremental training notebook for tf 2.8

* temporarily disable test_pickle

* temporarily disable test_pickle

---------

Co-authored-by: rnyak <ronayak@hotmail.com>
edknv and rnyak authored Mar 29, 2023
1 parent 479246c commit e6b4bbd
Showing 10 changed files with 147 additions and 60 deletions.
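The changed files all follow one compatibility pattern: TF 2.11 made the rewritten Keras optimizer the default and moved the previous implementation to `tf.keras.optimizers.legacy`, so the code resolves an optimizer namespace once and uses it everywhere. A minimal standalone sketch of the gating idiom this commit adds to `merlin/models/tf/blocks/optimizer.py`:

```python
import tensorflow as tf
from packaging import version

# Before TF 2.11 the stock optimizers are the "old" implementation;
# from 2.11 onwards those same classes live under the legacy namespace.
if version.parse(tf.__version__) < version.parse("2.11.0"):
    keras_optimizers = tf.keras.optimizers
else:
    keras_optimizers = tf.keras.optimizers.legacy

optimizer = keras_optimizers.Adam(learning_rate=0.001)
```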
4 changes: 2 additions & 2 deletions .github/workflows/tensorflow.yml
@@ -16,7 +16,7 @@ jobs:
matrix:
python-version: [3.8]
os: [ubuntu-latest]
tensorflow-version: ["~=2.8.0", "~=2.9.0", "~=2.10.0"]
tensorflow-version: ["~=2.8.0", "~=2.9.0", "~=2.10.0", "~=2.11.0"]

steps:
- uses: actions/checkout@v3
@@ -70,7 +70,7 @@ jobs:
matrix:
python-version: [ 3.8 ]
os: [ ubuntu-latest ]
tensorflow-version: ["~=2.8.0", "~=2.9.0", "~=2.10.0"]
tensorflow-version: ["~=2.8.0", "~=2.9.0", "~=2.10.0", "~=2.11.0"]

steps:
- uses: actions/checkout@v3
@@ -650,8 +650,8 @@
"outputs": [],
"source": [
"optimizer = mm.MultiOptimizer(\n",
" default_optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),\n",
" optimizers_and_blocks=[mm.OptimizerBlocks(tf.keras.optimizers.Adam(learning_rate=0.001),\n",
" default_optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.01),\n",
" optimizers_and_blocks=[mm.OptimizerBlocks(tf.keras.optimizers.legacy.Adam(learning_rate=0.001),\n",
" [item_embeddings, query_embeddings])]\n",
") "
]
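For context, a hedged sketch of how the updated notebook cell is typically wired into training; `model`, `item_embeddings`, and `query_embeddings` are assumed to be defined in earlier notebook cells, and the `compile` call is ordinary Keras usage rather than something this diff shows:

```python
import merlin.models.tf as mm
import tensorflow as tf

# Requires TF >= 2.9, where tf.keras.optimizers.legacy is available.
# `model`, `item_embeddings`, and `query_embeddings` come from earlier cells.
optimizer = mm.MultiOptimizer(
    default_optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.01),
    optimizers_and_blocks=[
        mm.OptimizerBlocks(
            tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
            [item_embeddings, query_embeddings],
        )
    ],
)
model.compile(optimizer=optimizer)
```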
62 changes: 50 additions & 12 deletions merlin/models/tf/blocks/optimizer.py
@@ -21,6 +21,7 @@

import numpy as np
import tensorflow as tf
from packaging import version

import merlin.models.tf as ml
from merlin.models.tf.core.base import Block
@@ -31,6 +32,12 @@
FloatTensorLike = Union[tf.Tensor, float, np.float16, np.float32, np.float64]


if version.parse(tf.__version__) < version.parse("2.11.0"):
keras_optimizers = tf.keras.optimizers
else:
keras_optimizers = tf.keras.optimizers.legacy


@dataclass
class OptimizerBlocks:
"""dataclass for a pair of optimizer and blocks that the optimizer should apply to.
@@ -39,13 +46,18 @@ class OptimizerBlocks:
ml.OptimizerBlocks("adam", item_tower)
"""

optimizer: Union[str, tf.keras.optimizers.Optimizer]
optimizer: Union[str, keras_optimizers.Optimizer]
blocks: Sequence[Block]

def get_config(self):
"""return a tuple of serialized keras objects"""
optimizer_config = tf.keras.utils.serialize_keras_object(self.optimizer)
if version.parse(tf.__version__) >= version.parse("2.11.0") and isinstance(
self.optimizer, tf.keras.optimizers.legacy.Optimizer
):
optimizer_config["use_legacy_optimizer"] = True
return (
tf.keras.utils.serialize_keras_object(self.optimizer),
optimizer_config,
[tf.keras.utils.serialize_keras_object(block) for block in self.blocks],
)

@@ -58,7 +70,7 @@ def from_config(cls, config):


@tf.keras.utils.register_keras_serializable(package="merlin.models")
class MultiOptimizer(tf.keras.optimizers.Optimizer):
class MultiOptimizer(keras_optimizers.Optimizer):
"""An optimizer that composes multiple individual optimizers.
It allows different optimizers to be applied to different subsets of the model's variables. For
@@ -80,8 +92,8 @@ class MultiOptimizer(tf.keras.optimizers.Optimizer):
# The third_tower would be assigned the default_optimizer ("adagrad" in this example)
optimizer = ml.MultiOptimizer(default_optimizer="adagrad",
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.Adam(), item_tower),
ml.OptimizerBlocks(tf.keras.optimizers.legacy.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.legacy.Adam(), item_tower),
])
# The string identification of optimizer is also acceptable, here "sgd" for the third_tower
@@ -98,7 +110,7 @@ class MultiOptimizer(tf.keras.optimizers.Optimizer):
def __init__(
self,
optimizers_and_blocks: Sequence[OptimizerBlocks],
default_optimizer: Union[str, tf.keras.optimizers.Optimizer] = "rmsprop",
default_optimizer: Union[str, keras_optimizers.Optimizer] = "rmsprop",
name: str = "MultiOptimizer",
**kwargs,
):
@@ -110,7 +122,7 @@ def __init__(
List of OptimizerBlocks(dataclass), the OptimizerBlocks contains two items, one is
optimizer, another one is a list of blocks or a block that the optimizer should apply
to. See 'class OptimizerBlocks'
default_optimizer: Union[str, tf.keras.optimizers.Optimizer]
default_optimizer: Union[str, tf.keras.optimizers.legacy.Optimizer]
Default optimizer for the rest variables not specified in optimizers_and_blocks, by
default "rmsprop".
name:str
@@ -123,7 +135,22 @@ def __init__(
self.default_optimizer = tf.keras.optimizers.get(default_optimizer)
self.optimizers_and_blocks = []
for i, pair in enumerate(optimizers_and_blocks):
pair.optimizer = tf.keras.optimizers.get(pair.optimizer)
if version.parse(tf.__version__) < version.parse("2.11.0"):
pair.optimizer = tf.keras.optimizers.get(pair.optimizer)
else:
if not (
isinstance(pair.optimizer, str)
or isinstance(pair.optimizer, tf.keras.optimizers.legacy.Optimizer)
):
raise ValueError(
"Optimizers must be a str or an instance of "
"tf.keras.optimizers.legacy.Optimizer with Tensorflow >= 2.11."
)
pair.optimizer = tf.keras.optimizers.get(
pair.optimizer,
use_legacy_optimizer=True,
)

self._track_trackable(pair.optimizer, name=f"Optimizer{i}")
pair.blocks = [pair.blocks] if isinstance(pair.blocks, Block) else pair.blocks
self.optimizers_and_blocks.append(pair)
@@ -289,18 +316,26 @@ def weights(self) -> List[tf.Variable]:
"""Returns the optimizer's variables."""
weights = []
for optimizer_blocks in self.optimizers_and_blocks:
weights += optimizer_blocks.optimizer.weights
optimizer = optimizer_blocks.optimizer
if hasattr(optimizer, "weights"): # Tensorflow < 2.11
weights += optimizer_blocks.optimizer.weights
elif hasattr(optimizer, "variables") and callable(
optimizer.variables
): # Tensorflow >= 2.11
weights += optimizer_blocks.optimizer.variables()
else:
raise AttributeError(f"Unable to get weights from {optimizer.__class__.__name__}")
return weights

@property
def optimizers(self) -> List[tf.keras.optimizers.Optimizer]:
def optimizers(self) -> List[keras_optimizers.Optimizer]:
"""Returns the optimizers in MultiOptimizer (in the original order). Note: default_optimizer
is included here"""
return [pair.optimizer for pair in self.optimizers_and_blocks] + [self.default_optimizer]


@tf.keras.utils.register_keras_serializable(package="merlin.models")
class LazyAdam(tf.keras.optimizers.Adam):
class LazyAdam(keras_optimizers.Adam):
"""Variant of the Adam optimizer that handles sparse updates more efficiently.
The original Adam algorithm maintains two moving-average accumulators for each trainable
@@ -335,7 +370,7 @@ def __init__(
----------
learning_rate: Union[FloatTensorLike, Callable]
A `Tensor` or a floating point value. or a schedule that is a
`tf.keras.optimizers.schedules.LearningRateSchedule` The learning rate.
`tf.keras.optimizers.legacy.schedules.LearningRateSchedule` The learning rate.
FloatTensorLike = Union[tf.Tensor, float, np.float16, np.float32, np.float64]
beta_1: FloatTensorLike
A `float` value or a constant `float` tensor. The exponential decay rate for the 1st
@@ -397,6 +432,9 @@ def _resource_apply_sparse(self, grad, var, indices):

return tf.group(*[var_update_op, m_update_op, v_update_op])

def get_weights(self):
return self.variables()

def _resource_scatter_update(self, resource, indices, update):
return self._resource_scatter_operate(
resource, indices, update, tf.raw_ops.ResourceScatterUpdate
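The `weights` property in this file has to bridge the two optimizer APIs: legacy optimizers expose their state through a `weights` property, while the new optimizer in TF >= 2.11 exposes a `variables()` method. A standalone sketch of that duck-typing check, with the same logic as the property above (`optimizer_state` is a hypothetical helper name):

```python
import tensorflow as tf


def optimizer_state(optimizer):
    """Return an optimizer's slot variables on either Keras optimizer API."""
    if hasattr(optimizer, "weights"):  # TF < 2.11 and tf.keras.optimizers.legacy
        return list(optimizer.weights)
    if hasattr(optimizer, "variables") and callable(optimizer.variables):  # TF >= 2.11
        return list(optimizer.variables())
    raise AttributeError(f"Unable to get weights from {optimizer.__class__.__name__}")
```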
5 changes: 4 additions & 1 deletion merlin/models/tf/models/base.py
@@ -453,7 +453,10 @@ def _create_single_distributed_optimizer(opt):

return hvd.DistributedOptimizer(opt)

optimizer = tf.keras.optimizers.get(optimizer)
if version.parse(tf.__version__) < version.parse("2.11.0"):
optimizer = tf.keras.optimizers.get(optimizer)
else:
optimizer = tf.keras.optimizers.get(optimizer, use_legacy_optimizer=True)

if hvd_installed and hvd.size() > 1:
if optimizer.__module__.startswith("horovod"):
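The same version guard, factored into a helper for illustration; `resolve_legacy_optimizer` is a hypothetical name, while `use_legacy_optimizer` is the keyword argument this diff passes to `tf.keras.optimizers.get` on TF >= 2.11:

```python
import tensorflow as tf
from packaging import version


def resolve_legacy_optimizer(identifier):
    """Resolve a string or instance to a legacy-compatible optimizer."""
    if version.parse(tf.__version__) < version.parse("2.11.0"):
        return tf.keras.optimizers.get(identifier)
    # On TF >= 2.11, ask Keras explicitly for the legacy implementation.
    return tf.keras.optimizers.get(identifier, use_legacy_optimizer=True)


sgd = resolve_legacy_optimizer("sgd")  # legacy SGD on every supported version
```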
2 changes: 1 addition & 1 deletion requirements/test.txt
@@ -1,5 +1,5 @@
-r dev.txt
-r pytorch.txt
-r tensorflow.txt

tensorflow<2.11
numpy<1.24
63 changes: 47 additions & 16 deletions tests/unit/tf/blocks/test_optimizer.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import pytest
import tensorflow as tf
from packaging import version
from tensorflow.test import TestCase

import merlin.models.tf as ml
@@ -28,6 +28,11 @@
from merlin.models.utils.schema_utils import create_categorical_column
from merlin.schema import Schema, Tags

if version.parse(tf.__version__) < version.parse("2.11.0"):
keras_optimizers = tf.keras.optimizers
else:
keras_optimizers = tf.keras.optimizers.legacy


def generate_two_layers():
initializer_first_layer = tf.constant_initializer(np.ones((3, 4)))
@@ -44,14 +49,37 @@ def generate_two_layers():
return first_layer, second_layer


@pytest.mark.skipif(
version.parse(tf.__version__) < version.parse("2.11.0"),
reason="MultiOptimizer only rejects new-style optimizers on TF >= 2.11",
)
@pytest.mark.parametrize(
"optimizers",
[
("sgd", keras_optimizers.Adagrad()),
(keras_optimizers.SGD(), "adam"),
],
)
def test_new_optimizers_raise_error(optimizers):
layers = generate_two_layers()
with pytest.raises(ValueError) as excinfo:
ml.MultiOptimizer(
default_optimizer=optimizers[0],
optimizers_and_blocks=[
ml.OptimizerBlocks(optimizers[0], layers[0]),
ml.OptimizerBlocks(optimizers[1], layers[1]),
],
)
assert "Optimizers must be an instance of tf.keras.optimizers.legacy.Optimizer" in str(
excinfo.value
)


@pytest.mark.parametrize(
"optimizers",
[
("sgd", "adam"),
("rmsprop", "sgd"),
("adam", "adagrad"),
("adagrad", "rmsprop"),
(tf.keras.optimizers.SGD(), tf.keras.optimizers.Adagrad()),
(keras_optimizers.SGD(), keras_optimizers.Adagrad()),
],
)
def test_optimizers(optimizers):
@@ -120,8 +148,8 @@ def test_model_with_multi_optimizers(ecommerce_data, run_eagerly):
multi_optimizers = ml.MultiOptimizer(
default_optimizer="adam",
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.Adam(), item_tower),
ml.OptimizerBlocks(keras_optimizers.SGD(), user_tower),
ml.OptimizerBlocks(keras_optimizers.Adam(), item_tower),
],
)
testing_utils.model_test(
@@ -141,8 +169,8 @@ def test_multi_optimizer_list_input(ecommerce_data, run_eagerly):
model = ml.Model(two_tower, ml.BinaryClassificationTask("click"))
multi_optimizers = ml.MultiOptimizer(
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.Adam(), [item_tower, third_tower]),
ml.OptimizerBlocks(keras_optimizers.SGD(), user_tower),
ml.OptimizerBlocks(keras_optimizers.Adam(), [item_tower, third_tower]),
],
)
testing_utils.model_test(
@@ -163,8 +191,8 @@ def test_multi_optimizer_add(ecommerce_data, run_eagerly):
multi_optimizers = ml.MultiOptimizer(
default_optimizer="adam",
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.Adam(), item_tower),
ml.OptimizerBlocks(keras_optimizers.SGD(), user_tower),
ml.OptimizerBlocks(keras_optimizers.Adam(), item_tower),
],
)
multi_optimizers.add(ml.OptimizerBlocks("adagrad", third_tower))
@@ -180,7 +208,7 @@ def test_multi_optimizer_add(ecommerce_data, run_eagerly):
("rmsprop", "sgd"),
("adam", "adagrad"),
("adagrad", "rmsprop"),
(tf.keras.optimizers.SGD(), tf.keras.optimizers.Adagrad()),
(keras_optimizers.SGD(), keras_optimizers.Adagrad()),
],
)
def test_multi_optimizers_from_config(ecommerce_data, optimizers):
@@ -210,7 +238,7 @@ def test_multi_optimizers_from_config(ecommerce_data, optimizers):
"optimizers",
[
("sgd", "adam"),
(tf.keras.optimizers.SGD(), tf.keras.optimizers.Adagrad()),
(keras_optimizers.SGD(), keras_optimizers.Adagrad()),
],
)
def test_multi_optimizers_from_config_list_input(ecommerce_data, optimizers):
@@ -254,8 +282,8 @@ def test_examples_in_code_comments(ecommerce_data, use_default):
optimizer = ml.MultiOptimizer(
default_optimizer="adagrad",
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(tf.keras.optimizers.Adam(), item_tower),
ml.OptimizerBlocks(keras_optimizers.SGD(), user_tower),
ml.OptimizerBlocks(keras_optimizers.Adam(), item_tower),
],
)
else:
@@ -283,8 +311,8 @@ def test_update_optimizer(ecommerce_data, run_eagerly):
user_tower_1 = ml.InputBlock(schema.select_by_tag(Tags.USER)).connect(ml.MLPBlock([256, 128]))
two_tower = ml.ParallelBlock({"user": user_tower_0, "item": user_tower_1}, aggregation="concat")
model = ml.Model(two_tower, ml.BinaryClassificationTask("click"))
sgd = tf.keras.optimizers.SGD()
adam = tf.keras.optimizers.Adam()
sgd = keras_optimizers.SGD()
adam = keras_optimizers.Adam()
multi_optimizers = ml.MultiOptimizer(
optimizers_and_blocks=[
ml.OptimizerBlocks(adam, user_tower_0),
@@ -297,7 +325,10 @@ def test_update_optimizer(ecommerce_data, run_eagerly):
testing_utils.model_test(
model, ecommerce_data, run_eagerly=run_eagerly, optimizer=multi_optimizers
)
assert len(lazy_adam.get_weights()) == len(adam.get_weights())
if version.parse(tf.__version__) < version.parse("2.11.0"):
assert len(lazy_adam.get_weights()) == len(adam.get_weights())
else:
assert len(lazy_adam.get_weights()) == len(adam.variables())


def adam_update_numpy(param, g_t, t, m, v, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-7):
@@ -557,7 +588,7 @@ def test_lazy_adam_in_model_with_multi_optimizers(ecommerce_data, run_eagerly):
multi_optimizers = ml.MultiOptimizer(
default_optimizer="adam",
optimizers_and_blocks=[
ml.OptimizerBlocks(tf.keras.optimizers.SGD(), user_tower),
ml.OptimizerBlocks(keras_optimizers.SGD(), user_tower),
ml.OptimizerBlocks(ml.LazyAdam(), item_tower),
],
)
@@ -1,6 +1,7 @@
# Test currently breaks in TF 2.10

import pytest
import tensorflow as tf
from packaging import version
from testbook import testbook

from tests.conftest import REPO_ROOT
@@ -14,6 +15,10 @@
execute=False,
)
@pytest.mark.notebook
@pytest.mark.skipif(
version.parse(tf.__version__) < version.parse("2.9.0"),
reason="tf.keras.optimizers.legacy is not available in TF <= 2.8",
)
def test_usecase_incremental_training_layer_freezing(tb):
tb.inject(
"""
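If more notebook tests end up needing this guard, the marker can be shared rather than repeated; a sketch under that assumption (`requires_legacy_optimizers` is a hypothetical name):

```python
import pytest
import tensorflow as tf
from packaging import version

# Reusable skip marker for tests that exercise tf.keras.optimizers.legacy.
requires_legacy_optimizers = pytest.mark.skipif(
    version.parse(tf.__version__) < version.parse("2.9.0"),
    reason="tf.keras.optimizers.legacy is not available in TF <= 2.8",
)


@requires_legacy_optimizers
def test_legacy_namespace_present():
    assert tf.keras.optimizers.legacy.Adam is not None
```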