Skip to content

Commit

Permalink
text_loader abstraction for pt.text.get_text (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmacavaney authored Aug 23, 2024
1 parent 3ec276d commit a537597
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 103 deletions.
81 changes: 80 additions & 1 deletion pyterrier/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pandas as pd
from .transformer import is_lambda
import types
from typing import Union, Tuple, Iterator, Dict, Any, List
from typing import Union, Tuple, Iterator, Dict, Any, List, Literal
from warnings import warn
import requests
from .io import autoopen, touch
Expand Down Expand Up @@ -539,6 +539,85 @@ def info_url(self):
def __repr__(self):
return f"IRDSDataset({repr(self._irds_id)})"

def text_loader(
    self,
    fields: Union[List[str], str, Literal['*']] = '*',
    *,
    verbose: bool = False,
) -> pt.Transformer:
    """Build a transformer that attaches this dataset's text fields to a frame by ``docno``.

    Args:
        fields: Which document fields to fetch. The special value ``'*'`` (default)
            selects every available text field.
        verbose: Whether to print debug information while loading.
    """
    loader = IRDSTextLoader(self, fields, verbose=verbose)
    return loader


class IRDSTextLoader(pt.Transformer):
    """A transformer that loads text fields from an ir_datasets dataset into a DataFrame by docno."""
    def __init__(
        self,
        dataset: IRDSDataset,
        fields: Union[List[str], str, Literal['*']] = '*',
        *,
        verbose=False
    ):
        """Initialise the transformer with the dataset to load text from.

        Args:
            dataset: The dataset to load text from.
            fields: The fields to load from the dataset. If '*', all fields will be loaded.
            verbose: Whether to print debug information.

        Raises:
            ValueError: If the dataset does not provide docs, or if a requested field
                is not available on this dataset.
        """
        irds = dataset.irds_ref()
        if not irds.has_docs():
            raise ValueError(f"Dataset {dataset} does not provide docs")
        docs_cls = irds.docs_cls()

        # Only str-typed fields (other than the doc_id key itself) can be loaded as text columns.
        available_fields = [f for f in docs_cls._fields if f != 'doc_id' and docs_cls.__annotations__[f] is str]
        if fields == '*':
            fields = available_fields
        else:
            if isinstance(fields, str):
                fields = [fields]
            missing_fields = set(fields) - set(available_fields)
            if missing_fields:
                # Fixed message: this loader reads an ir_datasets dataset, not a Terrier
                # metaindex, so report the missing/available *fields*.
                raise ValueError(f"Dataset {dataset} did not have requested fields {list(missing_fields)}. "
                                 f"Available fields are {available_fields}")

        self.dataset = dataset
        self.fields = fields
        self.verbose = verbose

    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
        """Load text fields from the dataset into the input DataFrame.

        Args:
            inp: The input DataFrame. Must contain 'docno'.

        Returns:
            A new DataFrame with the text columns appended.

        Raises:
            ValueError: If the input frame has no 'docno' column.
        """
        if 'docno' not in inp.columns:
            raise ValueError(f"input missing 'docno' column, available columns: {list(inp.columns)}")
        irds = self.dataset.irds_ref()
        docstore = irds.docs_store()
        docnos = inp.docno.values.tolist()

        # Load the new data. The docstore iterates unique doc_ids; .loc below re-expands
        # to the (possibly duplicated) order of the input frame.
        fields = ['doc_id'] + self.fields
        set_docnos = set(docnos)
        it = (tuple(getattr(doc, f) for f in fields) for doc in docstore.get_many_iter(set_docnos))
        if self.verbose:
            # BUG FIX: was pd.tqdm — pandas has no tqdm attribute, so verbose=True raised
            # AttributeError. pt.tqdm is pyterrier's tqdm wrapper.
            it = pt.tqdm(it, unit='d', total=len(set_docnos), desc='IRDSTextLoader')
        metadata = pd.DataFrame(list(it), columns=fields).set_index('doc_id')
        metadata_frame = metadata.loc[docnos].reset_index(drop=True)

        # append the input and metadata frames
        inp = inp.drop(columns=self.fields, errors='ignore')  # make sure we don't end up with duplicates
        inp = inp.reset_index(drop=True)  # reset the index to default (matching metadata_frame)
        return pd.concat([inp, metadata_frame], axis='columns')


def passage_generate(dataset):
for filename in dataset.get_corpus():
Expand Down
2 changes: 2 additions & 0 deletions pyterrier/terrier/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# java stuff
from pyterrier.terrier import java
from pyterrier.terrier._text_loader import TerrierTextLoader, terrier_text_loader
from pyterrier.terrier.java import configure, set_version, set_helper_version, set_prf_version, extend_classpath, J, set_property, set_properties, run, version, check_version, check_helper_version
from pyterrier.terrier.retriever import Retriever, FeaturesRetriever, TextScorer
from pyterrier.terrier.index_factory import IndexFactory
Expand Down Expand Up @@ -55,4 +56,5 @@ def from_dataset(*args, **kwargs):
# misc
'TerrierStemmer', 'TerrierStopwords', 'TerrierTokeniser',
'IndexFactory', 'set_property', 'set_properties', 'run',
'TerrierTextLoader', 'terrier_text_loader',
]
90 changes: 90 additions & 0 deletions pyterrier/terrier/_text_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import List, Union, Literal

import pandas as pd
import pyterrier as pt


class TerrierTextLoader(pt.Transformer):
    """A transformer that loads textual metadata from a Terrier index into a DataFrame by docid or docno."""

    def __init__(
        self,
        index,
        fields: Union[List[str], str, Literal['*']] = '*',
        *,
        verbose=False
    ):
        """Initialise the transformer with the index to load metadata from.

        Args:
            index (pyterrier.terrier.J.Index): The index to load metadata from.
            fields: The fields to load from the index. If '*', all fields will be loaded.
            verbose: Whether to print debug information.
        """
        metaindex = index.getMetaIndex()
        if metaindex is None:
            raise ValueError(f"Index {index} does not have a metaindex")

        available_fields = list(metaindex.getKeys())
        if fields == '*':
            # Wildcard: take every key the metaindex knows about.
            selected = available_fields
        else:
            selected = [fields] if isinstance(fields, str) else fields
            missing_fields = set(selected) - set(available_fields)
            if missing_fields:
                raise ValueError(f"Index from {index} did not have requested metaindex keys {list(missing_fields)}. "
                                 f"Keys present in metaindex are {available_fields}")
        self._index = index
        self.metaindex = metaindex
        self.fields = selected
        self.verbose = verbose

    def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
        """Load metadata from the index into the input DataFrame.

        Args:
            inp: The input DataFrame. Must contain either 'docid' or 'docno'.

        Returns:
            A new DataFrame with the metadata columns appended.
        """
        has_docid = 'docid' in inp.columns
        if not has_docid and 'docno' not in inp.columns:
            raise ValueError(f"Neither docid nor docno are in the input dataframe, found {list(inp.columns)}")

        # Resolve docids: use them directly when present, otherwise look each docno up.
        if has_docid:
            docid_series = inp.docid
        else:
            docid_series = inp.docno.map(lambda docno: self.metaindex.getDocument("docno", docno))

        # getItems expects a plain list of docids; rows come back indexed by docid then key.
        rows = self.metaindex.getItems(self.fields, docid_series.values.tolist())
        meta = pd.DataFrame(rows, columns=self.fields)

        # Drop any pre-existing copies of these columns, realign indexes, and append.
        out = inp.drop(columns=self.fields, errors='ignore').reset_index(drop=True)
        return pd.concat([out, meta], axis='columns')


@pt.java.required
def terrier_text_loader(
    index,
    fields: Union[List[str], str, Literal['*']] = '*',
    *,
    verbose=False
) -> TerrierTextLoader:
    """Create a transformer that loads textual metadata from a Terrier index into a DataFrame by docid or docno.

    Args:
        index (str or pyterrier.terrier.J.IndexRef or pyterrier.terrier.J.Index): The index to load metadata from.
        fields: The fields to load from the index. If '*', all fields will be loaded.
        verbose: Whether to print debug information.
    """
    # A path string or IndexRef is first resolved into a concrete Index object.
    resolved = pt.IndexFactory.of(index) if isinstance(index, (str, pt.terrier.J.IndexRef)) else index
    return TerrierTextLoader(resolved, fields, verbose=verbose)
4 changes: 3 additions & 1 deletion pyterrier/terrier/java.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def post_init(self, jnius):
jnius.protocol_map["org.terrier.querying.IndexRef"] = {
'__reduce__' : _index_ref_reduce,
'__getstate__' : lambda self : None,
'text_loader': pt.terrier.terrier_text_loader,
}

jnius.protocol_map["org.terrier.matching.models.WeightingModel"] = {
Expand All @@ -122,7 +123,8 @@ def post_init(self, jnius):
'__add__': _index_add,

# get_corpus_iter returns a yield generator that return {"docno": "d1", "toks" : {'a' : 1}}
'get_corpus_iter' : _index_corpusiter
'get_corpus_iter' : _index_corpusiter,
'text_loader': pt.terrier.terrier_text_loader,
}

self._post_init_index(jnius)
Expand Down
147 changes: 46 additions & 101 deletions pyterrier/text.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,52 @@
from pyterrier.datasets import IRDSDataset
import more_itertools
from collections import defaultdict
import re
import numpy as np
import pandas as pd
from typing import List, Union
from typing import Any, List, Union, Literal, Protocol, runtime_checkable
from warnings import warn
import pyterrier as pt

@runtime_checkable
class HasTextLoader(Protocol):
    # Structural type for objects that can produce a text-loading transformer,
    # e.g. a Terrier index or an IRDSDataset.
    def text_loader(
        self,
        fields: Union[List[str], str, Literal['*']] = '*',
        *,
        verbose: bool = False,
    ) -> pt.Transformer:
        """
        Returns a transformer that loads and populates text columns for each document in the
        provided input frame.

        Arguments:
            fields: The names of the fields to load. If a list of strings, each named field is
                provided. If a single string, only that field is provided. If the special value
                of '*' (default), all available fields are provided.
            verbose: Show a progress bar.
        """


@pt.java.required
def get_text(
indexlike,
metadata : Union[str,List[str]] = "body",
indexlike: Union[HasTextLoader, str],
metadata : Union[str,List[str], Literal['*']] = '*',
by_query : bool = False,
verbose : bool = False) -> pt.Transformer:
verbose : bool = False,
**kwargs: Any) -> pt.Transformer:
"""
A utility transformer for obtaining the text from the text of documents (or other document metadata) from Terrier's MetaIndex
or an IRDSDataset docstore.
Arguments:
indexlike: a Terrier index or IRDSDataset to retrieve the metadata from
metadata(list(str) or str): a list of strings of the metadata keys to retrieve from the index. Defaults to ["body"]
by_query(bool): whether the entire dataframe should be progressed at once, rather than one query at a time.
Defaults to false, which means that all document metadata will be fetched at once.
verbose(bool): whether to print a tqdm progress bar. Defaults to false. Has no effect when by_query=False
indexlike: an object that provides a .text_loader() factory method, such as a Terrier index or IRDSDataset.
If a ``str`` is provided, it will try to load a Terrier index from the provided path.
metadata: The names of the fields to load. If a list of strings, all fields are provided.
If a single string, this single field is provided. If the special value of '*' (default), all
available fields are provided.
by_query: whether the entire dataframe should be progressed at once, rather than one query at a time.
Defaults to false, which means that all document metadata will be fetched at once.
verbose: whether to print a tqdm progress bar. When by_query=True, prints progress by query. Otherwise,
the behaviour is defined by the provided ``indexlike``.
kwargs: other arguments to pass through to the text_loader.
Example::
Expand All @@ -33,97 +55,20 @@ def get_text(
>> pt.text.scorer(wmodel="DPH") )
"""
JIR = pt.java.autoclass('org.terrier.querying.IndexRef')
JI = pt.java.autoclass('org.terrier.structures.Index')

if isinstance(metadata, str):
metadata = [metadata]

if isinstance(indexlike, str) or isinstance(indexlike, JIR):
index = pt.IndexFactory.of(indexlike)
add_text_fn = _add_text_terrier_metaindex(index, metadata)
elif isinstance(indexlike, JI):
add_text_fn = _add_text_terrier_metaindex(indexlike, metadata)
elif isinstance(indexlike, IRDSDataset):
add_text_fn = _add_text_irds_docstore(indexlike, metadata)
else:
raise ValueError("indexlike %s of type %s not supported. Pass a string, an IndexRef, an Index, or an IRDSDataset" %
(str(indexlike), type(indexlike)))
if isinstance(indexlike, str):
# TODO: We'll need to decide how to handle this once terrier is split from core
# Maybe it should run Artifact.load(indexlike) instead?
indexlike = pt.IndexFactory.of(indexlike)

if not isinstance(indexlike, HasTextLoader):
raise ValueError('indexlike must provide a .text_loader() method.')

result = indexlike.text_loader(metadata, verbose=verbose and not by_query, **kwargs)

if by_query:
return pt.apply.by_query(add_text_fn, verbose=verbose)
return pt.apply.generic(add_text_fn)


def _add_text_terrier_metaindex(index, metadata):
metaindex = index.getMetaIndex()
if metaindex is None:
raise ValueError("Index %s does not have a metaindex" % str(index))

for k in metadata:
if not k in metaindex.getKeys():
raise ValueError("Index from %s did not have requested metaindex key %s. Keys present in metaindex are %s" %
(str(index), k, str( metaindex.getKeys()) ))

def add_docids(res):
res = res.copy()
res["docid"] = res.apply(lambda row: metaindex.getDocument("docno", row.docno), axis=1)
return res

def add_text_function_docids(res):
res = res.copy()
if len(res) == 0:
for k in metadata:
res[k] = pd.Series(dtype='object')
return res

docids = res.docid.values.tolist()
# indexed by docid then keys
allmeta = metaindex.getItems(metadata, docids)
# get transpose to make easier for insertion back into dataframe?
allmeta = np.array(allmeta).T
for i, k in enumerate(metadata):
res[k] = allmeta[i]
return res

def add_text_generic(res):
if not "docid" in res.columns:
assert "docno" in res.columns, "Neither docid nor docno are in the input dataframe, found %s" % (str(res.columns))
res = add_docids(res)
return add_text_function_docids(res)

return add_text_generic


def _add_text_irds_docstore(irds_dataset, metadata):
irds = irds_dataset.irds_ref()
assert irds.has_docs(), f"dataset {irds_dataset} doesn't provide docs"
docs_cls = irds.docs_cls()

for k in metadata:
if not k in docs_cls._fields:
raise ValueError(f"{irds_dataset} did not have requested field {k}. Keys present are {docs_cls._fields} (from {docs_cls})")
field_idx = [(f, docs_cls._fields.index(f)) for f in metadata]

docstore = irds.docs_store()

def add_text_function_docids(res):
assert 'docno' in res, "requires docno column"
res = res.copy()
docids = res.docno.values.tolist()
did2idxs = defaultdict(list)
for i, did in enumerate(docids):
did2idxs[did].append(i)
new_columns = {f: [None] * len(docids) for f in metadata}
for doc in docstore.get_many_iter(docids):
for didx in did2idxs[doc.doc_id]:
for f, fidx in field_idx:
new_columns[f][didx] = doc[fidx]
for k, v in new_columns.items():
res[k] = v
return res

return add_text_function_docids
result = pt.apply.by_query(result, verbose=verbose)

return result


def scorer(*args, **kwargs) -> pt.Transformer:
Expand Down

0 comments on commit a537597

Please sign in to comment.