Skip to content

Commit

Permalink
chore: refactor the signature of read_sst_meta method
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Dec 19, 2022
1 parent c353a70 commit 74738eb
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,7 @@ impl<'a> Reader<'a> {
return Ok(());
}

let ignore_bloom_filter = self.predicate.exprs().is_empty();
let meta_data = Self::read_sst_meta(
self.storage,
self.path,
&self.meta_cache,
self.just_once,
ignore_bloom_filter,
)
.await?;
let meta_data = self.read_sst_meta().await?;

let row_projector = self
.projected_schema
Expand All @@ -251,67 +243,65 @@ impl<'a> Reader<'a> {
}

async fn load_meta_data_from_storage(
storage: &ObjectStoreRef,
&self,
object_meta: &ObjectMeta,
) -> Result<ParquetMetaDataRef> {
let meta_data =
file_format::parquet::fetch_parquet_metadata(storage.as_ref(), object_meta, None)
file_format::parquet::fetch_parquet_metadata(self.storage.as_ref(), object_meta, None)
.await
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)?;

Ok(Arc::new(meta_data))
}

async fn read_sst_meta(
storage: &ObjectStoreRef,
path: &Path,
meta_cache: &Option<MetaCacheRef>,
avoid_update_cache: bool,
empty_predicate: bool,
) -> Result<MetaData> {
if let Some(cache) = meta_cache {
if let Some(meta_data) = cache.get(path.as_ref()) {
async fn read_sst_meta(&self) -> Result<MetaData> {
if let Some(cache) = &self.meta_cache {
if let Some(meta_data) = cache.get(self.path.as_ref()) {
return Ok(meta_data);
}
}

// The metadata can't be found in the cache, and let's fetch it from the
// storage.
let avoid_update_cache = self.just_once;
let empty_predicate = self.predicate.exprs().is_empty();

let meta_data = {
let object_meta = storage.head(path).await.context(ObjectStoreError {})?;
let parquet_meta_data =
Self::load_meta_data_from_storage(storage, &object_meta).await?;
let object_meta = self
.storage
.head(self.path)
.await
.context(ObjectStoreError {})?;
let parquet_meta_data = self.load_meta_data_from_storage(&object_meta).await?;

let ignore_bloom_filter = avoid_update_cache && empty_predicate;
MetaData::try_new(&parquet_meta_data, object_meta.size, ignore_bloom_filter)
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)?
};

if avoid_update_cache || meta_cache.is_none() {
if avoid_update_cache || self.meta_cache.is_none() {
return Ok(meta_data);
}

// Update the cache.
meta_cache
self.meta_cache
.as_ref()
.unwrap()
.put(path.to_string(), meta_data.clone());
.put(self.path.to_string(), meta_data.clone());

Ok(meta_data)
}

#[cfg(test)]
pub(crate) async fn row_groups(&mut self) -> Vec<parquet::file::metadata::RowGroupMetaData> {
let meta_data =
Self::read_sst_meta(self.storage, self.path, &self.meta_cache, false, false)
.await
.unwrap();
let meta_data = self.read_sst_meta().await.unwrap();
meta_data.parquet().row_groups().to_vec()
}
}

/// Options for `read_parallely` in [Reader]
/// Options for `read_parallelly` in [Reader]
#[derive(Debug, Clone, Copy)]
struct ParallelismOptions {
/// Whether allow parallelly reading.
Expand Down

0 comments on commit 74738eb

Please sign in to comment.