diff --git a/docs/sphinx/reference/api/eland.DataFrame.iterrows.rst b/docs/sphinx/reference/api/eland.DataFrame.iterrows.rst
new file mode 100644
index 00000000..2e3812ce
--- /dev/null
+++ b/docs/sphinx/reference/api/eland.DataFrame.iterrows.rst
@@ -0,0 +1,6 @@
+eland.DataFrame.iterrows
+========================
+
+.. currentmodule:: eland
+
+.. automethod:: DataFrame.iterrows
diff --git a/docs/sphinx/reference/api/eland.DataFrame.itertuples.rst b/docs/sphinx/reference/api/eland.DataFrame.itertuples.rst
new file mode 100644
index 00000000..3c3959df
--- /dev/null
+++ b/docs/sphinx/reference/api/eland.DataFrame.itertuples.rst
@@ -0,0 +1,6 @@
+eland.DataFrame.itertuples
+==========================
+
+.. currentmodule:: eland
+
+.. automethod:: DataFrame.itertuples
diff --git a/docs/sphinx/reference/dataframe.rst b/docs/sphinx/reference/dataframe.rst
index 3a5c24be..2faf9dd5 100644
--- a/docs/sphinx/reference/dataframe.rst
+++ b/docs/sphinx/reference/dataframe.rst
@@ -38,6 +38,8 @@ Indexing, Iteration
    DataFrame.get
    DataFrame.query
    DataFrame.sample
+   DataFrame.iterrows
+   DataFrame.itertuples
 
 Function Application, GroupBy & Window
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/eland/dataframe.py b/eland/dataframe.py
index 80352a6f..d579379c 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -19,7 +19,7 @@
 import sys
 import warnings
 from io import StringIO
-from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Sequence, Tuple, Union
 
 import numpy as np
 import pandas as pd  # type: ignore
@@ -1446,6 +1446,123 @@ def keys(self) -> pd.Index:
         """
         return self.columns
 
+    def iterrows(self) -> Iterable[Tuple[Union[str, Tuple[str, ...]], pd.Series]]:
+        """
+        Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.
+
+        Yields
+        ------
+        index: index
+            The index of the row.
+        data: pandas Series
+            The data of the row as a pandas Series.
+
+        See Also
+        --------
+        eland.DataFrame.itertuples: Iterate over eland.DataFrame rows as namedtuples.
+
+        Examples
+        --------
+        >>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
+        >>> df
+           AvgTicketPrice  Cancelled
+        0      841.265642      False
+        1      882.982662      False
+        2      190.636904      False
+        3      181.694216       True
+        4      730.041778      False
+
+        [5 rows x 2 columns]
+
+        >>> for index, row in df.iterrows():
+        ...     print(row)
+        AvgTicketPrice    841.265642
+        Cancelled              False
+        Name: 0, dtype: object
+        AvgTicketPrice    882.982662
+        Cancelled              False
+        Name: 1, dtype: object
+        AvgTicketPrice    190.636904
+        Cancelled              False
+        Name: 2, dtype: object
+        AvgTicketPrice    181.694216
+        Cancelled               True
+        Name: 3, dtype: object
+        AvgTicketPrice    730.041778
+        Cancelled              False
+        Name: 4, dtype: object
+        """
+        for df in self._query_compiler.search_yield_pandas_dataframes():
+            yield from df.iterrows()
+
+    def itertuples(
+        self, index: bool = True, name: Optional[str] = "Eland"
+    ) -> Iterable[Tuple[Any, ...]]:
+        """
+        Iterate over eland.DataFrame rows as namedtuples.
+
+        Parameters
+        ----------
+        index: bool, default True
+            If True, return the index as the first element of the tuple.
+        name: str or None, default "Eland"
+            The name of the returned namedtuples or None to return regular tuples.
+
+        Returns
+        -------
+        iterator
+            An object to iterate over namedtuples for each row in the
+            DataFrame with the first field possibly being the index and
+            following fields being the column values.
+
+        See Also
+        --------
+        eland.DataFrame.iterrows: Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.
+
+        Examples
+        --------
+        >>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
+        >>> df
+           AvgTicketPrice  Cancelled
+        0      841.265642      False
+        1      882.982662      False
+        2      190.636904      False
+        3      181.694216       True
+        4      730.041778      False
+
+        [5 rows x 2 columns]
+
+        >>> for row in df.itertuples():
+        ...     print(row)
+        Eland(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
+        Eland(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
+        Eland(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
+        Eland(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
+        Eland(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
+
+        By setting the `index` parameter to False we can remove the index as the first element of the tuple:
+
+        >>> for row in df.itertuples(index=False):
+        ...     print(row)
+        Eland(AvgTicketPrice=841.2656419677076, Cancelled=False)
+        Eland(AvgTicketPrice=882.9826615595518, Cancelled=False)
+        Eland(AvgTicketPrice=190.6369038508356, Cancelled=False)
+        Eland(AvgTicketPrice=181.69421554118, Cancelled=True)
+        Eland(AvgTicketPrice=730.041778346198, Cancelled=False)
+
+        With the `name` parameter set we set a custom name for the yielded namedtuples:
+
+        >>> for row in df.itertuples(name='Flight'):
+        ...     print(row)
+        Flight(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
+        Flight(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
+        Flight(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
+        Flight(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
+        Flight(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
+        """
+        for df in self._query_compiler.search_yield_pandas_dataframes():
+            yield from df.itertuples(index=index, name=name)
+
     def aggregate(
         self,
         func: Union[str, List[str]],
diff --git a/eland/operations.py b/eland/operations.py
index 48c33448..0fad8ef2 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -1211,6 +1211,56 @@ def to_csv(
         df = self._es_results(query_compiler, show_progress)
         return df.to_csv(**kwargs)  # type: ignore[no-any-return]
 
+    def search_yield_pandas_dataframes(
+        self, query_compiler: "QueryCompiler"
+    ) -> Generator["pd.DataFrame", None, None]:
+        """
+        Run the resolved search and yield the hits as pandas DataFrames.
+
+        Each batch of hits produced by ``_search_yield_hits`` is converted
+        to a pandas DataFrame and post-processed before it is yielded, so
+        this method never assembles the full result set in one DataFrame.
+
+        Parameters
+        ----------
+        query_compiler: QueryCompiler
+            Supplies the field names to request and converts each batch
+            of hits into a DataFrame.
+
+        Yields
+        ------
+        pandas.DataFrame
+            One post-processed DataFrame per batch of hits.
+        """
+        query_params, post_processing = self._resolve_tasks(query_compiler)
+
+        result_size, sort_params = Operations._query_params_to_size_and_sort(
+            query_params
+        )
+
+        script_fields = query_params.script_fields
+        query = Query(query_params.query)
+
+        body = query.to_search_body()
+        if script_fields is not None:
+            body["script_fields"] = script_fields
+
+        # Only return requested field_names and add them to body
+        _source = query_compiler.get_field_names(include_scripted_fields=False)
+        body["_source"] = _source if _source else False
+
+        if sort_params:
+            body["sort"] = [sort_params]
+
+        for hits in _search_yield_hits(
+            query_compiler=query_compiler, body=body, max_number_of_hits=result_size
+        ):
+            df = query_compiler._es_results_to_pandas(hits)
+            # NOTE(review): post-processing is applied to each batch on its
+            # own, not to the combined result - confirm this is intended.
+            df = self._apply_df_post_processing(df, post_processing)
+            yield df
+
     def _es_results(
         self, query_compiler: "QueryCompiler", show_progress: bool = False
     ) -> pd.DataFrame:
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 207d1c15..4a818049 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -21,6 +21,7 @@
     TYPE_CHECKING,
     Any,
     Dict,
+    Generator,
     List,
     Optional,
     Sequence,
@@ -527,6 +528,10 @@ def to_csv(self, **kwargs) -> Optional[str]:
         """
         return self._operations.to_csv(self, **kwargs)
 
+    def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]:
+        """Yield the search results as pandas DataFrames, one per batch of hits."""
+        return self._operations.search_yield_pandas_dataframes(self)
+
     # __getitem__ methods
     def getitem_column_array(self, key, numeric=False):
         """Get column data for target labels.
diff --git a/tests/dataframe/test_iterrows_itertuples_pytest.py b/tests/dataframe/test_iterrows_itertuples_pytest.py
new file mode 100644
index 00000000..9dc495e9
--- /dev/null
+++ b/tests/dataframe/test_iterrows_itertuples_pytest.py
@@ -0,0 +1,66 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# File called _pytest for PyCharm compatibility
+
+import pytest
+from pandas.testing import assert_series_equal
+
+from tests.common import TestData
+
+
+class TestDataFrameIterrowsItertuples(TestData):
+    def test_iterrows(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_flights_iterrows = ed_flights.iterrows()
+        pd_flights_iterrows = pd_flights.iterrows()
+
+        for ed_index, ed_row in ed_flights_iterrows:
+            pd_index, pd_row = next(pd_flights_iterrows)
+
+            assert ed_index == pd_index
+            assert_series_equal(ed_row, pd_row)
+
+        # Assert that both are the same length and are exhausted.
+        with pytest.raises(StopIteration):
+            next(ed_flights_iterrows)
+        with pytest.raises(StopIteration):
+            next(pd_flights_iterrows)
+
+    def test_itertuples(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        ed_flights_itertuples = list(ed_flights.itertuples(name=None))
+        pd_flights_itertuples = list(pd_flights.itertuples(name=None))
+
+        def assert_tuples_almost_equal(left, right):
+            # Shim which uses pytest.approx() for floating point values inside tuples.
+            assert len(left) == len(right)
+            assert all(
+                (lt == rt)  # Neither is a float? Use ==
+                if not isinstance(lt, float) and not isinstance(rt, float)
+                else (lt == pytest.approx(rt))  # Either is a float? Use pytest.approx()
+                for lt, rt in zip(left, right)
+            )
+
+        # zip() stops at the shorter input, so check the lengths explicitly.
+        assert len(ed_flights_itertuples) == len(pd_flights_itertuples)
+        for ed_tuple, pd_tuple in zip(ed_flights_itertuples, pd_flights_itertuples):
+            assert_tuples_almost_equal(ed_tuple, pd_tuple)