Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow extraction from zip archives #11

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 96 additions & 51 deletions gedixr/gedi.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pathlib import Path
import tempfile
import zipfile
import logging
from tqdm import tqdm
import h5py
Expand All @@ -9,6 +11,8 @@
import gedixr.ancillary as ancil

ALLOWED_PRODUCTS = ['L2A', 'L2B']
PATTERN_L2A = '*GEDI02_A_*.h5'
PATTERN_L2B = '*GEDI02_B_*.h5'

FULL_POWER_BEAMS = ['BEAM0101', 'BEAM0110', 'BEAM1000', 'BEAM1011']

Expand All @@ -33,7 +37,7 @@
('rh100', 'rh100')]


def extract_data(directory, gedi_product='L2B', filter_month=(1, 12), variables=None, beams=None,
def extract_data(directory, temp_unpack_zip=False, gedi_product='L2B', filter_month=(1, 12), variables=None, beams=None,
subset_vector=None, save_gpkg=True, dry_run=False):
"""
Extracts data from GEDI L2A or L2B files in HDF5 format using the following steps:
Expand All @@ -51,6 +55,11 @@ def extract_data(directory, gedi_product='L2B', filter_month=(1, 12), variables=
----------
directory: str or Path
Root directory to recursively search for GEDI L2A/L2B files.
temp_unpack_zip: bool, optional
Unpack zip archives in temporary directories and use those to extract data from? Default is False.
Use this option with caution, as it will create a temporary directory to decompress each zip archive found in
the specified directory! The temporary directories will be deleted after the extraction process, but interruptions
may cause them to remain on the disk.
gedi_product: str, optional
GEDI product type. Either 'L2A' or 'L2B'. Default is 'L2B'.
filter_month: tuple(int), optional
Expand Down Expand Up @@ -100,63 +109,76 @@ def extract_data(directory, gedi_product='L2B', filter_month=(1, 12), variables=
variables = VARIABLES_BASIC_L2A
else:
variables = VARIABLES_BASIC_L2B

if gedi_product == 'L2A':
pattern = PATTERN_L2A
else:
pattern = PATTERN_L2B

# (1) Search for GEDI files
pattern = f'*GEDI02_{gedi_product[-1]}*.h5'
filepaths = [p for p in directory.rglob('*') if p.is_file() and p.match(pattern)]
if dry_run:
print(f"{len(filepaths)} GEDI {gedi_product} files were found to extract data from. "
f"Rerun without activated 'dry_run'-flag to extract data.")
return None
if len(filepaths) == 0:
raise RuntimeError(f"No GEDI {gedi_product} files were found in {directory}.")
tmp_dirs = None
try:
# (1) Search for GEDI files
if temp_unpack_zip:
filepaths, tmp_dirs = _filepaths_from_zips(directory=directory, pattern=pattern)
else:
filepaths = [p for p in directory.rglob('*') if p.is_file() and p.match(pattern)]

gdf_list_no_spatial_subset = []
for i, fp in enumerate(tqdm(filepaths)):

# (2) Filter by month of acquisition and beam type
date = ancil.date_from_gedi_file(gedi_path=fp)
if not filter_month[0] <= date.month <= filter_month[1]:
msg = f'Time of acquisition outside of filter range: month_min={filter_month[0]}, ' \
f'month_max={filter_month[1]}'
ancil.log(handler=log_handler, mode='info', file=fp.name, msg=msg)
continue
if dry_run:
print(f"{len(filepaths)} GEDI {gedi_product} files were found to extract data from. "
f"Rerun without activated 'dry_run'-flag to extract data.")
_cleanup_tmp_dirs(tmp_dirs)
ancil.close_logging(log_handler=log_handler)
return None
if len(filepaths) == 0:
_cleanup_tmp_dirs(tmp_dirs)
raise RuntimeError(f"No GEDI {gedi_product} files were found in {directory}.")

try:
gedi = h5py.File(fp, 'r')
gdf_list_no_spatial_subset = []
for i, fp in enumerate(tqdm(filepaths)):

# (3) Extract data and convert to Dataframe
df = pd.DataFrame(_from_file(gedi_file=gedi, beams=beams, variables=variables, acq_time=date))
# (2) Filter by month of acquisition and beam type
date = ancil.date_from_gedi_file(gedi_path=fp)
if not filter_month[0] <= date.month <= filter_month[1]:
msg = f'Time of acquisition outside of filter range: month_min={filter_month[0]}, ' \
f'month_max={filter_month[1]}'
ancil.log(handler=log_handler, mode='info', file=fp.name, msg=msg)
continue

# (4) Filter by quality flags
df = filter_quality(df=df, log_handler=log_handler, gedi_path=fp)

# (5) Convert to GeoDataFrame and set 'Shot Number' as index
df['geometry'] = df.apply(lambda row: Point(row.longitude, row.latitude), axis=1)
df = df.drop(columns=['latitude', 'longitude'])
gdf = gp.GeoDataFrame(df)
gdf.crs = 'EPSG:4326'

# (6) Subset spatially if any vector files were provided
if subset_vector is not None:
for k, v in out_dict.items():
gdf_sub = gdf[gdf.intersects(v['geo'])]
if not gdf_sub.empty:
if out_dict[k]['gdf'] is None:
out_dict[k]['gdf'] = gdf_sub
else:
out_dict[k]['gdf'] = pd.concat([out_dict[k]['gdf'], gdf_sub])
del gdf_sub
else:
gdf_list_no_spatial_subset.append(gdf)
try:
gedi = h5py.File(fp, 'r')

# (3) Extract data and convert to Dataframe
df = pd.DataFrame(_from_file(gedi_file=gedi, beams=beams, variables=variables, acq_time=date))

# (4) Filter by quality flags
df = filter_quality(df=df, log_handler=log_handler, gedi_path=fp)

# (5) Convert to GeoDataFrame and set 'Shot Number' as index
df['geometry'] = df.apply(lambda row: Point(row.longitude, row.latitude), axis=1)
df = df.drop(columns=['latitude', 'longitude'])
gdf = gp.GeoDataFrame(df)
gdf.crs = 'EPSG:4326'

# (6) Subset spatially if any vector files were provided
if subset_vector is not None:
for k, v in out_dict.items():
gdf_sub = gdf[gdf.intersects(v['geo'])]
if not gdf_sub.empty:
if out_dict[k]['gdf'] is None:
out_dict[k]['gdf'] = gdf_sub
else:
out_dict[k]['gdf'] = pd.concat([out_dict[k]['gdf'], gdf_sub])
del gdf_sub
else:
gdf_list_no_spatial_subset.append(gdf)

gedi.close()
del df, gdf

gedi.close()
del df, gdf
except Exception as msg:
ancil.log(handler=log_handler, mode='exception', file=fp.name, msg=str(msg))
n_err += 1

except Exception as msg:
ancil.log(handler=log_handler, mode='exception', file=fp.name, msg=str(msg))
n_err += 1
try:
# (7) & (8)
out_dir = directory / 'extracted'
out_dir.mkdir(exist_ok=True)
Expand All @@ -177,11 +199,34 @@ def extract_data(directory, gedi_product='L2B', filter_month=(1, 12), variables=
ancil.log(handler=log_handler, mode='exception', msg=str(msg))
n_err += 1
finally:
_cleanup_tmp_dirs(tmp_dirs)
ancil.close_logging(log_handler=log_handler)
if n_err > 0:
print(f"WARNING: {n_err} errors occurred during the extraction process. Please check the log file!")


def _cleanup_tmp_dirs(tmp_dirs):
"""Cleanup temporary directories created during the extraction process."""
if tmp_dirs is not None:
for tmp_dir in tmp_dirs:
tmp_dir.cleanup()


def _filepaths_from_zips(directory, pattern):
"""Decompress zip archives in temporary directories and return filepaths matching the given pattern."""
zip_files = [p for p in directory.rglob('*') if p.is_file() and p.match('*.zip')]
tmp_dirs = [tempfile.TemporaryDirectory() for _ in zip_files]

filepaths = []
for zf, tmp_dir in zip(zip_files, tmp_dirs):
tmp_dir_path = Path(tmp_dir.name)
with zipfile.ZipFile(zf, 'r') as zip_ref:
zip_ref.extractall(tmp_dir_path)
match = [p for p in tmp_dir_path.rglob('*') if p.is_file() and p.match(pattern)]
filepaths.extend(match)
return filepaths, tmp_dirs


def _from_file(gedi_file, beams, variables, acq_time='Acquisition Time'):
"""
Extracts values from a GEDI HDF5 file.
Expand Down