diff --git a/README.md b/README.md
index 7372c7c2..2b096231 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,7 @@ The following algorithms provided within Sequentia support the use of multivaria
- [x] Hidden Markov Models (via [`hmmlearn`](https://github.com/hmmlearn/hmmlearn))
Learning with the Baum-Welch algorithm [[1]](#references)
- [x] Gaussian Mixture Model emissions
- [x] Linear, left-right and ergodic topologies
+ - [x] Multi-processed predictions
- [x] Dynamic Time Warping k-Nearest Neighbors (via [`dtaidistance`](https://github.com/wannesm/dtaidistance))
- [x] Sakoe–Chiba band global warping constraint
- [x] Dependent and independent feature warping (DTWD & DTWI)
diff --git a/lib/sequentia/classifiers/hmm/hmm_classifier.py b/lib/sequentia/classifiers/hmm/hmm_classifier.py
index 8f2ef552..08f2cfa3 100644
--- a/lib/sequentia/classifiers/hmm/hmm_classifier.py
+++ b/lib/sequentia/classifiers/hmm/hmm_classifier.py
@@ -1,4 +1,6 @@
-import numpy as np, pickle
+import tqdm, tqdm.auto, numpy as np, pickle
+from joblib import Parallel, delayed
+from multiprocessing import cpu_count
from .gmmhmm import GMMHMM
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
@@ -43,7 +45,7 @@ def fit(self, models):
self._encoder = LabelEncoder()
self._encoder.fit([model.label for model in models])
- def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
+ def predict(self, X, prior='frequency', verbose=True, return_scores=False, original_labels=True, n_jobs=1):
"""Predicts the label for an observation sequence (or multiple sequences) according to maximum likelihood or posterior scores.
Parameters
@@ -60,12 +62,24 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.
+ verbose: bool
+ Whether to display a progress bar or not.
+
+ .. note::
+ If both ``verbose=True`` and ``n_jobs > 1``, then a separate progress bar is displayed for each process.
+ These progress bars are always written to the console, regardless of where this function is called from
+ (e.g. a Jupyter notebook).
+
return_scores: bool
Whether to return the scores of each model on the observation sequence(s).
original_labels: bool
Whether to inverse-transform the labels to their original encoding.
+ n_jobs: int > 0 or -1
+ | The number of jobs to run in parallel.
+ | Setting this to -1 will use all available CPU cores.
+
Returns
-------
prediction(s): str/numeric or :class:`numpy:numpy.ndarray` (str/numeric)
@@ -91,7 +105,10 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
assert np.isclose(sum(prior), 1.), 'Class priors must form a probability distribution by summing to one'
else:
self._val.one_of(prior, ['frequency', 'uniform'], desc='prior')
+ self._val.boolean(verbose, desc='verbose')
self._val.boolean(return_scores, desc='return_scores')
+ self._val.boolean(original_labels, desc='original_labels')
+ self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')
# Create look-up for prior probabilities
if prior == 'frequency':
@@ -105,10 +122,15 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
# Convert single observation sequence to a singleton list
X = [X] if isinstance(X, np.ndarray) else X
+ # Lambda for calculating the log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
+ posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])
+
# Calculate log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
# Perform the MAP classification rule and return labels to original encoding if necessary
- posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])
- scores = np.array([posteriors(x) for x in X])
+ n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
+ X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
+ scores = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, posteriors, chunk, verbose) for i, chunk in enumerate(X_chunks))
+ scores = np.concatenate(scores)
best_idxs = np.atleast_1d(scores.argmax(axis=1))
labels = self._encoder.inverse_transform(best_idxs) if original_labels else best_idxs
@@ -117,7 +139,7 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
else:
return (labels, scores) if return_scores else labels
- def evaluate(self, X, y, prior='frequency'):
+ def evaluate(self, X, y, prior='frequency', verbose=True, n_jobs=1):
"""Evaluates the performance of the classifier on a batch of observation sequences and their labels.
Parameters
@@ -137,6 +159,18 @@ def evaluate(self, X, y, prior='frequency'):
Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.
+ verbose: bool
+ Whether to display a progress bar or not.
+
+ .. note::
+ If both ``verbose=True`` and ``n_jobs > 1``, then a separate progress bar is displayed for each process.
+ These progress bars are always written to the console, regardless of where this function is called from
+ (e.g. a Jupyter notebook).
+
+ n_jobs: int > 0 or -1
+ | The number of jobs to run in parallel.
+ | Setting this to -1 will use all available CPU cores.
+
Returns
-------
accuracy: float
@@ -146,7 +180,7 @@ def evaluate(self, X, y, prior='frequency'):
The confusion matrix representing the discrepancy between predicted and actual labels.
"""
X, y = self._val.observation_sequences_and_labels(X, y)
- predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False)
+ predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False, verbose=verbose, n_jobs=n_jobs)
cm = confusion_matrix(self._encoder.transform(y), predictions, labels=self._encoder.transform(self._encoder.classes_))
return np.sum(np.diag(cm)) / np.sum(cm), cm
@@ -183,6 +217,13 @@ def load(cls, path):
with open(path, 'rb') as file:
return pickle.load(file)
+ def _chunk_predict(self, process, posteriors, chunk, verbose): # Requires fit
+ """Applies the ``posteriors`` function to each observation sequence in a chunk and returns the resulting scores as an array, for a given subprocess."""
+ return np.array([posteriors(x) for x in tqdm.auto.tqdm(
+ chunk, desc='Classifying examples (process {})'.format(process),
+ disable=not(verbose), position=process-1
+ )])
+
@property
def models(self):
try:
diff --git a/lib/sequentia/classifiers/knn/knn_classifier.py b/lib/sequentia/classifiers/knn/knn_classifier.py
index cc4cf934..7ac38e2b 100644
--- a/lib/sequentia/classifiers/knn/knn_classifier.py
+++ b/lib/sequentia/classifiers/knn/knn_classifier.py
@@ -175,15 +175,16 @@ def predict(self, X, verbose=True, original_labels=True, n_jobs=1):
X = self._val.observation_sequences(X, allow_single=True)
self._val.boolean(verbose, desc='verbose')
+ self._val.boolean(original_labels, desc='original_labels')
self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')
if isinstance(X, np.ndarray):
distances = np.array([self._dtw(X, x) for x in tqdm.auto.tqdm(self._X, desc='Calculating distances', disable=not(verbose))])
return self._output(self._find_nearest(distances), original_labels)
else:
- n_jobs = cpu_count() if n_jobs == -1 else n_jobs
+ n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
- labels = Parallel(n_jobs=min(n_jobs, len(X)))(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
+ labels = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
return self._output(np.concatenate(labels), original_labels) # Flatten the resulting array
def evaluate(self, X, y, verbose=True, n_jobs=1):
diff --git a/notebooks/Pen-Tip Trajectories (Example).ipynb b/notebooks/Pen-Tip Trajectories (Example).ipynb
index 19e97d15..cf1e023a 100644
--- a/notebooks/Pen-Tip Trajectories (Example).ipynb
+++ b/notebooks/Pen-Tip Trajectories (Example).ipynb
@@ -409,7 +409,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
- "model_id": "9036fa23214f4727b2ad661a19a549c4",
+ "model_id": "3ddd5bf499294bf0bc0963ab4dbb6ece",
"version_major": 2,
"version_minor": 0
},
@@ -444,7 +444,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
- "model_id": "1a3610f874c4400cab88d38292c49468",
+ "model_id": "c2f3a92e26114f1fbb4d45058698b747",
"version_major": 2,
"version_minor": 0
},
@@ -461,8 +461,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
- "CPU times: user 1.75 s, sys: 108 ms, total: 1.86 s\n",
- "Wall time: 2.2 s\n"
+ "CPU times: user 1.68 s, sys: 195 ms, total: 1.88 s\n",
+ "Wall time: 1.98 s\n"
]
}
],
@@ -491,8 +491,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
- "CPU times: user 699 ms, sys: 80.5 ms, total: 779 ms\n",
- "Wall time: 3.73 s\n"
+ "CPU times: user 721 ms, sys: 90.5 ms, total: 811 ms\n",
+ "Wall time: 3.24 s\n"
]
}
],
@@ -521,8 +521,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
- "CPU times: user 542 ms, sys: 17.1 ms, total: 559 ms\n",
- "Wall time: 21.9 s\n"
+ "CPU times: user 556 ms, sys: 17.3 ms, total: 573 ms\n",
+ "Wall time: 10.1 s\n"
]
}
],
@@ -568,7 +568,7 @@
"\n",
"While the fast C compiled functions in the [`dtaidistance`](https://github.com/wannesm/dtaidistance) package (along with the multiprocessing capabilities of Sequentia's `KNNClassifier`) help to speed up classification **a lot**, the practical use of $k$-NN becomes more limited as the dataset grows larger. \n",
"\n",
- "In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx22s$, which is even faster than the HMM classifier that we show below. "
+ "In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx10s$, which is even faster than the HMM classifier that we show below. "
]
},
{
@@ -605,7 +605,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
- "model_id": "0fbffb1d3bbc44b5b6356b54a89a61b2",
+ "model_id": "30052beed780408db9e7b1f1212b6404",
"version_major": 2,
"version_minor": 0
},
@@ -655,14 +655,14 @@
"name": "stdout",
"output_type": "stream",
"text": [
- "CPU times: user 3min 33s, sys: 18 s, total: 3min 51s\n",
- "Wall time: 2min 34s\n"
+ "CPU times: user 197 ms, sys: 13.5 ms, total: 210 ms\n",
+ "Wall time: 55.4 s\n"
]
}
],
"source": [
"%%time\n",
- "acc, cm = clf.evaluate(X_test, y_test)"
+ "acc, cm = clf.evaluate(X_test, y_test, n_jobs=-1)"
]
},
{