Stream writes in to_csv()
Co-authored-by: P. Sai Vinay <pvinay1998@gmail.com>
bartbroere and V1NAY8 committed Nov 6, 2023
1 parent adf0535 commit 28e6d92
Showing 3 changed files with 32 additions and 13 deletions.
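In outline: the old implementation materialised the full result via to_pandas() and then called pandas' to_csv once; the new one writes each batch yielded by search_yield_pandas_dataframes as it arrives, so peak memory stays at roughly one batch. A minimal usage sketch of the new behaviour (the cluster URL, index name, and output path are illustrative assumptions, not part of this commit):

import eland as ed

# Assumed cluster URL and index name, for illustration only.
df = ed.DataFrame("http://localhost:9200", "flights")

# Batches are now written to the file as they are fetched,
# instead of building one large pandas DataFrame first.
df.to_csv("flights.csv", index=False)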
eland/common.py: 1 change (0 additions & 1 deletion)
@@ -41,7 +41,6 @@
 # Default number of rows displayed (different to pandas where ALL could be displayed)
 DEFAULT_NUM_ROWS_DISPLAYED = 60
 DEFAULT_CHUNK_SIZE = 10000
-DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
 DEFAULT_SEARCH_SIZE = 5000
 DEFAULT_PIT_KEEP_ALIVE = "3m"
eland/operations.py: 40 changes (30 additions & 10 deletions)
@@ -1218,6 +1218,36 @@ def describe(self, query_compiler: "QueryCompiler") -> pd.DataFrame:
             ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]
         )
 
+    def to_csv(  # type: ignore
+        self,
+        query_compiler: "QueryCompiler",
+        path_or_buf=None,
+        header: bool = True,
+        mode: str = "w",
+        show_progress: bool = False,
+        **kwargs,
+    ) -> Optional[str]:
+        result = []
+        processed = 0
+        for i, df in enumerate(
+            self.search_yield_pandas_dataframes(query_compiler=query_compiler)
+        ):
+            processed += df.shape[0]
+            if show_progress and processed % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
+                print(f"{datetime.now()}: read {processed} rows")
+            result.append(
+                df.to_csv(
+                    path_or_buf=path_or_buf,
+                    # start appending after the first batch
+                    mode=mode if i == 0 else "a",
+                    # only write the header for the first batch, if wanted at all
+                    header=header if i == 0 else False,
+                    **kwargs,
+                )
+            )
+        if path_or_buf is None:
+            return "".join(result)
+
     def to_pandas(
         self, query_compiler: "QueryCompiler", show_progress: bool = False
     ) -> pd.DataFrame:
@@ -1239,16 +1269,6 @@ def to_pandas(
             return query_compiler._empty_pd_ef()
         return pd.concat(df_list)
 
-    def to_csv(
-        self,
-        query_compiler: "QueryCompiler",
-        show_progress: bool = False,
-        **kwargs: Union[bool, str],
-    ) -> Optional[str]:
-        return self.to_pandas(  # type: ignore[no-any-return]
-            query_compiler=query_compiler, show_progress=show_progress
-        ).to_csv(**kwargs)
-
     def search_yield_pandas_dataframes(
         self, query_compiler: "QueryCompiler"
     ) -> Generator["pd.DataFrame", None, None]:
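The heart of the new method is the write pattern above: the caller's mode and header apply only to the first batch, and every later batch is appended without a header. A standalone sketch of that pattern in plain pandas, using made-up chunks and a made-up file name:

import pandas as pd

# Placeholder chunks standing in for the DataFrames yielded per search batch.
chunks = [
    pd.DataFrame({"a": [1, 2], "b": [3, 4]}),
    pd.DataFrame({"a": [5, 6], "b": [7, 8]}),
]

for i, chunk in enumerate(chunks):
    chunk.to_csv(
        "out.csv",
        mode="w" if i == 0 else "a",  # write once, then append
        header=(i == 0),              # header only for the first batch
        index=False,
    )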
eland/query_compiler.py: 4 changes (2 additions & 2 deletions)
@@ -497,7 +497,7 @@ def es_query(self, query: Dict[str, Any]) -> "QueryCompiler":
         return self._update_query(QueryFilter(query))
 
     # To/From Pandas
-    def to_pandas(self, show_progress: bool = False):
+    def to_pandas(self, show_progress: bool = False) -> pd.DataFrame:
         """Converts Eland DataFrame to Pandas DataFrame.
 
         Returns:
@@ -512,7 +512,7 @@ def to_csv(self, **kwargs) -> Optional[str]:
         Returns:
             If path_or_buf is None, returns the resulting csv format as a string. Otherwise returns None.
         """
-        return self._operations.to_csv(self, **kwargs)
+        return self._operations.to_csv(query_compiler=self, **kwargs)
 
     def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]:
         return self._operations.search_yield_pandas_dataframes(self)
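As the docstring above says, the return value depends on path_or_buf: with a target it returns None, without one the per-batch CSV strings are joined and returned. A short sketch of the string case (client URL and index name are again assumptions):

import io

import eland as ed
import pandas as pd

df = ed.DataFrame("http://localhost:9200", "flights")

csv_text = df.to_csv()                              # no path_or_buf: CSV returned as one string
round_tripped = pd.read_csv(io.StringIO(csv_text))  # illustrative round-trip back into pandas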
