Skip to content

Commit

Permalink
Merge pull request #1487 from mikel-brostrom/kf-type-hints
Browse files Browse the repository at this point in the history
added type hints
  • Loading branch information
mikel-brostrom committed Jun 20, 2024
2 parents 5a63037 + 9f517f5 commit 9863bd2
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod


class CMCInterface(ABC):
class BaseCMC(ABC):

@abstractmethod
def apply(self, im):
Expand Down
4 changes: 2 additions & 2 deletions boxmot/motion/cmc/ecc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import cv2
import numpy as np

from boxmot.motion.cmc.cmc_interface import CMCInterface
from boxmot.motion.cmc.base_cmc import BaseCMC
from boxmot.utils import BOXMOT
from boxmot.utils import logger as LOGGER


class ECC(CMCInterface):
class ECC(BaseCMC):
def __init__(
self,
warp_mode: int = cv2.MOTION_EUCLIDEAN,
Expand Down
4 changes: 2 additions & 2 deletions boxmot/motion/cmc/orb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import cv2
import numpy as np

from boxmot.motion.cmc.cmc_interface import CMCInterface
from boxmot.motion.cmc.base_cmc import BaseCMC
from boxmot.utils import BOXMOT


class ORB(CMCInterface):
class ORB(BaseCMC):

def __init__(
self,
Expand Down
4 changes: 2 additions & 2 deletions boxmot/motion/cmc/sift.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import cv2
import numpy as np

from boxmot.motion.cmc.cmc_interface import CMCInterface
from boxmot.motion.cmc.base_cmc import BaseCMC
from boxmot.utils import BOXMOT


class SIFT(CMCInterface):
class SIFT(BaseCMC):

def __init__(
self,
Expand Down
4 changes: 2 additions & 2 deletions boxmot/motion/cmc/sof.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import cv2
import numpy as np

from boxmot.motion.cmc.cmc_interface import CMCInterface
from boxmot.motion.cmc.base_cmc import BaseCMC
from boxmot.utils import BOXMOT
from boxmot.utils import logger as LOGGER


class SOF(CMCInterface):
class SOF(BaseCMC):

def __init__(
self,
Expand Down
158 changes: 158 additions & 0 deletions boxmot/motion/kalman_filters/base_kalman_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import numpy as np
import scipy.linalg
from typing import Tuple

"""
Table for the 0.95 quantile of the chi-square distribution with N degrees of
freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv
function and used as Mahalanobis gating threshold.
"""
chi2inv95 = {
1: 3.8415,
2: 5.9915,
3: 7.8147,
4: 9.4877,
5: 11.070,
6: 12.592,
7: 14.067,
8: 15.507,
9: 16.919
}

class BaseKalmanFilter:
"""
Base class for Kalman filters tracking bounding boxes in image space.
"""

def __init__(self, ndim: int):
self.ndim = ndim
self.dt = 1.

# Create Kalman filter model matrices.
self._motion_mat = np.eye(2 * ndim, 2 * ndim) # State transition matrix
for i in range(ndim):
self._motion_mat[i, ndim + i] = self.dt
self._update_mat = np.eye(ndim, 2 * ndim) # Observation matrix

# Motion and observation uncertainty weights.
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160

def initiate(self, measurement: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Create track from unassociated measurement.
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]

std = self._get_initial_covariance_std(measurement)
covariance = np.diag(np.square(std))
return mean, covariance

def _get_initial_covariance_std(self, measurement: np.ndarray) -> np.ndarray:
"""
Return initial standard deviations for the covariance matrix.
Should be implemented by subclasses.
"""
raise NotImplementedError

def predict(self, mean: np.ndarray, covariance: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Run Kalman filter prediction step.
"""
std_pos, std_vel = self._get_process_noise_std(mean)
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

return mean, covariance

def _get_process_noise_std(self, mean: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Return standard deviations for process noise.
Should be implemented by subclasses.
"""
raise NotImplementedError

def project(self, mean: np.ndarray, covariance: np.ndarray, confidence: float = 0.0) -> Tuple[np.ndarray, np.ndarray]:
"""
Project state distribution to measurement space.
"""
std = self._get_measurement_noise_std(mean, confidence)

# NSA Kalman algorithm from GIAOTracker, which proposes a formula to
# adaptively calculate the noise covariance Rek:
# Rk = (1 − ck) Rk
# where Rk is the preset constant measurement noise covariance
# and ck is the detection confidence score at state k. Intuitively,
# the detection has a higher score ck when it has less noise,
# which results in a low Re.
std = [(1 - confidence) * x for x in std]

innovation_cov = np.diag(np.square(std))

mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov

def multi_predict(self, mean: np.ndarray, covariance: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Run Kalman filter prediction step (Vectorized version).
"""
std_pos, std_vel = self._get_multi_process_noise_std(mean)
sqr = np.square(np.r_[std_pos, std_vel]).T

motion_cov = [np.diag(sqr[i]) for i in range(len(mean))]
motion_cov = np.asarray(motion_cov)

mean = np.dot(mean, self._motion_mat.T)
left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2))
covariance = np.dot(left, self._motion_mat.T) + motion_cov

return mean, covariance

def update(self, mean: np.ndarray, covariance: np.ndarray, measurement: np.ndarray, confidence: float = 0.0) -> Tuple[np.ndarray, np.ndarray]:
"""
Run Kalman filter correction step.
"""
projected_mean, projected_cov = self.project(mean, covariance, confidence)

chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve((chol_factor, lower), np.dot(covariance, self._update_mat.T).T, check_finite=False).T
innovation = measurement - projected_mean

new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance

def _get_multi_process_noise_std(self, mean: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Return standard deviations for process noise in vectorized form.
Should be implemented by subclasses.
"""
raise NotImplementedError

def gating_distance(self, mean: np.ndarray, covariance: np.ndarray, measurements: np.ndarray, only_position: bool = False, metric: str = 'maha') -> np.ndarray:
"""
Compute gating distance between state distribution and measurements.
"""
mean, covariance = self.project(mean, covariance)

if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]

d = measurements - mean
if metric == 'gaussian':
return np.sum(d * d, axis=1)
elif metric == 'maha':
cholesky_factor = np.linalg.cholesky(covariance)
z = scipy.linalg.solve_triangular(cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True)
squared_maha = np.sum(z * z, axis=0)
return squared_maha
else:
raise ValueError('invalid distance metric')
Loading

0 comments on commit 9863bd2

Please sign in to comment.