Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addresses #350 by adding batching operations to apply.generic() and apply.by_query() #351

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pyterrier/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def rename(columns : Dict[str,str], *args, **kwargs) -> Transformer:
"""
return ApplyGenericTransformer(lambda df: df.rename(columns=columns), *args, **kwargs)

def generic(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> Transformer:
    """
    Create a transformer that maps an input dataframe to an output dataframe in an arbitrary,
    caller-defined way.

    Arguments:
        fn(Callable): function applied to the input dataframe (or to each batch of it)
        batch_size(int or None): if set, fn is invoked on successive chunks of at most
            batch_size rows instead of the whole dataframe at once
        verbose(bool): whether to display a progress bar over batches (only used if batch_size is set)

    Example::

        pipe = pt.BatchRetrieve(index) >> pt.apply.generic(lambda res : res[res["rank"] < 2])

    """
    return ApplyGenericTransformer(fn, *args, batch_size=batch_size, **kwargs)

def by_query(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> Transformer:
    """
    As `pt.apply.generic()` except that fn receives a dataframe for one query at a time,
    rather than all results at once.

    Arguments:
        fn(Callable): function applied to the results of each query
        batch_size(int or None): if set, fn will receive no more than batch_size documents
            for any query
        verbose(bool): whether to display a progress bar over queries
    """
    return ApplyForEachQuery(fn, *args, batch_size=batch_size, **kwargs)

class _apply:

Expand Down
44 changes: 37 additions & 7 deletions pyterrier/apply_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,42 @@ def __repr__(self):
return "pt.apply.??()"

class ApplyForEachQuery(ApplyTransformerBase):
def __init__(self, fn, *args, add_ranks=True, batch_size=None, **kwargs):
    """
    Arguments:
     - fn (Callable): takes as input a pandas DataFrame containing all results for one
       query, and returns a transformed DataFrame
     - add_ranks (bool): whether to recompute the rank column from the scores produced by fn
     - batch_size (int or None): if set, fn is applied to chunks of at most batch_size
       rows per query, rather than the whole query's results at once
    """
    super().__init__(fn, *args, **kwargs)
    self.add_ranks = add_ranks
    self.batch_size = batch_size

def transform(self, res):
    # Hand an empty frame straight to fn, letting it determine the output columns.
    if len(res) == 0:
        return self.fn(res)

    import math, pandas as pd
    from pyterrier.model import split_df
    from . import tqdm

    groups = res.groupby("qid")
    if self.verbose:
        groups = tqdm(groups, unit='query')
    try:
        if self.batch_size is None:
            # apply fn once per query, on that query's full result set
            per_query = [self.fn(group) for _, group in groups]
        else:
            # fn must not see more than batch_size rows at a time, so each
            # query's results are split into chunks and the outputs reassembled
            per_query = []
            for _, group in groups:
                n_chunks = math.ceil(len(group) / self.batch_size)
                chunk_outputs = [self.fn(chunk) for chunk in split_df(group, n_chunks)]
                per_query.append(pd.concat(chunk_outputs))

        if self.add_ranks:
            per_query = [add_ranks(df, single_query=True) for df in per_query]
        rtr = pd.concat(per_query)
    except Exception as a:
        raise Exception("Problem applying %s" % self.fn) from a
    return rtr
Expand Down Expand Up @@ -187,14 +204,27 @@ class ApplyGenericTransformer(ApplyTransformerBase):

"""

def __init__(self, fn, *args, batch_size=None, **kwargs):
    """
    Arguments:
     - fn (Callable): takes as input a pandas DataFrame, and returns a new pandas DataFrame
     - batch_size (int or None): if set, fn is applied to chunks of at most batch_size
       rows, rather than the whole input dataframe at once
    """
    super().__init__(fn, *args, **kwargs)
    self.batch_size = batch_size

def transform(self, inputRes):
    # no batching: fn receives the entire input dataframe in one call
    if self.batch_size is None:
        return self.fn(inputRes)

    # An empty frame would produce num_chunks == 0 below; hand it to fn
    # directly instead, matching ApplyForEachQuery.transform's empty-input path.
    if len(inputRes) == 0:
        return self.fn(inputRes)

    # batching: split into ceil(len/batch_size) chunks of at most batch_size
    # rows, apply fn to each chunk, and concatenate the outputs
    import math, pandas as pd
    from pyterrier.model import split_df
    from . import tqdm
    num_chunks = math.ceil( len(inputRes) / self.batch_size )
    iterator = split_df(inputRes, num_chunks)
    if self.verbose:
        iterator = tqdm(iterator, desc="pt.apply", unit='row')
    rtr = pd.concat([self.fn(chunk_df) for chunk_df in iterator])
    return rtr

50 changes: 50 additions & 0 deletions tests/test_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,56 @@ def _inc_score(res):

outputDfEmpty = p(inputDf.head(0))

def test_by_query_apply_batch(self):
    # As test_by_query_apply, but with batch_size set so that fn receives
    # each query's results in chunks of (at most) one row.
    input_df = pt.new.ranked_documents([[1], [2]], qid=["1", "2"])
    def _inc_score(res):
        if len(res) == 0:
            return res
        res = res.copy()
        res["score"] = res["score"] + int(res.iloc[0]["qid"])
        return res
    pipe = pt.apply.by_query(_inc_score, batch_size=1)

    result = pipe(input_df)
    self.assertEqual(result.iloc[0]["qid"], "1")
    self.assertEqual(result.iloc[0]["score"], 2)
    self.assertEqual(result.iloc[1]["qid"], "2")
    self.assertEqual(result.iloc[1]["score"], 4)

    # smoke-test the empty-input path
    pipe(input_df.head(0))

def test_generic(self):
    inputDf = pt.new.ranked_documents([[1], [2]], qid=["1", "2"])

    # A per-row function: results must be identical with and without batching.
    def _fn1(df):
        df = df.copy()
        df["score"] = df["score"] * 2
        return df
    # note: enumerate removed — the loop index was unused
    for t in [
            pt.apply.generic(_fn1),
            pt.apply.generic(_fn1, batch_size=1)
        ]:
        outputDf = t(inputDf)
        self.assertEqual(2, len(outputDf))
        self.assertEqual(outputDf.iloc[0]["qid"], "1")
        self.assertEqual(outputDf.iloc[0]["score"], 2)
        self.assertEqual(outputDf.iloc[1]["qid"], "2")
        self.assertEqual(outputDf.iloc[1]["score"], 4)

    # A batch-sensitive function: score records the size of the frame fn saw,
    # so it reveals how the input was chunked.
    def _fn2(df):
        df = df.copy()
        df["score"] = len(df)
        return df
    t1 = pt.apply.generic(_fn2)
    t2 = pt.apply.generic(_fn2, batch_size=1)
    outputDf1 = t1(inputDf)
    outputDf2 = t2(inputDf)
    self.assertEqual(2, len(outputDf1))
    self.assertEqual(2, len(outputDf2))
    # batch is the entire dataframe, ie 2 rows
    self.assertEqual(2, outputDf1.iloc[0]["score"])
    # batch is a one row dataframe
    self.assertEqual(1, outputDf2.iloc[0]["score"])

def test_docscore_apply(self):
p = pt.apply.doc_score(lambda doc_row: len(doc_row["text"]))
testDF = pd.DataFrame([["q1", "hello", "d1", "aa"]], columns=["qid", "query", "docno", "text"])
Expand Down