Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

T356 diffusive oxygenation #357

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions simpa/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from .libraries.heterogeneity_generator import RandomHeterogeneity
from .libraries.heterogeneity_generator import BlobHeterogeneity
from .libraries.heterogeneity_generator import ImageHeterogeneity
from .libraries.heterogeneity_generator import OxygenationDiffusionMap3D
from .libraries.heterogeneity_generator import OxygenationDiffusionMap2D

# Then load classes and methods with an <b>increasing</b> amount of internal dependencies.
# If there are import errors in the tests, it is probably due to an incorrect
Expand Down
188 changes: 188 additions & 0 deletions simpa/utils/libraries/heterogeneity_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
# SPDX-FileCopyrightText: 2021 Janek Groehl
# SPDX-License-Identifier: MIT

from abc import abstractmethod, ABC
import numpy as np
from sklearn.datasets import make_blobs
from scipy.ndimage.filters import gaussian_filter
from skimage import transform
from simpa.utils import Tags
from typing import Union, Optional
from simpa.log import Logger
from simpa.utils.calculate import calculate_oxygenation


class HeterogeneityGeneratorBase(object):
Expand Down Expand Up @@ -280,6 +282,9 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray,
crop_horizontal = round((image_width_pixels - crop_width) / 2)
elif isinstance(crop_placement[0], int):
crop_horizontal = crop_placement[0]
else:
raise ValueError(f'Unrecognised crop placement {crop_placement[0]}, please see the available options in'
f'the documentation')

if crop_placement[1] == Tags.CROP_POSITION_TOP:
crop_vertical = 0
Expand All @@ -289,6 +294,9 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray,
crop_vertical = round((image_height_pixels - crop_height) / 2)
elif isinstance(crop_placement[1], int):
crop_vertical = crop_placement[1]
else:
raise ValueError(f'Unrecognised crop placement {crop_placement[1]}, please see the available options in'
f'the documentation')

elif isinstance(crop_placement, str):
if crop_placement == Tags.CROP_POSITION_CENTRE:
Expand All @@ -301,6 +309,12 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray,
crop_vertical = image_height_pixels - crop_height
if crop_vertical != 0:
crop_vertical = np.random.randint(0, crop_vertical)
else:
raise ValueError(f'Unrecognised crop placement {crop_placement}, please see the available options in'
f'the documentation')
else:
raise ValueError(f'Unrecognised crop placement {crop_placement}, please see the available options in'
f'the documentation')

cropped_image = image[crop_horizontal: crop_horizontal + crop_width, crop_vertical: crop_vertical + crop_height]

Expand All @@ -325,3 +339,177 @@ def change_resolution(self, image: np.ndarray, spacing_mm: Union[int, float],
self.logger.warning(
"The input image has changed pixel spacing to {} to match the simulation volume".format(spacing_mm))
return transform.resize(image, (new_image_pixel_width, new_image_pixel_height))


class OxygenationDiffusionMap(object):
"""
Here the aim is to create diffusion maps for oxygenation based on the positions and values of pre-set of the
sources/sinks. This predefined oxygenation map must be 2D. The user should understand that this technique requires
knowing when the diffusion equations have approached a stable point. This works using a finite difference method on
the diffusion equations.

Some suggestions for how to optimise the time of convergence:
- We recommend that the user try a number of time steps to begin with to gauge reasonable values for your
simulation.
- We recommend that the user amends the diffusion constant. At its essence, this value determines the size of
the steps in this numerical method. Large diffusion constant will lead to faster convergence, but will reduce
the accuracy
- We recommend caution around the value decide for the background_oxygenation. This value should represent what
you expect the long term oxygenation to look like, as this will reduce the amount each value needs to change in
order to reach ots converged value. In addition, this value will be used as the boundary. It represents
condition which must be fulfilled throughout the simulation and will therefore have a large baring on the final
results

Attributes:
map: The final oxygenation map
"""

def __init__(self, segmentation_array: np.ndarray, segmentation_mapping: dict,
segments_to_include: list, diffusion_const: Union[int, float], dt: Union[int, float],
ds2: Union[int, float], n_steps: int, background_oxygenation: Union[int, float]):
"""
Upon initialisation of this class, the diffusion map will be created.
:param segmentation_array: An array segmenting the medium
:param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels
:param segments_to_include: Which segments to include in the initial oxygenation map
:param diffusion_const: The diffusion constant of the oxygenation
:param n_steps: The number of steps the simulation should use
:param background_oxygenation: The oxygenation that will be used as the starting value everywhere where the
oxygenation is not defined, and as the edge value
"""
initial_conditions = np.zeros(segmentation_array.shape)
for segment in segments_to_include:
initial_conditions[np.where(segmentation_array == segment)
] = calculate_oxygenation(segmentation_mapping[segment])

oxygenation = initial_conditions.copy()
oxygenation[np.where(initial_conditions == 0)] = background_oxygenation

for m in range(n_steps):
oxygenation = self.diffusion_timestep(oxygenation, initial_conditions, diffusion_const, dt, ds2)

self.map = oxygenation

@abstractmethod
def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2):
"""
Here, a diffusion equation can be applied to determine how the oxygenation should evolve.
:param u: The initial map to perform a time step of the diffusion model
:param boundary_conditions: The conditions that you wish to remain constant throughout the simulation
:param diffusion_const: The diffusion constant of the oxygenation
:param dt: The time step
:param ds2: The second order spatial step
:return: The map after a time step of the diffusion model
"""
pass

def get_map(self):
return self.map


class OxygenationDiffusionMap2D(OxygenationDiffusionMap):
"""
This uses the OxygenationDiffusionMap class and applies it to a volume with a constant cross-sectoin. The predefined
oxygenation map must be 2D.

Attributes:
map: The final oxygenation map
"""

def __init__(self, spacing_mm, segmentation_area: np.ndarray, segmentation_mapping, segments_to_include,
diffusion_const, ydim: int, n_steps=10000, background_oxygenation=0.8):
"""
Upon initialisation of this class, the diffusion map will be created.
:param spacing_mm: the spacing of the volume in mm
:param segmentation_area: An array segmenting the cross-sectional area of the medium
:param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels
:param segments_to_include: Which segments to include in the initial oxygenation map
:param diffusion_const: The diffusion constant of the oxygenation
:param ydim: the dimension to scale the image to make it into a volume
:param n_steps: The number of steps the simulation should use
:param background_oxygenation: The oxygenation that will be used as the starting value everywhere where the
oxygenation is not defined, and as the edge value
"""
ds2 = spacing_mm**2
dt = ds2 ** 2 / (4 * diffusion_const * ds2)
super().__init__(segmentation_area, segmentation_mapping, segments_to_include, diffusion_const, dt, ds2,
n_steps, background_oxygenation)

self.map = np.repeat(self.map[:, np.newaxis, :], ydim, axis=1)

def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2):
"""
:param u: The initial map to perform a time step of the diffusion model
:param boundary_conditions: The conditions that you wish to remain constant throughout the simulation
:param diffusion_const: The diffusion constant of the oxygenation
:param dt: The time step
:param ds2: The second order spatial step
:return: The map after a time step of the diffusion model
"""
# Propagate with forward-difference in time, central-difference in space
u[1:-1, 1:-1] = u[1:-1, 1:-1] + diffusion_const * dt * (
(u[2:, 1:-1] - 2 * u[1:-1, 1:-1] + u[:-2, 1:-1]) / ds2
+ (u[1:-1, 2:] - 2 * u[1:-1, 1:-1] + u[1:-1, :-2]) / ds2)
u[np.where(boundary_conditions != 0)] = boundary_conditions[np.where(boundary_conditions != 0)]
return u


class OxygenationDiffusionMap3D(OxygenationDiffusionMap):
"""
Here the aim is to create diffusion maps for oxygenation based on the positions and values of pre-set of the
sources/sinks. This predefined oxygenation map must be 3D. The user should understand that this technique requires
knowing when the diffusion equations have approached a stable point. This works using a finite difference method on
the diffusion equations.

Some suggestions for how to optimise the time of convergence:
- We recommend that the user try a number of time steps to begin with to gauge reasonable values for your
simulation.
- We recommend that the user amends the diffusion constant. At its essence, this value determines the size of
the steps in this numerical method. Large diffusion constant will lead to faster convergence, but will reduce
the accuracy
- We recommend caution around the value decide for the background_oxygenation. This value should represent what
you expect the long term oxygenation to look like, as this will reduce the amount each value needs to change in
order to reach ots converged value. In addition, this value will be used as the boundary. It represents
condition which must be fulfilled throughout the simulation and will therefore have a large baring on the final
results

Attributes:
map: The final oxygenation map
"""

def __init__(self, spacing_mm: Union[int, float], segmentation_volume: np.ndarray, segmentation_mapping: dict,
segments_to_include: list, diffusion_const: Union[int, float],
n_steps: int = 10000, background_oxygenation: Union[int, float] = 0.8):
"""
Upon initialisation of this class, the diffusion map will be created.
:param spacing_mm: the spacing of the volume in mm
:param segmentation_volume: An array segmenting the volume of the medium
:param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels
:param segments_to_include: Which segments to include in the initial oxygenation map
:param diffusion_const: The diffusion constant of the oxygenation
:param n_steps: The number of steps the simulation should use
:param background_oxygenation: The oxygenation that will be used: as the starting value everywhere where the
oxygenation is not defined, and as the edge value
"""
ds2 = spacing_mm**2
dt = ds2 ** 2 / (6 * diffusion_const * ds2)

super().__init__(segmentation_volume, segmentation_mapping, segments_to_include, diffusion_const, dt,
ds2, n_steps, background_oxygenation)

def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2):
"""
:param u: The initial map to perform a time step of the diffusion model
:param boundary_conditions: The conditions that you wish to remain constant throughout the simulation
:param diffusion_const: The diffusion constant of the oxygenation
:param dt: The time step
:param ds2: The second order spatial step
:return: The map after a time step of the diffusion model
"""
# Propagate with forward-difference in time, central-difference in space
u[1:-1, 1:-1, 1:-1] = u[1:-1, 1:-1, 1:-1] + diffusion_const * dt * (
(u[2:, 1:-1, 1:-1] - 2 * u[1:-1, 1:-1, 1:-1] + u[:-2, 1:-1, 1:-1]) / ds2
+ (u[1:-1, 2:, 1:-1] - 2 * u[1:-1, 1:-1, 1:-1] + u[1:-1, :-2, 1:-1]) / ds2
+ (u[1:-1, 1:-1, 2:] - 2 * u[1:-1, 1:-1, 1:-1] + u[1:-1, 1:-1, :-2]) / ds2)
u[np.where(boundary_conditions != 0)] = boundary_conditions[np.where(boundary_conditions != 0)]
return u
Loading