Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add iterrows() and itertuples() DataFrame API, its usage is similar to pandas #380

Merged
merged 11 commits into from
Aug 20, 2021
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.iterrows.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.iterrows
========================

.. currentmodule:: eland

.. automethod:: DataFrame.iterrows
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.itertuples.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.itertuples
==========================

.. currentmodule:: eland

.. automethod:: DataFrame.itertuples
2 changes: 2 additions & 0 deletions docs/sphinx/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Indexing, Iteration
DataFrame.get
DataFrame.query
DataFrame.sample
DataFrame.iterrows
DataFrame.itertuples

Function Application, GroupBy & Window
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
117 changes: 116 additions & 1 deletion eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sys
import warnings
from io import StringIO
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd # type: ignore
Expand Down Expand Up @@ -1446,6 +1446,121 @@ def keys(self) -> pd.Index:
"""
return self.columns

def iterrows(self) -> Iterable[Tuple[Union[str, Tuple[str, ...]], pd.Series]]:
"""
Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.

Yields
------
index: index
The index of the row.
data: pandas Series
The data of the row as a pandas Series.

See Also
--------
eland.DataFrame.itertuples: Iterate over eland.DataFrame rows as namedtuples.

Examples
--------
>>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
>>> df
AvgTicketPrice Cancelled
0 841.265642 False
1 882.982662 False
2 190.636904 False
3 181.694216 True
4 730.041778 False
<BLANKLINE>
[5 rows x 2 columns]

>>> for index, row in df.iterrows():
... print(row)
AvgTicketPrice 841.265642
Cancelled False
Name: 0, dtype: object
AvgTicketPrice 882.982662
Cancelled False
Name: 1, dtype: object
AvgTicketPrice 190.636904
Cancelled False
Name: 2, dtype: object
AvgTicketPrice 181.694216
Cancelled True
Name: 3, dtype: object
AvgTicketPrice 730.041778
Cancelled False
Name: 4, dtype: object
"""
for df in self._query_compiler.search_yield_pandas_dataframes():
yield from df.iterrows()

def itertuples(
self, index: bool = True, name: Union[str, None] = "Eland"
) -> Iterable[Tuple[Any, ...]]:
"""
Iterate over eland.DataFrame rows as namedtuples.

Args
----
index: bool, default True
If True, return the index as the first element of the tuple.
name: str or None, default "Eland"
The name of the returned namedtuples or None to return regular tuples.

Returns
-------
iterator
An object to iterate over namedtuples for each row in the
DataFrame with the first field possibly being the index and
following fields being the column values.

See Also
--------
eland.DataFrame.iterrows: Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.

Examples
--------
>>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
>>> df
AvgTicketPrice Cancelled
0 841.265642 False
1 882.982662 False
2 190.636904 False
3 181.694216 True
4 730.041778 False
<BLANKLINE>
[5 rows x 2 columns]

>>> for row in df.itertuples():
... print(row)
Eland(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
Eland(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
Eland(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
Eland(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
Eland(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)

By setting the `index` parameter to False we can remove the index as the first element of the tuple:
>>> for row in df.itertuples(index=False):
... print(row)
Eland(AvgTicketPrice=841.2656419677076, Cancelled=False)
Eland(AvgTicketPrice=882.9826615595518, Cancelled=False)
Eland(AvgTicketPrice=190.6369038508356, Cancelled=False)
Eland(AvgTicketPrice=181.69421554118, Cancelled=True)
Eland(AvgTicketPrice=730.041778346198, Cancelled=False)

With the `name` parameter set we set a custom name for the yielded namedtuples:
>>> for row in df.itertuples(name='Flight'):
... print(row)
Flight(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
Flight(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
Flight(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
Flight(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
Flight(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
"""
for df in self._query_compiler.search_yield_pandas_dataframes():
yield from df.itertuples(index=index, name=name)

def aggregate(
self,
func: Union[str, List[str]],
Expand Down
30 changes: 30 additions & 0 deletions eland/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,36 @@ def to_csv(
df = self._es_results(query_compiler, show_progress)
return df.to_csv(**kwargs) # type: ignore[no-any-return]

def search_yield_pandas_dataframes(
kxbin marked this conversation as resolved.
Show resolved Hide resolved
self, query_compiler: "QueryCompiler"
) -> Generator["pd.DataFrame", None, None]:
query_params, post_processing = self._resolve_tasks(query_compiler)

result_size, sort_params = Operations._query_params_to_size_and_sort(
query_params
)

script_fields = query_params.script_fields
query = Query(query_params.query)

body = query.to_search_body()
if script_fields is not None:
body["script_fields"] = script_fields

# Only return requested field_names and add them to body
_source = query_compiler.get_field_names(include_scripted_fields=False)
body["_source"] = _source if _source else False

if sort_params:
body["sort"] = [sort_params]

for hits in _search_yield_hits(
query_compiler=query_compiler, body=body, max_number_of_hits=result_size
):
df = query_compiler._es_results_to_pandas(hits)
df = self._apply_df_post_processing(df, post_processing)
yield df

def _es_results(
self, query_compiler: "QueryCompiler", show_progress: bool = False
) -> pd.DataFrame:
Expand Down
4 changes: 4 additions & 0 deletions eland/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TYPE_CHECKING,
Any,
Dict,
Generator,
List,
Optional,
Sequence,
Expand Down Expand Up @@ -527,6 +528,9 @@ def to_csv(self, **kwargs) -> Optional[str]:
"""
return self._operations.to_csv(self, **kwargs)

def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]:
return self._operations.search_yield_pandas_dataframes(self)

# __getitem__ methods
def getitem_column_array(self, key, numeric=False):
"""Get column data for target labels.
Expand Down
68 changes: 68 additions & 0 deletions tests/dataframe/test_iterrows_itertuples_pytest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# File called _pytest for PyCharm compatability

from pandas.testing import assert_index_equal, assert_series_equal

from tests.common import TestData


class TestDataFrameIterrowsItertuples(TestData):
def test_iterrows(self):
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()

ed_flights_iterrows = ed_flights.iterrows()
pd_flights_iterrows = pd_flights.iterrows()

assert len(list(ed_flights_iterrows)) == len(list(pd_flights_iterrows))

for ed_index, ed_row in ed_flights_iterrows:
kxbin marked this conversation as resolved.
Show resolved Hide resolved

pd_index, pd_row = next(pd_flights_iterrows)

assert_index_equal(ed_index, pd_index)
assert_series_equal(ed_row, pd_row)

for pd_index, pd_row in pd_flights_iterrows:
kxbin marked this conversation as resolved.
Show resolved Hide resolved

ed_index, ed_row = next(ed_flights_iterrows)

assert_index_equal(pd_index, ed_index)
assert_series_equal(pd_row, ed_row)

def test_itertuples(self):
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()

ed_flights_itertuples = ed_flights.itertuples(name=None)
pd_flights_itertuples = pd_flights.itertuples(name=None)

assert len(list(ed_flights_itertuples)) == len(list(pd_flights_itertuples))

for ed_row in ed_flights_itertuples:
kxbin marked this conversation as resolved.
Show resolved Hide resolved

pd_row = next(pd_flights_itertuples)

assert ed_row == pd_row

for pd_row in pd_flights_itertuples:

ed_row = next(ed_flights_itertuples)

assert pd_row == ed_row