[add:lib] Add multi-processed predictions for HMMClassifier #136

Merged · 4 commits · Jan 6, 2021
Changes from all commits
1 change: 1 addition & 0 deletions README.md
@@ -61,6 +61,7 @@ The following algorithms provided within Sequentia support the use of multivariate
- [x] Hidden Markov Models (via [`hmmlearn`](https://github.com/hmmlearn/hmmlearn))<br/><em>Learning with the Baum-Welch algorithm</em> [[1]](#references)
- [x] Gaussian Mixture Model emissions
- [x] Linear, left-right and ergodic topologies
+ - [x] Multi-processed predictions
- [x] Dynamic Time Warping k-Nearest Neighbors (via [`dtaidistance`](https://github.com/wannesm/dtaidistance))
- [x] Sakoe–Chiba band global warping constraint
- [x] Dependent and independent feature warping (DTWD & DTWI)
53 changes: 47 additions & 6 deletions lib/sequentia/classifiers/hmm/hmm_classifier.py
@@ -1,4 +1,6 @@
- import numpy as np, pickle
+ import tqdm, tqdm.auto, numpy as np, pickle
+ from joblib import Parallel, delayed
+ from multiprocessing import cpu_count
from .gmmhmm import GMMHMM
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
@@ -43,7 +45,7 @@ def fit(self, models):
self._encoder = LabelEncoder()
self._encoder.fit([model.label for model in models])

- def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
+ def predict(self, X, prior='frequency', verbose=True, return_scores=False, original_labels=True, n_jobs=1):
"""Predicts the label for an observation sequence (or multiple sequences) according to maximum likelihood or posterior scores.

Parameters
@@ -60,12 +62,24 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=True):

Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.

+ verbose: bool
+ Whether to display a progress bar or not.
+
+ .. note::
+ If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process
+ are always displayed in the console, regardless of where you are running this function from
+ (e.g. a Jupyter notebook).

return_scores: bool
Whether to return the scores of each model on the observation sequence(s).

original_labels: bool
Whether to inverse-transform the labels to their original encoding.

+ n_jobs: int > 0 or -1
+ | The number of jobs to run in parallel.
+ | Setting this to -1 will use all available CPU cores.

Returns
-------
prediction(s): str/numeric or :class:`numpy:numpy.ndarray` (str/numeric)
@@ -91,7 +105,10 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
assert np.isclose(sum(prior), 1.), 'Class priors must form a probability distribution by summing to one'
else:
self._val.one_of(prior, ['frequency', 'uniform'], desc='prior')
+ self._val.boolean(verbose, desc='verbose')
self._val.boolean(return_scores, desc='return_scores')
self._val.boolean(original_labels, desc='original_labels')
+ self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')

# Create look-up for prior probabilities
if prior == 'frequency':
@@ -105,10 +122,15 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
# Convert single observation sequence to a singleton list
X = [X] if isinstance(X, np.ndarray) else X

+ # Lambda for calculating the log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
+ posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])
+
- # Calculate log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
# Perform the MAP classification rule and return labels to original encoding if necessary
- posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])
- scores = np.array([posteriors(x) for x in X])
+ n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
+ X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
+ scores = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, posteriors, chunk, verbose) for i, chunk in enumerate(X_chunks))
+ scores = np.concatenate(scores)
best_idxs = np.atleast_1d(scores.argmax(axis=1))
labels = self._encoder.inverse_transform(best_idxs) if original_labels else best_idxs

@@ -117,7 +139,7 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
else:
return (labels, scores) if return_scores else labels

- def evaluate(self, X, y, prior='frequency'):
+ def evaluate(self, X, y, prior='frequency', verbose=True, n_jobs=1):
"""Evaluates the performance of the classifier on a batch of observation sequences and their labels.

Parameters
@@ -137,6 +159,18 @@ def evaluate(self, X, y, prior='frequency'):

Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.

+ verbose: bool
+ Whether to display a progress bar or not.
+
+ .. note::
+ If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process
+ are always displayed in the console, regardless of where you are running this function from
+ (e.g. a Jupyter notebook).

+ n_jobs: int > 0 or -1
+ | The number of jobs to run in parallel.
+ | Setting this to -1 will use all available CPU cores.

Returns
-------
accuracy: float
@@ -146,7 +180,7 @@ def evaluate(self, X, y, prior='frequency'):
The confusion matrix representing the discrepancy between predicted and actual labels.
"""
X, y = self._val.observation_sequences_and_labels(X, y)
- predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False)
+ predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False, verbose=verbose, n_jobs=n_jobs)
cm = confusion_matrix(self._encoder.transform(y), predictions, labels=self._encoder.transform(self._encoder.classes_))
return np.sum(np.diag(cm)) / np.sum(cm), cm

@@ -183,6 +217,13 @@ def load(cls, path):
with open(path, 'rb') as file:
return pickle.load(file)

+ def _chunk_predict(self, process, posteriors, chunk, verbose): # Requires fit
+ """Makes predictions (scores) for a chunk of the observation sequences, for a given subprocess."""
+ return np.array([posteriors(x) for x in tqdm.auto.tqdm(
+ chunk, desc='Classifying examples (process {})'.format(process),
+ disable=not(verbose), position=process-1
+ )])

@property
def models(self):
try:
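For reference, a minimal usage sketch of the new parallel prediction API. The toy data and GMMHMM construction below are hypothetical placeholders (the constructor arguments and `set_random_*` calls follow Sequentia's documented model-building pattern and may differ between versions); only the `predict` and `evaluate` signatures are taken from the diff above:

    import numpy as np
    from sequentia.classifiers import GMMHMM, HMMClassifier

    # Hypothetical toy data: 60 variable-length sequences of 3-dimensional observations
    X = [np.random.random((np.random.randint(10, 30), 3)) for _ in range(60)]
    y = list(np.random.choice(['a', 'b', 'c'], size=60))

    # Fit one GMMHMM per class, then combine them into a single classifier
    models = []
    for label in ('a', 'b', 'c'):
        hmm = GMMHMM(label=label, n_states=5)
        hmm.set_random_initial()
        hmm.set_random_transitions()
        hmm.fit([x for x, lbl in zip(X, y) if lbl == label])
        models.append(hmm)

    clf = HMMClassifier()
    clf.fit(models)

    # New in this PR: fan the forward-probability scoring out over all CPU cores,
    # with one progress bar per process
    labels = clf.predict(X, prior='frequency', verbose=True, n_jobs=-1)
    acc, cm = clf.evaluate(X, y, prior='uniform', verbose=True, n_jobs=-1)

Note that `n_jobs` only parallelizes the scoring of observation sequences; model fitting is unchanged by this PR.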
5 changes: 3 additions & 2 deletions lib/sequentia/classifiers/knn/knn_classifier.py
@@ -175,15 +175,16 @@ def predict(self, X, verbose=True, original_labels=True, n_jobs=1):

X = self._val.observation_sequences(X, allow_single=True)
self._val.boolean(verbose, desc='verbose')
+ self._val.boolean(original_labels, desc='original_labels')
self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')

if isinstance(X, np.ndarray):
distances = np.array([self._dtw(X, x) for x in tqdm.auto.tqdm(self._X, desc='Calculating distances', disable=not(verbose))])
return self._output(self._find_nearest(distances), original_labels)
else:
- n_jobs = cpu_count() if n_jobs == -1 else n_jobs
+ n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
- labels = Parallel(n_jobs=min(n_jobs, len(X)))(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
+ labels = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
return self._output(np.concatenate(labels), original_labels) # Flatten the resulting array

def evaluate(self, X, y, verbose=True, n_jobs=1):
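Both classifiers now share the same chunk-and-parallelize pattern: cap `n_jobs` at the number of sequences before chunking (so `np.array_split` never produces empty chunks), split the input into one chunk per job, score chunks in worker processes via joblib, and concatenate the results. A simplified standalone sketch of that pattern (illustrative helper names, not library code):

    import numpy as np
    from joblib import Parallel, delayed
    from multiprocessing import cpu_count

    def _score_chunk(score_fn, chunk):
        # Score one chunk of sequences inside a worker process
        return [score_fn(x) for x in chunk]

    def parallel_scores(X, score_fn, n_jobs=1):
        # Never spawn more jobs than there are sequences
        n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
        # dtype=object keeps variable-length sequences intact when splitting
        chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
        results = Parallel(n_jobs=n_jobs)(delayed(_score_chunk)(score_fn, chunk) for chunk in chunks)
        return np.concatenate(results)  # Flatten the per-chunk results back into one array

    # Example: score each variable-length sequence by its mean value
    X = [np.random.random((np.random.randint(5, 15), 3)) for _ in range(10)]
    print(parallel_scores(X, lambda x: x.mean(), n_jobs=-1))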
26 changes: 13 additions & 13 deletions notebooks/Pen-Tip Trajectories (Example).ipynb
@@ -409,7 +409,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "9036fa23214f4727b2ad661a19a549c4",
"model_id": "3ddd5bf499294bf0bc0963ab4dbb6ece",
"version_major": 2,
"version_minor": 0
},
@@ -444,7 +444,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1a3610f874c4400cab88d38292c49468",
"model_id": "c2f3a92e26114f1fbb4d45058698b747",
"version_major": 2,
"version_minor": 0
},
@@ -461,8 +461,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
"CPU times: user 1.75 s, sys: 108 ms, total: 1.86 s\n",
"Wall time: 2.2 s\n"
"CPU times: user 1.68 s, sys: 195 ms, total: 1.88 s\n",
"Wall time: 1.98 s\n"
]
}
],
@@ -491,8 +491,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
"CPU times: user 699 ms, sys: 80.5 ms, total: 779 ms\n",
"Wall time: 3.73 s\n"
"CPU times: user 721 ms, sys: 90.5 ms, total: 811 ms\n",
"Wall time: 3.24 s\n"
]
}
],
@@ -521,8 +521,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 542 ms, sys: 17.1 ms, total: 559 ms\n",
"Wall time: 21.9 s\n"
"CPU times: user 556 ms, sys: 17.3 ms, total: 573 ms\n",
"Wall time: 10.1 s\n"
]
}
],
@@ -568,7 +568,7 @@
"\n",
"While the fast C compiled functions in the [`dtaidistance`](https://github.com/wannesm/dtaidistance) package (along with the multiprocessing capabilities of Sequentia's `KNNClassifier`) help to speed up classification **a lot**, the practical use of $k$-NN becomes more limited as the dataset grows larger. \n",
"\n",
"In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx22s$, which is even faster than the HMM classifier that we show below. "
"In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx10s$, which is even faster than the HMM classifier that we show below. "
]
},
{
@@ -605,7 +605,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0fbffb1d3bbc44b5b6356b54a89a61b2",
"model_id": "30052beed780408db9e7b1f1212b6404",
"version_major": 2,
"version_minor": 0
},
@@ -655,14 +655,14 @@
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3min 33s, sys: 18 s, total: 3min 51s\n",
"Wall time: 2min 34s\n"
"CPU times: user 197 ms, sys: 13.5 ms, total: 210 ms\n",
"Wall time: 55.4 s\n"
]
}
],
"source": [
"%%time\n",
"acc, cm = clf.evaluate(X_test, y_test)"
"acc, cm = clf.evaluate(X_test, y_test, n_jobs=-1)"
]
},
{
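To reproduce the notebook's before-and-after comparison outside Jupyter, a timing sketch along these lines should work (`clf`, `X_test` and `y_test` as prepared in the notebook; absolute timings will vary by machine):

    import time

    # Compare serial evaluation against evaluation on all CPU cores
    for n_jobs in (1, -1):
        start = time.perf_counter()
        acc, cm = clf.evaluate(X_test, y_test, verbose=False, n_jobs=n_jobs)
        elapsed = time.perf_counter() - start
        print('n_jobs={:>2}: accuracy={:.1%}, wall time={:.1f}s'.format(n_jobs, acc, elapsed))

Per the cell outputs above, this change took the HMM classifier's evaluation on the pen-tip trajectory test set from roughly 2min 34s down to 55.4s on the machine used for the notebook runs.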