Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Torch Inferencer and Update Openvino and Gradio Inferencers. #453

Merged
merged 18 commits into from
Jul 29, 2022
Merged
2 changes: 1 addition & 1 deletion anomalib/data/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
"""
super().__init__()

self.image_filenames = get_image_filenames(path)
self.image_filenames = [str(filename) for filename in get_image_filenames(path)]
samet-akcay marked this conversation as resolved.
Show resolved Hide resolved

if pre_process is None:
self.pre_process = PreProcessor(transform_config, image_size)
Expand Down
11 changes: 9 additions & 2 deletions anomalib/data/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

from .download import DownloadProgressBar, hash_check
from .generators import random_2d_perlin
from .image import get_image_filenames, read_image
from .image import generate_output_image_filename, get_image_filenames, read_image

__all__ = ["get_image_filenames", "hash_check", "random_2d_perlin", "read_image", "DownloadProgressBar"]
__all__ = [
"generate_output_image_filename",
"get_image_filenames",
"hash_check",
"random_2d_perlin",
"read_image",
"DownloadProgressBar",
]
110 changes: 105 additions & 5 deletions anomalib/data/utils/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# and limitations under the License.

import math
import warnings
from pathlib import Path
from typing import List, Union

Expand All @@ -25,33 +26,132 @@
from torchvision.datasets.folder import IMG_EXTENSIONS


def get_image_filenames(path: Union[str, Path]) -> List[str]:
def get_image_filenames(path: Union[str, Path]) -> List[Path]:
samet-akcay marked this conversation as resolved.
Show resolved Hide resolved
"""Get image filenames.

Args:
path (Union[str, Path]): Path to image or image-folder.

Returns:
List[str]: List of image filenames
List[Path]: List of image filenames

"""
image_filenames: List[str]
image_filenames: List[Path]

if isinstance(path, str):
path = Path(path)

if path.is_file() and path.suffix in IMG_EXTENSIONS:
image_filenames = [str(path)]
image_filenames = [path]

if path.is_dir():
image_filenames = [str(p) for p in path.glob("**/*") if p.suffix in IMG_EXTENSIONS]
image_filenames = [p for p in path.glob("**/*") if p.suffix in IMG_EXTENSIONS]

if len(image_filenames) == 0:
raise ValueError(f"Found 0 images in {path}")

return image_filenames


def duplicate_filename(path: Union[str, Path]) -> Path:
djdameln marked this conversation as resolved.
Show resolved Hide resolved
"""Check and duplicate filename.

This function checks the path and adds a suffix if it already exists on the file system.

Args:
path (Union[str, Path]): Input Path

Examples:
>>> path = Path("datasets/MVTec/bottle/test/broken_large/000.png")
>>> path.exists()
True

If we pass this to ``duplicate_filename`` function we would get the following:
>>> duplicate_filename(path)
PosixPath('datasets/MVTec/bottle/test/broken_large/000_1.png')

Returns:
Path: Duplicated output path.
"""

if isinstance(path, str):
path = Path(path)

i = 0
while True:
duplicated_path = path if i == 0 else path.parent / (path.stem + f"_{i}" + path.suffix)
if not duplicated_path.exists():
break
i += 1

return duplicated_path


def generate_output_image_filename(input_path: Union[str, Path], output_path: Union[str, Path]) -> Path:
"""Generate an output filename to save the inference image.

This function generates an output filaname by checking the input and output filenames. Input path is
the input to infer, and output path is the path to save the output predictions specified by the user.

The function expects ``input_path`` to always be a file, not a directory. ``output_path`` could be a
filename or directory. If it is a filename, the function checks if the specified filename exists on
the file system. If yes, the function calls ``duplicate_filename`` to duplicate the filename to avoid
overwriting the existing file. If ``output_path`` is a directory, this function adds the parent and
filenames of ``input_path`` to ``output_path``.

Args:
input_path (Union[str, Path]): Path to the input image to infer.
output_path (Union[str, Path]): Path to output to save the predictions.
Could be a filename or a directory.

Examples:
>>> input_path = Path("datasets/MVTec/bottle/test/broken_large/000.png")
>>> output_path = Path("datasets/MVTec/bottle/test/broken_large/000.png")
>>> generate_output_image_filename(input_path, output_path)
PosixPath('datasets/MVTec/bottle/test/broken_large/000_1.png')

>>> input_path = Path("datasets/MVTec/bottle/test/broken_large/000.png")
>>> output_path = Path("results/images")
>>> generate_output_image_filename(input_path, output_path)
PosixPath('results/images/broken_large/000.png')

Raises:
ValueError: When the ``input_path`` is not a file.

Returns:
Path: The output filename to save the output predictions from the inferencer.
"""

if isinstance(input_path, str):
input_path = Path(input_path)

if isinstance(output_path, str):
output_path = Path(output_path)

# This function expects an ``input_path`` that is a file. This is to check if output_path
if input_path.is_file() is False:
raise ValueError("input_path is expected to be a file to generate a proper output filename.")

file_path: Path
if output_path.suffix == "":
# If the output is a directory, then add parent directory name
# and filename to the path. This is to ensure we do not overwrite
# images and organize based on the categories.
file_path = output_path / input_path.parent.name / input_path.name
else:
file_path = output_path

# This new ``file_path`` might contain a directory path yet to be created.
# Create the parent directory to avoid such cases.
file_path.parent.mkdir(parents=True, exist_ok=True)

if file_path.is_file():
warnings.warn(f"{output_path} already exists. Renaming the file to avoid overwriting.")
file_path = duplicate_filename(file_path)

return file_path


def read_image(path: Union[str, Path]) -> np.ndarray:
"""Read image from disk in RGB format.

Expand Down
47 changes: 19 additions & 28 deletions anomalib/deploy/inferencers/base_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Optional, Tuple, Union, cast
from typing import Any, Dict, Optional, Tuple, Union, cast

import cv2
import numpy as np
Expand All @@ -26,7 +26,7 @@
from torch import Tensor

from anomalib.data.utils import read_image
from anomalib.post_processing import compute_mask, superimpose_anomaly_map
from anomalib.post_processing import ImageResult, compute_mask
from anomalib.post_processing.normalization.cdf import normalize as normalize_cdf
from anomalib.post_processing.normalization.cdf import standardize
from anomalib.post_processing.normalization.min_max import (
Expand Down Expand Up @@ -57,18 +57,16 @@ def forward(self, image: Union[np.ndarray, Tensor]) -> Union[np.ndarray, Tensor]

@abstractmethod
def post_process(
self, predictions: Union[np.ndarray, Tensor], meta_data: Optional[Dict]
) -> Tuple[np.ndarray, float]:
self, predictions: Union[np.ndarray, Tensor], meta_data: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Post-Process."""
raise NotImplementedError

def predict(
self,
image: Union[str, np.ndarray, Path],
superimpose: bool = True,
meta_data: Optional[dict] = None,
overlay_mask: bool = False,
) -> Tuple[np.ndarray, float]:
meta_data: Optional[Dict[str, Any]] = None,
) -> ImageResult:
"""Perform a prediction for a given input image.

The main workflow is (i) pre-processing, (ii) forward-pass, (iii) post-process.
Expand All @@ -77,14 +75,10 @@ def predict(
image (Union[str, np.ndarray]): Input image whose output is to be predicted.
It could be either a path to image or numpy array itself.

superimpose (bool): If this is set to True, output predictions
will be superimposed onto the original image. If false, `predict`
method will return the raw heatmap.

overlay_mask (bool): If this is set to True, output segmentation mask on top of image.
meta_data: Meta-data information such as shape, threshold.

Returns:
np.ndarray: Output predictions to be visualized.
ImageResult: Prediction results to be visualized.
"""
if meta_data is None:
if hasattr(self, "meta_data"):
Expand All @@ -99,16 +93,15 @@ def predict(

processed_image = self.pre_process(image_arr)
predictions = self.forward(processed_image)
anomaly_map, pred_scores = self.post_process(predictions, meta_data=meta_data)

# Overlay segmentation mask using raw predictions
if overlay_mask and meta_data is not None:
image_arr = self._superimpose_segmentation_mask(meta_data, anomaly_map, image_arr)

if superimpose is True:
anomaly_map = superimpose_anomaly_map(anomaly_map, image_arr)
output = self.post_process(predictions, meta_data=meta_data)

return anomaly_map, pred_scores
return ImageResult(
image=image_arr,
pred_score=output["pred_score"],
pred_label=output["pred_label"],
anomaly_map=output["anomaly_map"],
pred_mask=output["pred_mask"],
)

def _superimpose_segmentation_mask(self, meta_data: dict, anomaly_map: np.ndarray, image: np.ndarray):
"""Superimpose segmentation mask on top of image.
Expand All @@ -130,14 +123,14 @@ def _superimpose_segmentation_mask(self, meta_data: dict, anomaly_map: np.ndarra
image[outlines] = [255, 0, 0]
return image

def __call__(self, image: np.ndarray) -> Tuple[np.ndarray, float]:
def __call__(self, image: np.ndarray) -> ImageResult:
"""Call predict on the Image.

Args:
image (np.ndarray): Input Image

Returns:
np.ndarray: Output predictions to be visualized
ImageResult: Prediction results to be visualized.
"""
return self.predict(image)

Expand Down Expand Up @@ -185,9 +178,7 @@ def _normalize(

return anomaly_maps, float(pred_scores)

def _load_meta_data(
self, path: Optional[Union[str, Path]] = None
) -> Union[DictConfig, Dict[str, Union[float, np.ndarray, Tensor]]]:
def _load_meta_data(self, path: Optional[Union[str, Path]] = None) -> Union[DictConfig, Dict]:
"""Loads the meta data from the given path.

Args:
Expand Down
67 changes: 54 additions & 13 deletions anomalib/deploy/inferencers/openvino_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

djdameln marked this conversation as resolved.
Show resolved Hide resolved
from importlib.util import find_spec
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union

import cv2
import numpy as np
from omegaconf import DictConfig, ListConfig

from anomalib.config import get_configurable_parameters
from anomalib.pre_processing import PreProcessor

from .base_inference import Inferencer
Expand All @@ -36,19 +37,26 @@ class OpenVINOInferencer(Inferencer):
"""OpenVINO implementation for the inference.

Args:
config (DictConfig): Configurable parameters that are used
config (Union[str, Path, DictConfig, ListConfig]): Configurable parameters that are used
during the training stage.
path (Union[str, Path]): Path to the openvino onnx, xml or bin file.
meta_data_path (Union[str, Path], optional): Path to metadata file. Defaults to None.
"""

def __init__(
self,
config: Union[DictConfig, ListConfig],
config: Union[str, Path, DictConfig, ListConfig],
path: Union[str, Path, Tuple[bytes, bytes]],
meta_data_path: Union[str, Path] = None,
):
self.config = config
# Check and load the configuration
if isinstance(config, (str, Path)):
self.config = get_configurable_parameters(config_path=config)
elif isinstance(config, (DictConfig, ListConfig)):
self.config = config
else:
raise ValueError(f"Unknown config type {type(config)}")

self.input_blob, self.output_blob, self.network = self.load_model(path)
self.meta_data = super()._load_meta_data(meta_data_path)

Expand Down Expand Up @@ -122,7 +130,7 @@ def forward(self, image: np.ndarray) -> np.ndarray:

def post_process(
self, predictions: np.ndarray, meta_data: Optional[Union[Dict, DictConfig]] = None
) -> Tuple[np.ndarray, float]:
) -> Dict[str, Any]:
"""Post process the output predictions.

Args:
Expand All @@ -132,18 +140,51 @@ def post_process(
Defaults to None.

Returns:
np.ndarray: Post processed predictions that are ready to be visualized.
Dict[str, Any]: Post processed prediction results.
"""
if meta_data is None:
meta_data = self.meta_data

predictions = predictions[self.output_blob]
anomaly_map = predictions.squeeze()
pred_score = anomaly_map.reshape(-1).max()

anomaly_map, pred_score = self._normalize(anomaly_map, pred_score, meta_data)
# Initialize the result variables.
anomaly_map: Optional[np.ndarray] = None
pred_label: Optional[float] = None
pred_mask: Optional[float] = None

if "image_shape" in meta_data and anomaly_map.shape != meta_data["image_shape"]:
anomaly_map = cv2.resize(anomaly_map, meta_data["image_shape"])

return anomaly_map, float(pred_score)
# If predictions returns a single value, this means that the task is
# classification, and the value is the classification prediction score.
if len(predictions.shape) == 1:
task = "classification"
pred_score = predictions.item()
else:
task = "segmentation"
anomaly_map = predictions.squeeze()
pred_score = anomaly_map.reshape(-1).max()

# Common practice in anomaly detection is to assign anomalous
# label to the prediction if the prediction score is greater
# than the image threshold.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we warn users if thresholds are not present in meta_data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would bring up another discussion. Do you think we should check if they are in meta_data?

For example,

if "image_threshold" in meta_data:
    pred_label = pred_score >= meta_data["image_threshold"]

We cannot calculate the pred-label if we don't have this. The question is whether we should have this if statement here, and for the rest of the computation?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default threshold is 0.5 so if we don't have image_threshold in meta_data then we can throw a warning and reduce the if statement to pred_label = pred_score >= meta_data.get("image_threshold", 0.5)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we throw a warning it should be fine. Because the users need to be made aware that there is a problem with the metadata file.

if "image_threshold" in meta_data:
pred_label = pred_score >= meta_data["image_threshold"]

if task == "segmentation":
if "pixel_threshold" in meta_data:
pred_mask = (anomaly_map >= meta_data["pixel_threshold"]).astype(np.uint8)

anomaly_map, pred_score = self._normalize(anomaly_map, pred_score, meta_data)

if "image_shape" in meta_data and anomaly_map.shape != meta_data["image_shape"]:
image_height = meta_data["image_shape"][0]
image_width = meta_data["image_shape"][1]
anomaly_map = cv2.resize(anomaly_map, (image_width, image_height))

if pred_mask is not None:
pred_mask = cv2.resize(pred_mask, (image_width, image_height))

return {
"anomaly_map": anomaly_map,
"pred_label": pred_label,
"pred_score": pred_score,
"pred_mask": pred_mask,
}
Loading