Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor model implementations #225

Merged
merged 9 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions anomalib/models/cflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
"""Real-Time Unsupervised Anomaly Detection via Conditional Normalizing Flows.

[CW-AD](https://arxiv.org/pdf/2107.12571v1.pdf)
"""
"""Real-Time Unsupervised Anomaly Detection via Conditional Normalizing Flows."""

# Copyright (C) 2020 Intel Corporation
#
Expand All @@ -16,3 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

from .model import CflowModel

__all__ = ["CflowModel"]
97 changes: 97 additions & 0 deletions anomalib/models/cflow/anomaly_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Anomaly Map Generator for CFlow model implementation."""

# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

from typing import List, Tuple, Union, cast

import torch
import torch.nn.functional as F
from omegaconf import ListConfig
from torch import Tensor


class AnomalyMapGenerator:
"""Generate Anomaly Heatmap."""

def __init__(
self,
image_size: Union[ListConfig, Tuple],
pool_layers: List[str],
):
self.distance = torch.nn.PairwiseDistance(p=2, keepdim=True)
self.image_size = image_size if isinstance(image_size, tuple) else tuple(image_size)
self.pool_layers: List[str] = pool_layers

def compute_anomaly_map(
self, distribution: Union[List[Tensor], List[List]], height: List[int], width: List[int]
) -> Tensor:
"""Compute the layer map based on likelihood estimation.

Args:
distribution: Probability distribution for each decoder block
height: blocks height
width: blocks width

Returns:
Final Anomaly Map

"""

test_map: List[Tensor] = []
for layer_idx in range(len(self.pool_layers)):
test_norm = torch.tensor(distribution[layer_idx], dtype=torch.double) # pylint: disable=not-callable
test_norm -= torch.max(test_norm) # normalize likelihoods to (-Inf:0] by subtracting a constant
test_prob = torch.exp(test_norm) # convert to probs in range [0:1]
test_mask = test_prob.reshape(-1, height[layer_idx], width[layer_idx])
# upsample
test_map.append(
F.interpolate(
test_mask.unsqueeze(1), size=self.image_size, mode="bilinear", align_corners=True
).squeeze()
)
# score aggregation
score_map = torch.zeros_like(test_map[0])
for layer_idx in range(len(self.pool_layers)):
score_map += test_map[layer_idx]
score_mask = score_map
# invert probs to anomaly scores
anomaly_map = score_mask.max() - score_mask

return anomaly_map

def __call__(self, **kwargs: Union[List[Tensor], List[int], List[List]]) -> Tensor:
"""Returns anomaly_map.

Expects `distribution`, `height` and 'width' keywords to be passed explicitly

Example
>>> anomaly_map_generator = AnomalyMapGenerator(image_size=tuple(hparams.model.input_size),
>>> pool_layers=pool_layers)
>>> output = self.anomaly_map_generator(distribution=dist, height=height, width=width)

Raises:
ValueError: `distribution`, `height` and 'width' keys are not found

Returns:
torch.Tensor: anomaly map
"""
if not ("distribution" in kwargs and "height" in kwargs and "width" in kwargs):
raise KeyError(f"Expected keys `distribution`, `height` and `width`. Found {kwargs.keys()}")

# placate mypy
distribution: List[Tensor] = cast(List[Tensor], kwargs["distribution"])
height: List[int] = cast(List[int], kwargs["height"])
width: List[int] = cast(List[int], kwargs["width"])
return self.compute_anomaly_map(distribution, height, width)
208 changes: 5 additions & 203 deletions anomalib/models/cflow/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,215 +17,17 @@
# See the License for the specific language governing permissions
samet-akcay marked this conversation as resolved.
Show resolved Hide resolved
# and limitations under the License.

from typing import List, Tuple, Union, cast

import einops
import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from omegaconf import DictConfig, ListConfig
from pytorch_lightning.callbacks import EarlyStopping
from torch import Tensor, nn, optim

from anomalib.models.cflow.backbone import cflow_head, positional_encoding_2d
from anomalib.models.components import AnomalyModule, FeatureExtractor

__all__ = ["AnomalyMapGenerator", "CflowModel", "CflowLightning"]


def get_logp(dim_feature_vector: int, p_u: torch.Tensor, logdet_j: torch.Tensor) -> torch.Tensor:
"""Returns the log likelihood estimation.

Args:
dim_feature_vector (int): Dimensions of the condition vector
p_u (torch.Tensor): Random variable u
logdet_j (torch.Tensor): log of determinant of jacobian returned from the invertable decoder

Returns:
torch.Tensor: Log probability
"""
ln_sqrt_2pi = -np.log(np.sqrt(2 * np.pi)) # ln(sqrt(2*pi))
logp = dim_feature_vector * ln_sqrt_2pi - 0.5 * torch.sum(p_u**2, 1) + logdet_j
return logp


class AnomalyMapGenerator:
"""Generate Anomaly Heatmap."""

def __init__(
self,
image_size: Union[ListConfig, Tuple],
pool_layers: List[str],
):
self.distance = torch.nn.PairwiseDistance(p=2, keepdim=True)
self.image_size = image_size if isinstance(image_size, tuple) else tuple(image_size)
self.pool_layers: List[str] = pool_layers

def compute_anomaly_map(
self, distribution: Union[List[Tensor], List[List]], height: List[int], width: List[int]
) -> Tensor:
"""Compute the layer map based on likelihood estimation.

Args:
distribution: Probability distribution for each decoder block
height: blocks height
width: blocks width

Returns:
Final Anomaly Map

"""

test_map: List[Tensor] = []
for layer_idx in range(len(self.pool_layers)):
test_norm = torch.tensor(distribution[layer_idx], dtype=torch.double) # pylint: disable=not-callable
test_norm -= torch.max(test_norm) # normalize likelihoods to (-Inf:0] by subtracting a constant
test_prob = torch.exp(test_norm) # convert to probs in range [0:1]
test_mask = test_prob.reshape(-1, height[layer_idx], width[layer_idx])
# upsample
test_map.append(
F.interpolate(
test_mask.unsqueeze(1), size=self.image_size, mode="bilinear", align_corners=True
).squeeze()
)
# score aggregation
score_map = torch.zeros_like(test_map[0])
for layer_idx in range(len(self.pool_layers)):
score_map += test_map[layer_idx]
score_mask = score_map
# invert probs to anomaly scores
anomaly_map = score_mask.max() - score_mask

return anomaly_map

def __call__(self, **kwargs: Union[List[Tensor], List[int], List[List]]) -> Tensor:
"""Returns anomaly_map.

Expects `distribution`, `height` and 'width' keywords to be passed explicitly

Example
>>> anomaly_map_generator = AnomalyMapGenerator(image_size=tuple(hparams.model.input_size),
>>> pool_layers=pool_layers)
>>> output = self.anomaly_map_generator(distribution=dist, height=height, width=width)

Raises:
ValueError: `distribution`, `height` and 'width' keys are not found

Returns:
torch.Tensor: anomaly map
"""
if not ("distribution" in kwargs and "height" in kwargs and "width" in kwargs):
raise KeyError(f"Expected keys `distribution`, `height` and `width`. Found {kwargs.keys()}")

# placate mypy
distribution: List[Tensor] = cast(List[Tensor], kwargs["distribution"])
height: List[int] = cast(List[int], kwargs["height"])
width: List[int] = cast(List[int], kwargs["width"])
return self.compute_anomaly_map(distribution, height, width)


class CflowModel(nn.Module):
"""CFLOW: Conditional Normalizing Flows."""

def __init__(self, hparams: Union[DictConfig, ListConfig]):
super().__init__()

self.backbone = getattr(torchvision.models, hparams.model.backbone)
self.fiber_batch_size = hparams.dataset.fiber_batch_size
self.condition_vector: int = hparams.model.condition_vector
self.dec_arch = hparams.model.decoder
self.pool_layers = hparams.model.layers

self.encoder = FeatureExtractor(backbone=self.backbone(pretrained=True), layers=self.pool_layers)
self.pool_dims = self.encoder.out_dims
self.decoders = nn.ModuleList(
[
cflow_head(
condition_vector=self.condition_vector,
coupling_blocks=hparams.model.coupling_blocks,
clamp_alpha=hparams.model.clamp_alpha,
n_features=pool_dim,
permute_soft=hparams.model.soft_permutation,
)
for pool_dim in self.pool_dims
]
)

# encoder model is fixed
for parameters in self.encoder.parameters():
parameters.requires_grad = False

self.anomaly_map_generator = AnomalyMapGenerator(
image_size=tuple(hparams.model.input_size), pool_layers=self.pool_layers
)

def forward(self, images):
"""Forward-pass images into the network to extract encoder features and compute probability.

Args:
images: Batch of images.

Returns:
Predicted anomaly maps.

"""

self.encoder.eval()
self.decoders.eval()
with torch.no_grad():
activation = self.encoder(images)

distribution = [torch.Tensor(0).to(images.device) for _ in self.pool_layers]

height: List[int] = []
width: List[int] = []
for layer_idx, layer in enumerate(self.pool_layers):
encoder_activations = activation[layer] # BxCxHxW

batch_size, dim_feature_vector, im_height, im_width = encoder_activations.size()
image_size = im_height * im_width
embedding_length = batch_size * image_size # number of rows in the conditional vector

height.append(im_height)
width.append(im_width)
# repeats positional encoding for the entire batch 1 C H W to B C H W
pos_encoding = einops.repeat(
positional_encoding_2d(self.condition_vector, im_height, im_width).unsqueeze(0),
"b c h w-> (tile b) c h w",
tile=batch_size,
).to(images.device)
c_r = einops.rearrange(pos_encoding, "b c h w -> (b h w) c") # BHWxP
e_r = einops.rearrange(encoder_activations, "b c h w -> (b h w) c") # BHWxC
decoder = self.decoders[layer_idx].to(images.device)

# Sometimes during validation, the last batch E / N is not a whole number. Hence we need to add 1.
# It is assumed that during training that E / N is a whole number as no errors were discovered during
# testing. In case it is observed in the future, we can use only this line and ensure that FIB is at
# least 1 or set `drop_last` in the dataloader to drop the last non-full batch.
fiber_batches = embedding_length // self.fiber_batch_size + int(
embedding_length % self.fiber_batch_size > 0
)

for batch_num in range(fiber_batches): # per-fiber processing
if batch_num < (fiber_batches - 1):
idx = torch.arange(batch_num * self.fiber_batch_size, (batch_num + 1) * self.fiber_batch_size)
else: # When non-full batch is encountered batch_num+1 * N will go out of bounds
idx = torch.arange(batch_num * self.fiber_batch_size, embedding_length)
c_p = c_r[idx] # NxP
e_p = e_r[idx] # NxC
# decoder returns the transformed variable z and the log Jacobian determinant
with torch.no_grad():
p_u, log_jac_det = decoder(e_p, [c_p])
#
decoder_log_prob = get_logp(dim_feature_vector, p_u, log_jac_det)
log_prob = decoder_log_prob / dim_feature_vector # likelihood per dim
distribution[layer_idx] = torch.cat((distribution[layer_idx], log_prob))
from torch import optim

output = self.anomaly_map_generator(distribution=distribution, height=height, width=width)
self.decoders.train()
from anomalib.models.cflow.torch_model import CflowModel
from anomalib.models.cflow.utils import get_logp, positional_encoding_2d
from anomalib.models.components import AnomalyModule

return output.to(images.device)
__all__ = ["CflowLightning"]


class CflowLightning(AnomalyModule):
Expand Down
Loading