Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Augmentation update #1263

Merged
merged 1 commit into from
Aug 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/full/lib/training.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ augmentation module
:undoc-members:
:show-inheritance:

cache module
============

.. automodule:: lib.training.cache
:members:
:undoc-members:
:show-inheritance:


generator module
================

Expand Down
254 changes: 141 additions & 113 deletions lib/align/detected_face.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(self,
self._landmarks_xy = landmarks_xy
self.thumbnail: Optional[np.ndarray] = None
self.mask = {} if mask is None else mask
self._training_masks: Optional[Tuple[bytes, Tuple[int, int, int]]] = None

self._aligned: Optional[AlignedFace] = None
logger.trace("Initialized %s", self.__class__.__name__) # type: ignore
Expand Down Expand Up @@ -172,59 +173,84 @@ def add_mask(self,
fsmask.add(mask, affine_matrix, interpolator)
self.mask[name] = fsmask

def get_landmark_mask(self, size, area,
aligned=True, centering="face", dilation=0, blur_kernel=0, as_zip=False):
""" Obtain a single channel mask based on the face's landmark points.
def get_landmark_mask(self,
area: Literal["eye", "face", "mouth"],
blur_kernel: int,
dilation: int) -> np.ndarray:
""" Add a :class:`LandmarksMask` to this detected face

Landmark based masks are generated from face Aligned Face landmark points. An aligned
face must be loaded. As the data is coming from the already aligned face, no further mask
cropping is required.

Parameters
----------
size: int or tuple
The size of the aligned mask to retrieve. Should be an `int` if an aligned face is
being requested, or a ('height', 'width') shape tuple if a full frame is being
requested
area: ["mouth", "eyes"]
area: ["face", "mouth", "eye"]
The type of mask to obtain. `face` is a full face mask the others are masks for those
specific areas
aligned: bool, optional
``True`` if the returned mask should be for an aligned face. ``False`` if a full frame
mask should be returned. Default ``True``
centering: ["legacy", "face", "head"], optional
Only used if `aligned`=``True``. The centering for the landmarks based mask. Should be
the same as the centering used for the extracted face that this mask will be applied
to. "legacy" places the nose in the center of the image (the original method for
aligning). "face" aligns for the nose to be in the center of the face (top to bottom)
but the center of the skull for left to right. "head" aligns for the center of the
skull (in 3D space) being the center of the extracted image, with the crop holding the
full head. Default: `"face"`
dilation: int, optional
blur_kernel: int
The size of the kernel for blurring the mask edges
dilation: int
The amount of dilation to apply to the mask. `0` for none. Default: `0`
blur_kernel: int, optional
The kernel size for applying gaussian blur to apply to the mask. `0` for none.
Default: `0`
as_zip: bool, optional
``True`` if the mask should be returned zipped otherwise ``False``

Returns
-------
:class:`numpy.ndarray` or zipped array
The mask as a single channel image of the given :attr:`size` dimension. If
:attr:`as_zip` is ``True`` then the :class:`numpy.ndarray` will be contained within a
zipped container
:class:`numpy.ndarray`
The generated landmarks mask for the selected area
"""
# TODO Face mask generation from landmarks
logger.trace("size: %s, area: %s, aligned: %s, dilation: %s, blur_kernel: %s, as_zip: %s",
size, area, aligned, dilation, blur_kernel, as_zip)
areas = dict(mouth=[slice(48, 60)], eyes=[slice(36, 42), slice(42, 48)])
if aligned:
face = AlignedFace(self.landmarks_xy, centering=centering, size=size)
landmarks = face.landmarks
size = (size, size)
else:
landmarks = self.landmarks_xy
points = [landmarks[zone] for zone in areas[area]] # pylint:disable=unsubscriptable-object
mask = _LandmarksMask(size, points, dilation=dilation, blur_kernel=blur_kernel)
retval = mask.get(as_zip=as_zip)
return retval
logger.trace("area: %s, dilation: %s", area, dilation) # type: ignore
areas = dict(mouth=[slice(48, 60)], eye=[slice(36, 42), slice(42, 48)])
points = [self.aligned.landmarks[zone]
for zone in areas[area]]

lmmask = LandmarksMask(points,
storage_size=self.aligned.size,
storage_centering=self.aligned.centering,
dilation=dilation)
lmmask.set_blur_and_threshold(blur_kernel=blur_kernel)
lmmask.generate_mask(
self.aligned.adjusted_matrix,
self.aligned.interpolators[1])
return lmmask.mask

def store_training_masks(self,
masks: List[Optional[np.ndarray]],
delete_masks: bool = False) -> None:
""" Concatenate and compress the given training masks and store for retrieval.

Parameters
----------
masks: list
A list of training mask. Must be all be uint-8 3D arrays of the same size in
0-255 range
delete_masks: bool, optional
``True`` to delete any of the :class:`Mask` objects owned by this detected face. Use to
free up unrequired memory usage. Default: ``False``
"""
if delete_masks:
del self.mask
self.mask = {}

valid = [msk for msk in masks if msk is not None]
if not valid:
return
combined = np.concatenate(valid, axis=-1)
self._training_masks = (compress(combined), combined.shape)

def get_training_masks(self) -> Optional[np.ndarray]:
""" Obtain the decompressed combined training masks.

Returns
-------
:class:`numpy.ndarray`
A 3D array containing the decompressed training masks as uint8 in 0-255 range if
training masks are present otherwise ``None``
"""
if not self._training_masks:
return None
return np.frombuffer(decompress(self._training_masks[0]),
dtype="uint8").reshape(self._training_masks[1])

def to_alignment(self) -> AlignmentFileDict:
""" Return the detected face formatted for an alignments file
Expand Down Expand Up @@ -412,77 +438,6 @@ def load_aligned(self,
is_legacy=is_aligned and is_legacy)


class _LandmarksMask(): # pylint:disable=too-few-public-methods
""" Create a single channel mask from aligned landmark points.

size: tuple
The (height, width) shape tuple that the mask should be returned as
points: list
A list of landmark points that correspond to the given shape tuple to create
the mask. Each item in the list should be a :class:`numpy.ndarray` that a filled
convex polygon will be created from
dilation: int, optional
The amount of dilation to apply to the mask. `0` for none. Default: `0`
blur_kernel: int, optional
The kernel size for applying gaussian blur to apply to the mask. `0` for none. Default: `0`
"""
def __init__(self, size, points, dilation=0, blur_kernel=0):
logger.trace("Initializing: %s: (size: %s, points: %s, dilation: %s, blur_kernel: %s)",
self.__class__.__name__, size, points, dilation, blur_kernel)
self._size = size
self._points = points
self._dilation = dilation
self._blur_kernel = blur_kernel
self._mask = None
logger.trace("Initialized: %s", self.__class__.__name__)

def get(self, as_zip=False):
""" Obtain the mask.

Parameters
----------
as_zip: bool, optional
``True`` if the mask should be returned zipped otherwise ``False``

Returns
-------
:class:`numpy.ndarray` or zipped array
The mask as a single channel image of the given :attr:`size` dimension. If
:attr:`as_zip` is ``True`` then the :class:`numpy.ndarray` will be contained within a
zipped container
"""
if not np.any(self._mask):
self._generate_mask()
retval = compress(self._mask) if as_zip else self._mask
logger.trace("as_zip: %s, retval type: %s", as_zip, type(retval))
return retval

def _generate_mask(self):
""" Generate the mask.

Creates the mask applying any requested dilation and blurring and assigns to
:attr:`_mask`

Returns
-------
:class:`numpy.ndarray`
The mask as a single channel image of the given :attr:`size` dimension.
"""
mask = np.zeros((self._size) + (1, ), dtype="float32")
for landmarks in self._points:
lms = np.rint(landmarks).astype("int")
cv2.fillConvexPoly(mask, cv2.convexHull(lms), 1.0, lineType=cv2.LINE_AA)
if self._dilation != 0:
mask = cv2.dilate(mask,
cv2.getStructuringElement(cv2.MORPH_ELLIPSE,
(self._dilation, self._dilation)),
iterations=1)
if self._blur_kernel != 0:
mask = BlurMask("gaussian", mask, self._blur_kernel).blurred
logger.trace("mask: (shape: %s, dtype: %s)", mask.shape, mask.dtype)
self._mask = (mask * 255.0).astype("uint8")


class Mask():
""" Face Mask information and convenience methods

Expand Down Expand Up @@ -805,6 +760,79 @@ def from_dict(self, mask_dict: MaskAlignmentsFileDict) -> None:
for k, v in mask_dict.items()})


class LandmarksMask(Mask):
""" Create a single channel mask from aligned landmark points.

Landmarks masks are created on the fly, so the stored centering and size should be the same as
the aligned face that the mask will be applied to. As the masks are created on the fly, blur +
dilation is applied to the mask at creation (prior to compression) rather than after
decompression when requested.

Note
----
Threshold is not used for Landmarks mask as the mask is binary

Parameters
----------
points: list
A list of landmark points that correspond to the given storage_size to create
the mask. Each item in the list should be a :class:`numpy.ndarray` that a filled
convex polygon will be created from
storage_size: int, optional
The size (in pixels) that the compressed mask should be stored at. Default: 128.
storage_centering, str (optional):
The centering to store the mask at. One of `"legacy"`, `"face"`, `"head"`.
Default: `"face"`
dilation: int, optional
The amount of dilation to apply to the mask. `0` for none. Default: `0`
"""
def __init__(self,
points: List[np.ndarray],
storage_size: int = 128,
storage_centering: "CenteringType" = "face",
dilation: int = 0) -> None:
super().__init__(storage_size=storage_size, storage_centering=storage_centering)
self._points = points
self._dilation = dilation

@property
def mask(self) -> np.ndarray:
""" :class:`numpy.ndarray`: Overrides the default mask property, creating the processed
mask at first call and compressing it. The decompressed mask is returned from this
property. """
return self.stored_mask

def generate_mask(self, affine_matrix: np.ndarray, interpolator: int) -> None:
""" Generate the mask.

Creates the mask applying any requested dilation and blurring and assigns compressed mask
to :attr:`_mask`

Parameters
----------
affine_matrix: :class:`numpy.ndarray`
The transformation matrix required to transform the mask to the original frame.
interpolator, int:
The CV2 interpolator required to transform this mask to it's original frame
"""
mask = np.zeros((self.stored_size, self.stored_size, 1), dtype="float32")
for landmarks in self._points:
lms = np.rint(landmarks).astype("int")
cv2.fillConvexPoly(mask, cv2.convexHull(lms), 1.0, lineType=cv2.LINE_AA)
if self._dilation != 0:
mask = cv2.dilate(mask,
cv2.getStructuringElement(cv2.MORPH_ELLIPSE,
(self._dilation, self._dilation)),
iterations=1)
if self._blur_kernel != 0 and self._blur_type is not None:
mask = BlurMask(self._blur_type,
mask,
self._blur_kernel,
passes=self._blur_passes).blurred
logger.trace("mask: (shape: %s, dtype: %s)", mask.shape, mask.dtype) # type: ignore
self.add(mask, affine_matrix, interpolator)


class BlurMask(): # pylint:disable=too-few-public-methods
""" Factory class to return the correct blur object for requested blur type.

Expand Down
15 changes: 12 additions & 3 deletions lib/cli/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def _import_script(self) -> Callable:
class: Faceswap Script
The uninitialized script from the faceswap scripts folder.
"""
self._set_environment_variables()
self._test_for_tf_version()
self._test_for_gui()
cmd = os.path.basename(sys.argv[0])
Expand All @@ -51,6 +52,17 @@ def _import_script(self) -> Callable:
script = getattr(module, self._command.title())
return script

def _set_environment_variables(self) -> None:
""" Set the number of threads that numexpr can use and TF environment variables. """
# Allocate a decent number of threads to numexpr to suppress warnings
cpu_count = os.cpu_count()
allocate = cpu_count - cpu_count // 3 if cpu_count is not None else 1
os.environ["NUMEXPR_MAX_THREADS"] = str(max(1, allocate))

# Ensure tensorflow doesn't pin all threads to one core when using Math Kernel Library
os.environ["TF_MIN_GPU_MULTIPROCESSOR_COUNT"] = "4"
os.environ["KMP_AFFINITY"] = "disabled"

def _test_for_tf_version(self) -> None:
""" Check that the required Tensorflow version is installed.

Expand All @@ -63,9 +75,6 @@ def _test_for_tf_version(self) -> None:
min_ver = 2.7
max_ver = 2.9
try:
# Ensure tensorflow doesn't pin all threads to one core when using Math Kernel Library
os.environ["TF_MIN_GPU_MULTIPROCESSOR_COUNT"] = "4"
os.environ["KMP_AFFINITY"] = "disabled"
import tensorflow as tf # noqa pylint:disable=import-outside-toplevel,unused-import
except ImportError as err:
if "DLL load failed while importing" in str(err):
Expand Down
26 changes: 10 additions & 16 deletions lib/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,26 +356,20 @@ def read_image_batch(filenames, with_metadata=False):
>>> images = read_image_batch(image_filenames)
"""
logger.trace("Requested batch: '%s'", filenames)
executor = futures.ThreadPoolExecutor()
with executor:
batch = [None for _ in range(len(filenames))]
if with_metadata:
meta = [None for _ in range(len(filenames))]

with futures.ThreadPoolExecutor() as executor:
images = {executor.submit(read_image, filename,
raise_error=True, with_metadata=with_metadata): filename
for filename in filenames}
batch = [None for _ in range(len(filenames))]
if with_metadata:
meta = [None for _ in range(len(filenames))]
# There is no guarantee that the same filename will not be passed through multiple times
# (and when shuffle is true this can definitely happen), so we can't just call
# filenames.index().
return_indices = {filename: [idx for idx, fname in enumerate(filenames)
if fname == filename]
for filename in set(filenames)}
raise_error=True, with_metadata=with_metadata): idx
for idx, filename in enumerate(filenames)}
for future in futures.as_completed(images):
return_idx = return_indices[images[future]].pop()
ret_idx = images[future]
if with_metadata:
batch[return_idx], meta[return_idx] = future.result()
batch[ret_idx], meta[ret_idx] = future.result()
else:
batch[return_idx] = future.result()
batch[ret_idx] = future.result()

batch = np.array(batch)
retval = (batch, meta) if with_metadata else batch
Expand Down
2 changes: 1 addition & 1 deletion lib/training/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
associated objects. """

from .augmentation import ImageAugmentation # noqa
from .generator import TrainingDataGenerator # noqa
from .generator import PreviewDataGenerator, TrainingDataGenerator # noqa
Loading