From 58175a4d28ca0200e7c052aa0eb3534b167c1141 Mon Sep 17 00:00:00 2001
From: Sean MacAvaney
Date: Wed, 18 Sep 2024 09:02:35 +0100
Subject: [PATCH] cleanup apply transformers

---
 pyterrier/apply_base.py | 283 ++++++++++++++++++++++------------------
 pyterrier/model.py      |  20 ++-
 2 files changed, 169 insertions(+), 134 deletions(-)

diff --git a/pyterrier/apply_base.py b/pyterrier/apply_base.py
index f80815c5..8eae7cea 100644
--- a/pyterrier/apply_base.py
+++ b/pyterrier/apply_base.py
@@ -1,10 +1,9 @@
-import math
-from typing import Callable, Any, Union, Optional
+from typing import Callable, Any, Union, Optional, Iterable
+import itertools
+import more_itertools
+import numpy as np
 import pandas as pd
-
 import pyterrier as pt
-from .transformer import Transformer, Indexer
-from .model import add_ranks, split_df
 
 
 class DropColumnTransformer(pt.Transformer):
@@ -91,8 +90,7 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
             return self._apply_df(inp)
 
         # batching
-        num_chunks = math.ceil(len(inp) / self.batch_size)
-        iterator = pt.model.split_df(inp, num_chunks)
+        iterator = pt.model.split_df(inp, batch_size=self.batch_size)
         if self.verbose:
             iterator = pt.tqdm(iterator, desc="pt.apply", unit='row')
         return pd.concat([self._apply_df(chunk_df) for chunk_df in iterator])
@@ -118,32 +116,32 @@ def __repr__(self):
         return f"pt.apply.{self.col}()"
 
 
-class ApplyTransformerBase(Transformer):
-    """
-        A base class for Apply*Transformers
-    """
-    def __init__(self, fn, *args, verbose=False, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.fn = fn
-        self.verbose = verbose
-
-    def __repr__(self):
-        return "pt.apply.??()"
-
-class ApplyForEachQuery(ApplyTransformerBase):
-    def __init__(self, fn, *args, add_ranks=True, batch_size=None, **kwargs):
+class ApplyForEachQuery(pt.Transformer):
+    def __init__(self,
+        fn: Callable[[pd.DataFrame], pd.DataFrame],
+        *,
+        add_ranks: bool = True,
+        batch_size: Optional[int] = None,
+        verbose: bool = False
+    ):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda Series for a row representing that document, and returns the new float doument score
+        Instantiates an ApplyForEachQuery.
+
+        Arguments:
+            fn: Takes as input a DataFrame representing all results for a query and returns a transformed DataFrame
+            add_ranks: Whether to calculate and add ranks to the output for each query
+            batch_size: The number of results per query to process at once. If None, processes in one batch per query.
+            verbose: Whether to display a progress bar
         """
-        super().__init__(fn, *args, **kwargs)
+        self.fn = fn
         self.add_ranks = add_ranks
         self.batch_size = batch_size
-    
+        self.verbose = verbose
+
     def __repr__(self):
         return "pt.apply.by_query()"
-    
-    def transform(self, res):
+
+    def transform(self, res: pd.DataFrame) -> pd.DataFrame:
         if len(res) == 0:
             return self.fn(res)
@@ -159,19 +157,17 @@ def transform(self, res):
                     query_dfs.append(self.fn(group))
             else:
                 # fn cannot be applied to more than batch_size rows at once
-                # so we must split and reconstruct the output FOR EACH QUERY 
+                # so we must split and reconstruct the output FOR EACH QUERY
                 query_dfs = []
                 for qid, group in it:
-                    
-                    num_chunks = math.ceil( len(group) / self.batch_size )
-                    iterator = split_df(group, num_chunks)
+                    iterator = pt.model.split_df(group, batch_size=self.batch_size)
                     query_dfs.append( pd.concat([self.fn(chunk_df) for chunk_df in iterator]) )
         except Exception as a:
             raise Exception("Problem applying %s for qid %s" % (self.fn, lastqid)) from a
 
         if self.add_ranks:
             try:
-                query_dfs = [add_ranks(df, single_query=True) for df in query_dfs]
+                query_dfs = [pt.model.add_ranks(df, single_query=True) for df in query_dfs]
             except KeyError as ke:
                 suffix = 'Try setting add_ranks=False'
                 if len(query_dfs) > 0 and 'score' not in query_dfs[0].columns:
@@ -179,33 +175,37 @@ def transform(self, res):
                 raise ValueError("Cannot apply add_ranks in pt.apply.by_query - " + suffix) from ke
         rtr = pd.concat(query_dfs)
         return rtr
-    
-class ApplyIterForEachQuery(ApplyTransformerBase):
-    def __init__(self, fn, *args, add_ranks=False, batch_size=None, **kwargs):
+
+
+class ApplyIterForEachQuery(pt.Transformer):
+    def __init__(self,
+        fn: Callable[[pt.model.IterDict], pt.model.IterDict],
+        *,
+        batch_size: Optional[int] = None):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda Series for a row representing that document, and returns the new float doument score
+        Instantiates an ApplyIterForEachQuery.
+
+        Arguments:
+            fn: Takes as input an IterDict containing all results for a query and returns a transformed IterDict
+            batch_size: The number of results per query to process at once. If None, processes in one batch per query.
         """
-        super().__init__(fn, *args, **kwargs)
-        self.add_ranks = add_ranks
+        self.fn = fn
         self.batch_size = batch_size
-        assert not kwargs.get("verbose", False), "verbose not supported"
-    
+
     def __repr__(self):
         return "pt.apply.by_query()"
-    
-    def transform_iter(self, input):
-        from itertools import groupby
-        from more_itertools import ichunked
+
+    def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict:
         if self.batch_size is not None:
-            for _, group in groupby(input, key=lambda row: row['qid']):
-                for batch in ichunked(group, self.batch_size):
+            for _, group in itertools.groupby(inp, key=lambda row: row['qid']):
+                for batch in more_itertools.ichunked(group, self.batch_size):
                     yield from self.fn(batch)
         else:
-            for _, group in groupby(input, key=lambda row: row['qid']):
+            for _, group in itertools.groupby(inp, key=lambda row: row['qid']):
                 yield from self.fn(group)
 
-class ApplyDocumentScoringTransformer(ApplyTransformerBase):
+
+class ApplyDocumentScoringTransformer(pt.Transformer):
     """
         Implements a transformer that can apply a function to perform document scoring. The supplied function 
         should take as input one row, and return a float for the score of the document.
@@ -226,56 +226,62 @@ def _doclen(df):
 
         pipe = pt.terrier.Retriever(index) >> pt.apply.doc_score(_doclen)
     """
-    def __init__(self, fn, *args, batch_size=None, **kwargs):
+    def __init__(self,
+        fn: Union[
+            Callable[[pd.Series], float],
+            Callable[[pd.DataFrame], Iterable[float]],
+        ],
+        *,
+        batch_size: Optional[int] = None,
+        verbose: bool = False,
+    ):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda Series for a row representing that document, and returns the new float doument score. If batch_size is set,
-               takes a dataframe, and returns a sequence of floats representing scores for those documents.
-             - batch_size (int or None). How many documents to operate on at once. If None, operates row-wise
+        Arguments:
+            fn: Either takes a pandas Series for a row (representing each document in the result set), and returns the
+                new float document score. Or, if batch_size is set, takes a DataFrame, and returns a sequence of floats
+                representing scores for those documents.
+            batch_size: How many documents to operate on at once. If None, operates row-wise.
+            verbose: Whether to display a progress bar
         """
-        super().__init__(fn, *args, **kwargs)
+        self.fn = fn
         self.batch_size = batch_size
+        self.verbose = verbose
 
     def __repr__(self):
         return "pt.apply.doc_score()"
 
     def _transform_rowwise(self, outputRes):
-        fn = self.fn
-        if len(outputRes) == 0:
-            outputRes["score"] = pd.Series(dtype='float64')
-            return outputRes
         if self.verbose:
             pt.tqdm.pandas(desc="pt.apply.doc_score", unit="d")
-            outputRes["score"] = outputRes.progress_apply(fn, axis=1).astype('float64')
+            outputRes["score"] = outputRes.progress_apply(self.fn, axis=1).astype('float64')
         else:
-            outputRes["score"] = outputRes.apply(fn, axis=1).astype('float64')
-        outputRes = add_ranks(outputRes)
+            outputRes["score"] = outputRes.apply(self.fn, axis=1).astype('float64')
+        outputRes = pt.model.add_ranks(outputRes)
         return outputRes
-    
+
     def _transform_batchwise(self, outputRes):
-        fn = self.fn
-        outputRes["score"] = fn(outputRes)
+        outputRes["score"] = self.fn(outputRes)
         outputRes["score"] = outputRes["score"].astype('float64')
         return outputRes
-    
-    def transform(self, inputRes):
-        outputRes = inputRes.copy()
+
+    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
+        outputRes = inp.copy()
         if len(outputRes) == 0:
             outputRes["score"] = pd.Series(dtype='float64')
-            return add_ranks(outputRes)
+            return pt.model.add_ranks(outputRes)
+
         if self.batch_size is None:
-            return self._transform_rowwise(inputRes)
+            return self._transform_rowwise(outputRes)
 
-        import math
-        from .model import split_df
-        num_chunks = math.ceil( len(inputRes) / self.batch_size )
-        iterator = split_df(inputRes, num_chunks)
-        iterator = pt.tqdm(iterator, desc="pt.apply", unit='row')
+        iterator = pt.model.split_df(outputRes, batch_size=self.batch_size)
+        if self.verbose:
+            iterator = pt.tqdm(iterator, desc="pt.apply", unit='row')
         rtr = pd.concat([self._transform_batchwise(chunk_df) for chunk_df in iterator])
-        rtr = add_ranks(rtr)
+        rtr = pt.model.add_ranks(rtr)
         return rtr
 
-class ApplyDocFeatureTransformer(ApplyTransformerBase):
+
+class ApplyDocFeatureTransformer(pt.Transformer):
     """
         Implements a transformer that can apply a function to perform feature scoring. The supplied function 
         should take as input one row, and return a numpy array for the features of the document.
@@ -287,27 +293,34 @@ def _feature_fn(row):
 
         pipe = pt.terrier.Retriever(index) >> pt.apply.doc_features(_feature_fn) >> pt.LTRpipeline(xgBoost())
     """
-    def __init__(self, fn, *args, **kwargs):
+    def __init__(self,
+        fn: Callable[[Union[pd.Series, pt.model.IterDictRecord]], np.ndarray],
+        *,
+        verbose: bool = False
+    ):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda Series for a row representing that document, and returns a new numpy array representing the features of that document
+        Arguments:
+            fn: Takes as input a pandas Series for a row representing that document, and returns a new numpy array representing the features of that document
+            verbose: Whether to display a progress bar
         """
-        super().__init__(fn, *args, **kwargs)
+        self.fn = fn
+        self.verbose = verbose
 
     def __repr__(self):
         return "pt.apply.doc_features()"
 
-    def transform_iter(self, iterdict):
-        fn = self.fn
+    def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict:
         # we assume that the function can take a dictionary as well as a pandas.Series. As long as [""] notation is used
         # to access fields, both should work
-        for row in pt.tqdm(iterdict, desc="pt.apply.doc_features") if self.verbose else iterdict:
+        if self.verbose:
+            inp = pt.tqdm(inp, desc="pt.apply.doc_features")
+        for row in inp:
             row["features"] = self.fn(row)
             yield row
 
-    def transform(self, inputRes):
+    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
         fn = self.fn
-        outputRes = inputRes.copy()
+        outputRes = inp.copy()
         if self.verbose:
             pt.tqdm.pandas(desc="pt.apply.doc_features", unit="d")
             outputRes["features"] = outputRes.progress_apply(fn, axis=1)
@@ -315,7 +328,8 @@ def transform(self, inputRes):
             outputRes["features"] = outputRes.apply(fn, axis=1)
         return outputRes
 
-class ApplyQueryTransformer(ApplyTransformerBase):
+
+class ApplyQueryTransformer(pt.Transformer):
     """
         Implements a query rewriting transformer by passing a function to perform the rewriting. The function should take 
         as input one row, and return the string form of the new query.
@@ -334,43 +348,48 @@ def _rewriting_fn(row):
 
         In the resulting dataframe, the previous query for each row can be found in the query_0 column.
     """
-    def __init__(self, fn, *args, **kwargs):
+    def __init__(self,
+        fn: Callable[[Union[pd.Series, pt.model.IterDictRecord]], str],
+        *,
+        verbose: bool = False
+    ):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda Series for a row representing a query, and returns the new string query
-             - verbose (bool): Display a tqdm progress bar for this transformer
+        Arguments:
+            fn: Takes as input a pandas Series for a row representing a query, and returns the new string query
+            verbose: Display a tqdm progress bar for this transformer
         """
-        super().__init__(fn, *args, **kwargs)
+        self.fn = fn
+        self.verbose = verbose
 
     def __repr__(self):
        return "pt.apply.query()"
-    
-    def transform_iter(self, iterdict):
-        fn = self.fn
+
+    def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict:
         # we assume that the function can take a dictionary as well as a pandas.Series. As long as [""] notation is used
         # to access fields, both should work
-        for row in pt.tqdm(iterdict, desc="pt.apply.query") if self.verbose else iterdict:
+        if self.verbose:
+            inp = pt.tqdm(inp, desc="pt.apply.query")
+        for row in inp:
+            row = row.copy()
             if "query" in row:
-                pass
+                raise NotImplementedError('TODO: implement push_queries for iter-dict')
             # we only push if a query already exists
             # TODO implement push_queries for iter-dict
             row["query"] = self.fn(row)
             yield row
 
-    def transform(self, inputRes):
-        from .model import push_queries
-        fn = self.fn
-        if "query" in inputRes.columns:
+    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
+        if "query" in inp.columns:
             # we only push if a query already exists
-            outputRes = push_queries(inputRes.copy(), inplace=True, keep_original=True)
+            outputRes = pt.model.push_queries(inp.copy(), inplace=True, keep_original=True)
         else:
-            outputRes = inputRes.copy()
+            outputRes = inp.copy()
         try:
             if self.verbose:
                 pt.tqdm.pandas(desc="pt.apply.query", unit="d")
-                outputRes["query"] = outputRes.progress_apply(fn, axis=1)
+                outputRes["query"] = outputRes.progress_apply(self.fn, axis=1)
             else:
-                outputRes["query"] = outputRes.apply(fn, axis=1)
+                outputRes["query"] = outputRes.apply(self.fn, axis=1)
         except ValueError as ve:
             msg = str(ve)
             if "Columns must be same length as key" in msg:
@@ -379,7 +398,8 @@ def transform(self, inputRes):
                 raise ve
         return outputRes
 
-class ApplyGenericTransformer(ApplyTransformerBase):
+
+class ApplyGenericTransformer(pt.Transformer):
     """
         Allows arbitrary pipeline components to be written as functions. The function should take as input
         a dataframe, and return a new dataframe. The function should abide by the main contractual obligations,
@@ -399,33 +419,39 @@ class ApplyGenericTransformer(ApplyTransformerBase):
 
     """
 
-    def __init__(self, fn, *args, batch_size=None, **kwargs):
+    def __init__(self,
+        fn: Callable[[pd.DataFrame], pd.DataFrame],
+        *,
+        batch_size: Optional[int] = None,
+        verbose: bool = False
+    ):
         """
-            Arguments:
-             - fn (Callable): Takes as input a panda DataFrame, and returns a new Pandas DataFrame
+        Arguments:
+            fn: Takes as input a pandas DataFrame, and returns a new pandas DataFrame
+            batch_size: The number of rows to process at once. If None, processes in one batch.
+            verbose: When in batch mode, display a tqdm progress bar
         """
-        super().__init__(fn, *args, **kwargs)
+        self.fn = fn
         self.batch_size = batch_size
+        self.verbose = verbose
 
     def __repr__(self):
         return "pt.apply.generic()"
 
-    def transform(self, inputRes):
+    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
         # no batching
         if self.batch_size is None:
-            return self.fn(inputRes)
+            return self.fn(inp)
 
         # batching
-        from pyterrier.model import split_df
-        num_chunks = math.ceil( len(inputRes) / self.batch_size )
-        iterator = split_df(inputRes, num_chunks)
+        iterator = pt.model.split_df(inp, batch_size=self.batch_size)
         if self.verbose:
             iterator = pt.tqdm(iterator, desc="pt.apply", unit='row')
         rtr = pd.concat([self.fn(chunk_df) for chunk_df in iterator])
         return rtr
 
-class ApplyGenericIterTransformer(ApplyTransformerBase):
+class ApplyGenericIterTransformer(pt.Transformer):
     """
         As per ApplyGenericTransformer, but implements transform_iter(), not transform(). The supplied function
@@ -437,35 +463,38 @@ class ApplyGenericIterTransformer(ApplyTransformerBase):
         variants.
""" - def __init__(self, fn, *args, batch_size=None, **kwargs): + def __init__(self, + fn: Callable[[pt.model.IterDict], pt.model.IterDict], + *, + batch_size: Optional[int] = None + ): """ - Arguments: - - fn (Callable): Takes as input a panda DataFrame, and returns a new Pandas DataFrame + Arguments: + fn: Takes as input a panda DataFrame, and returns a new Pandas DataFrame + batch_size: The number of rows to process at once. If None, processes in one batch. """ - super().__init__(fn, *args, **kwargs) + self.fn = fn self.batch_size = batch_size - assert not kwargs.get("verbose", False), "verbose not supported" def __repr__(self): return "pt.apply.generic()" def transform_iter(self, inp: pt.model.IterDict) -> pt.model.IterDict: - from more_itertools import ichunked if self.batch_size is None: # no batching yield from self.fn(inp) else: - for batch in ichunked(inp, self.batch_size): + for batch in more_itertools.ichunked(inp, self.batch_size): yield from self.fn(batch) -class ApplyIndexer(Indexer): + +class ApplyIndexer(pt.Indexer): """ Allows arbitrary indexer pipelines components to be written as functions. """ - def __init__(self, fn, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, fn: Callable[[pt.model.IterDict], Any]): self.fn = fn def index(self, iter_dict): - return self.fn(iter_dict) \ No newline at end of file + return self.fn(iter_dict) diff --git a/pyterrier/model.py b/pyterrier/model.py index 27accbd5..a1287cec 100644 --- a/pyterrier/model.py +++ b/pyterrier/model.py @@ -1,6 +1,7 @@ +import math import numpy as np import pandas as pd -from typing import Any, Dict, Iterable, List, Sequence +from typing import Any, Dict, Iterable, List, Sequence, Optional # This file has useful methods for using the Pyterrier Pandas datamodel @@ -207,11 +208,18 @@ def coerce_dataframe_types(dataframe): return dataframe -def split_df(df : pd.DataFrame, N) -> List[pd.DataFrame]: +def split_df(df : pd.DataFrame, N: Optional[int] = None, *, batch_size: Optional[int] = None) -> List[pd.DataFrame]: """ - splits a dataframe into N different chunks. Splitting will be sensitive to the primary datatype + Splits a dataframe into N different chunks. Splitting will be sensitive to the primary datatype of the dataframe (Q,R,D). + + Either ``N`` (the number of chunks) or ``batch_size`` (the size of each chunk) should be provided (but not both). """ + assert (N is None) != (batch_size is None), "Either N or batch_size should be provided (and not both)" + + if N is None: + N = math.ceil(len(df) / batch_size) + type = None if "qid" in df.columns: if "docno" in df.columns: @@ -222,8 +230,6 @@ def split_df(df : pd.DataFrame, N) -> List[pd.DataFrame]: type = "D" else: raise ValueError("Dataframe is not of type D,Q,R") - - from math import ceil def chunks(df, n): """Yield successive n-sized chunks from df.""" @@ -231,13 +237,13 @@ def chunks(df, n): yield df.iloc[ i: min(len(df),i + n)] if type == "Q" or type == "D": - splits = list( chunks(df, ceil(len(df)/N))) + splits = list( chunks(df, math.ceil(len(df)/N))) return splits rtr = [] grouper = df.groupby("qid") this_group = [] - chunk_size = ceil(len(grouper)/N) + chunk_size = math.ceil(len(grouper)/N) for qid, group in grouper: this_group.append(group) if len(this_group) == chunk_size: