Merge pull request #314 from terrier-org/0.9refactor
0.9refactor
cmacdonald authored Oct 27, 2022
2 parents 54c1baf + 3cc2082 commit 583ffb9
Showing 22 changed files with 826 additions and 759 deletions.
4 changes: 2 additions & 2 deletions docs/ltr.rst
@@ -89,8 +89,8 @@ Learning

.. autofunction:: pyterrier.ltr.apply_learned_model()

The resulting transformer implements EstimatorBase, in other words it has a `fit()` method, that can be trained using
training topics and qrels, as well as (optionally) validation topics and qrels. See also :ref:`pt.transformer.estimatorbase`.
The resulting transformer implements Estimator, in other words it has a `fit()` method, that can be trained using
training topics and qrels, as well as (optionally) validation topics and qrels. See also :ref:`pt.transformer.estimator`.

SKLearn
~~~~~~~
31 changes: 21 additions & 10 deletions docs/transformer.rst
@@ -38,7 +38,7 @@ earlier. So while the following two pipelines are semantically equivalent, the l
Fitting
=======
When `fit()` is called on a pipeline, all estimators (transformers that also have a ``fit()`` method, as specified by
`EstimatorBase`) within the pipeline are fitted, in turn. This allows one (or more) stages of learning to be
`Estimator`) within the pipeline are fitted, in turn. This allows one (or more) stages of learning to be
integrated into a retrieval pipeline. See :ref:`pyterrier.ltr` for examples.

When calling fit on a composed pipeline (i.e. one created using the ``>>`` operator), this will call ``fit()`` on any
@@ -58,23 +58,34 @@ This class is the base class for all transformers.
Moreover, by extending Transformer, all transformer implementations gain the necessary "dunder" methods (e.g. ``__rshift__()``)
to support the transformer operators (`>>`, `+` etc). NB: This class used to be called ``pyterrier.transformer.TransformerBase``

.. _pt.transformer.estimatorbase:
.. _pt.transformer.estimator:

EstimatorBase
Estimator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This class exposes a ``fit()`` method that can be used for transformers that can be trained.
This base class exposes a ``fit()`` method that can be used for transformers that can be trained.

.. autoclass:: pyterrier.transformer.EstimatorBase
.. autoclass:: pyterrier.Estimator
    :members:

The ComposedPipeline implements ``fit()``, which applies the intermediate transformers on the specified training (and validation) topics, and places
the output into the ``fit()`` method of the final transformer.
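
The behaviour described above can be sketched without PyTerrier itself. The classes below (``AddOne``, ``MeanEstimator``, ``Composed``) are hypothetical stand-ins that operate on plain lists rather than dataframes, and the argument names are assumptions; they only illustrate the idea that every stage before the last is applied via ``transform()`` and the result is handed to the final stage's ``fit()``.

```python
class AddOne:
    """Hypothetical stand-in for an intermediate, non-trainable transformer."""
    def transform(self, rows):
        return [r + 1 for r in rows]


class MeanEstimator:
    """Hypothetical stand-in for a final, trainable stage."""
    def __init__(self):
        self.mean = None

    def fit(self, train_rows, train_labels, valid_rows=None, valid_labels=None):
        # "training" here just remembers the mean of the inputs it was given
        self.mean = sum(train_rows) / len(train_rows)
        return self

    def transform(self, rows):
        return [r - self.mean for r in rows]


class Composed:
    """Hypothetical stand-in for a pipeline built with the >> operator."""
    def __init__(self, *stages):
        self.stages = list(stages)

    def fit(self, train_rows, train_labels, valid_rows=None, valid_labels=None):
        # apply every stage except the last to the training (and validation) inputs
        for stage in self.stages[:-1]:
            train_rows = stage.transform(train_rows)
            if valid_rows is not None:
                valid_rows = stage.transform(valid_rows)
        # the final stage is fitted on the transformed inputs
        self.stages[-1].fit(train_rows, train_labels, valid_rows, valid_labels)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipe = Composed(AddOne(), MeanEstimator()).fit([1, 2, 3], [0, 1, 0])
result = pipe.transform([1, 2, 3])
```

Note that the final stage sees ``[2, 3, 4]`` during fitting, not the raw inputs, which is the point of routing the training data through the earlier stages first.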

Indexer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This base class exposes a ``index()`` method that can be used for transformers that create an index.

.. autoclass:: pyterrier.Indexer
    :members:

The ComposedPipeline also implements ``index()``, which applies the intermediate transformers on the specified documents to be indexed, and places
the output into the ``index()`` method of the final transformer.

Internal transformers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A significant number of transformers are defined in pyterrier.transformer to implement operators etc. It is not expected
A significant number of transformers are defined in pyterrier.ops to implement operators etc. It is not expected
that you will use these directly, but they are documented for completeness.

+--------+------------------+---------------------------+
@@ -106,7 +117,7 @@ Indexing Pipelines
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Transformers can be chained to create indexing pipelines. The last element in the chain is assumed to be an indexer like
IterDictIndexer - it should implement an ``index()`` method like IterDictIndexerBase. For instance::
IterDictIndexer - it should implement an ``index()`` method like pt.Indexer. For instance::

    docs = [ {"docno" : "1", "text" : "a" } ]
    indexer = pt.text.sliding() >> pt.IterDictIndexer()
@@ -133,9 +144,9 @@ extend ``pt.Transformer`` directly.

Here are some hints for writing Transformers:
- Except for an indexer, you should implement a ``transform()`` method.
- If your approach ranks results, use ``pt.model.add_ranks()`` to add the rank column.
- If your approach can be trained, your transformer should extend EstimatorBase, and implement the ``fit()`` method.
- If your approach is an indexer, your transformer should extend IterDictIndexerBase and implement ``index()`` method.
- If your approach ranks results, use ``pt.model.add_ranks()`` to add the rank column. (``pt.apply.doc_score`` will call add_ranks automatically).
- If your approach can be trained, your transformer should extend Estimator, and implement the ``fit()`` method.
- If your approach is an indexer, your transformer should extend Indexer and implement ``index()`` method.


Mocking Transformers from DataFrames
8 changes: 5 additions & 3 deletions pyterrier/__init__.py
@@ -4,6 +4,9 @@

from .bootstrap import _logging, setup_terrier, setup_jnius, is_windows

# definitive API used by others, now available before pt.init
from .transformer import Transformer, Estimator, Indexer

import importlib

#sub modules
@@ -22,7 +25,7 @@
rewrite = None
text = None
transformer = None
Transformer = None


file_path = os.path.dirname(os.path.abspath(__file__))
firstInit = False
@@ -145,7 +148,6 @@ def init(version=None, mem=None, packages=[], jvm_opts=[], redirect_io=True, log
from .datasets import get_dataset, find_datasets, list_datasets
from .index import Indexer, FilesIndexer, TRECCollectionIndexer, DFIndexer, DFIndexUtils, IterDictIndexer, FlatJSONDocumentIterator, IndexingType
from .pipelines import Experiment, GridScan, GridSearch, KFoldGridSearch
from .transformer import Transformer

# Make imports global
globals()["autoclass"] = autoclass
@@ -197,7 +199,7 @@ def init(version=None, mem=None, packages=[], jvm_opts=[], redirect_io=True, log
globals()["GridScan"] = GridScan
globals()["GridSearch"] = GridSearch
globals()["KFoldGridSearch"] = KFoldGridSearch
globals()["Transformer"] = Transformer



# we save the pt.init() arguments so that other processes,
2 changes: 1 addition & 1 deletion pyterrier/apply.py
@@ -1,5 +1,5 @@
from typing import Callable, Any, Dict, Union, Sequence
from .transformer import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyGenericTransformer, Transformer
from .apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyGenericTransformer, Transformer
from nptyping import NDArray
import pandas as pd

200 changes: 200 additions & 0 deletions pyterrier/apply_base.py
@@ -0,0 +1,200 @@
from .transformer import Transformer
from . import tqdm
from .model import add_ranks, split_df
import pandas as pd

class ApplyTransformerBase(Transformer):
    """
        A base class for Apply*Transformers
    """
    def __init__(self, fn, *args, verbose=False, **kwargs):
        super().__init__(*args, **kwargs)
        self.fn = fn
        self.verbose = verbose

    def __repr__(self):
        return "pt.apply.??()"

class ApplyForEachQuery(ApplyTransformerBase):
    def __init__(self, fn, *args, add_ranks=True, **kwargs):
        """
            Arguments:
             - fn (Callable): Takes as input a pandas DataFrame holding the rows for a single query, and returns a new DataFrame for that query
             - add_ranks (bool): Whether to recompute the rank column of each DataFrame returned by fn
        """
        super().__init__(fn, *args, **kwargs)
        self.add_ranks = add_ranks

    def transform(self, res):
        if len(res) == 0:
            return self.fn(res)
        it = res.groupby("qid")
        if self.verbose:
            it = tqdm(it, unit='query')
        try:
            dfs = [self.fn(group) for qid, group in it]
            if self.add_ranks:
                dfs = [add_ranks(df, single_query=True) for df in dfs]
            rtr = pd.concat(dfs)
        except Exception as a:
            raise Exception("Problem applying %s" % self.fn) from a
        return rtr

class ApplyDocumentScoringTransformer(ApplyTransformerBase):
    """
        Implements a transformer that can apply a function to perform document scoring. The supplied function
        should take as input one row, and return a float for the score of the document.

        Usually accessed using pt.apply.doc_score()::

            def _score_fn(row):
                return float(row["url"].count("/"))

            pipe = pt.BatchRetrieve(index) >> pt.apply.doc_score(_score_fn)

        Can be used in a batching manner, which is particularly useful for applying neural models. In this case,
        the scoring function receives a dataframe, rather than a single row::

            def _doclen(df):
                return df.text.str.len()

            pipe = pt.BatchRetrieve(index) >> pt.apply.doc_score(_doclen)
    """
    def __init__(self, fn, *args, batch_size=None, **kwargs):
        """
            Arguments:
             - fn (Callable): Takes as input a pandas Series for a row representing that document, and returns the new float document score. If batch_size is set,
               takes a dataframe, and returns a sequence of floats representing scores for those documents.
             - batch_size (int or None). How many documents to operate on at once. If None, operates row-wise
        """
        super().__init__(fn, *args, **kwargs)
        self.batch_size = batch_size

    def _transform_rowwise(self, outputRes):
        fn = self.fn
        if self.verbose:
            tqdm.pandas(desc="pt.apply.doc_score", unit="d")
            outputRes["score"] = outputRes.progress_apply(fn, axis=1)
        else:
            outputRes["score"] = outputRes.apply(fn, axis=1)
        outputRes = add_ranks(outputRes)
        return outputRes

    def _transform_batchwise(self, outputRes):
        fn = self.fn
        outputRes["score"] = fn(outputRes)
        return outputRes

    def transform(self, inputRes):
        # work on a copy, so that the caller's dataframe is not modified
        outputRes = inputRes.copy()
        if self.batch_size is None:
            return self._transform_rowwise(outputRes)

        import math
        from .model import split_df
        from . import tqdm
        num_chunks = math.ceil( len(outputRes) / self.batch_size )
        iterator = split_df(outputRes, num_chunks)
        iterator = tqdm(iterator, desc="pt.apply", unit='row')
        rtr = pd.concat([self._transform_batchwise(chunk_df) for chunk_df in iterator])
        rtr = add_ranks(rtr)
        return rtr

class ApplyDocFeatureTransformer(ApplyTransformerBase):
    """
        Implements a transformer that can apply a function to perform feature scoring. The supplied function
        should take as input one row, and return a numpy array for the features of the document.

        Usually accessed using pt.apply.doc_features()::

            def _feature_fn(row):
                return numpy.array([len(row["url"]), row["url"].count("/")])

            pipe = pt.BatchRetrieve(index) >> pt.apply.doc_features(_feature_fn) >> pt.LTRpipeline(xgBoost())
    """
    def __init__(self, fn, *args, **kwargs):
        """
            Arguments:
             - fn (Callable): Takes as input a pandas Series for a row representing that document, and returns a new numpy array representing the features of that document
        """
        super().__init__(fn, *args, **kwargs)

    def transform(self, inputRes):
        fn = self.fn
        outputRes = inputRes.copy()
        if self.verbose:
            tqdm.pandas(desc="pt.apply.doc_features", unit="d")
            outputRes["features"] = outputRes.progress_apply(fn, axis=1)
        else:
            outputRes["features"] = outputRes.apply(fn, axis=1)
        return outputRes

class ApplyQueryTransformer(ApplyTransformerBase):
    """
        Implements a query rewriting transformer by passing a function to perform the rewriting. The function should take
        as input one row, and return the string form of the new query.

        Usually accessed using pt.apply.query() passing it the function::

            def _rewriting_fn(row):
                return row["query"] + " extra words"

            pipe = pt.apply.query(_rewriting_fn) >> pt.BatchRetrieve(index)

        Similarly, a lambda function can also be used::

            pipe = pt.apply.query(lambda row: row["query"] + " extra words") >> pt.BatchRetrieve(index)

        In the resulting dataframe, the previous query for each row can be found in the query_0 column.
    """
    def __init__(self, fn, *args, **kwargs):
        """
            Arguments:
             - fn (Callable): Takes as input a pandas Series for a row representing a query, and returns the new string query
             - verbose (bool): Display a tqdm progress bar for this transformer
        """
        super().__init__(fn, *args, **kwargs)

    def transform(self, inputRes):
        from .model import push_queries
        fn = self.fn
        outputRes = push_queries(inputRes.copy(), inplace=True, keep_original=True)
        if self.verbose:
            tqdm.pandas(desc="pt.apply.query", unit="d")
            outputRes["query"] = outputRes.progress_apply(fn, axis=1)
        else:
            outputRes["query"] = outputRes.apply(fn, axis=1)
        return outputRes

class ApplyGenericTransformer(ApplyTransformerBase):
    """
        Allows arbitrary pipeline components to be written as functions. The function should take as input
        a dataframe, and return a new dataframe. The function should abide by the main contractual obligations,
        e.g. updating the "rank" column.

        This class is normally accessed through pt.apply.generic()

        If you are scoring, query rewriting or calculating features, it is advised to use one of the other
        variants.

        Example::

            # this pipeline would remove all but the first two documents from a result set
            lp = ApplyGenericTransformer(lambda res : res[res["rank"] < 2])

            pipe = pt.BatchRetrieve(index) >> lp
    """

    def __init__(self, fn, *args, **kwargs):
        """
            Arguments:
             - fn (Callable): Takes as input a pandas DataFrame, and returns a new pandas DataFrame
        """
        super().__init__(fn, *args, **kwargs)

    def transform(self, inputRes):
        fn = self.fn
        return fn(inputRes)
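
The batching arithmetic used by ``ApplyDocumentScoringTransformer.transform()`` can be illustrated in isolation. The sketch below uses plain lists instead of dataframes; ``split_rows`` is a hypothetical stand-in for ``split_df`` and assumes it divides the rows into at most ``num_chunks`` contiguous, roughly equal pieces, and ``score_in_batches`` is likewise an illustrative name rather than part of the module.

```python
import math


def split_rows(rows, num_chunks):
    """Hypothetical stand-in for split_df: yield at most num_chunks contiguous slices."""
    chunk_size = math.ceil(len(rows) / num_chunks)
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def score_in_batches(rows, fn, batch_size):
    """Call fn once per batch and concatenate the scores it returns."""
    if not rows:
        return []
    # same chunk-count calculation as in transform() above
    num_chunks = math.ceil(len(rows) / batch_size)
    scores = []
    for chunk in split_rows(rows, num_chunks):
        scores.extend(fn(chunk))
    return scores


texts = ["a", "bb", "ccc", "dddd", "eeeee"]
seen_batch_sizes = []


def doclen(batch):
    # record how many rows arrived in this call, then score each by its length
    seen_batch_sizes.append(len(batch))
    return [len(t) for t in batch]


scores = score_in_batches(texts, doclen, batch_size=2)
```

With five rows and a batch size of two, the scoring function is called three times, and the last call receives the single leftover row.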
