Commit 1400ca9: fix
xudong963 committed Mar 1, 2024
1 parent fd3f91e
Showing 6 changed files with 53 additions and 122 deletions.
@@ -125,10 +125,12 @@ impl BuildSpillHandler {
.partition_location
.contains_key(&(partition_id as u8))
{
let spilled_data = spill_state
.spiller
.read_spilled_data(&(partition_id as u8), processor_id)
.await?;
let spilled_data = DataBlock::concat(
&spill_state
.spiller
.read_spilled_data(&(partition_id as u8), processor_id)
.await?,
)?;
if !spilled_data.is_empty() {
return Ok(Some(spilled_data));
}
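For context on the new calling pattern: read_spilled_data now returns one DataBlock per spilled file, and the caller merges them with a single DataBlock::concat before checking for emptiness. A minimal, self-contained sketch of that read-then-merge step, using a stand-in Block type instead of databend's DataBlock, could look like this:

use std::collections::HashMap;

// Stand-in for a deserialized DataBlock: just a vector of rows.
#[derive(Debug, Clone, Default, PartialEq)]
struct Block(Vec<u64>);

impl Block {
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
    // Merge several blocks into one, mirroring the role of DataBlock::concat.
    fn concat(blocks: &[Block]) -> Block {
        Block(blocks.iter().flat_map(|b| b.0.iter().copied()).collect())
    }
}

fn main() {
    // One partition now maps to several spilled files.
    let mut partition_location: HashMap<u8, Vec<String>> = HashMap::new();
    partition_location.insert(0, vec!["f1".to_string(), "f2".to_string()]);

    // Pretend each file deserializes into one block.
    let spilled: HashMap<String, Block> = HashMap::from([
        ("f1".to_string(), Block(vec![1, 2])),
        ("f2".to_string(), Block(vec![3])),
    ]);

    // Read every file of the partition, drop empty blocks, then concat once.
    let blocks: Vec<Block> = partition_location[&0]
        .iter()
        .map(|file| spilled[file].clone())
        .filter(|b| !b.is_empty())
        .collect();
    let merged = Block::concat(&blocks);
    assert_eq!(merged.0, vec![1, 2, 3]);
}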
@@ -217,10 +217,12 @@ impl TransformHashJoinProbe {
.spilled_partitions()
.contains(&(p_id as u8))
{
let spilled_data = spill_state
.spiller
.read_spilled_data(&(p_id as u8), self.processor_id)
.await?;
let spilled_data = DataBlock::concat(
&spill_state
.spiller
.read_spilled_data(&(p_id as u8), self.processor_id)
.await?,
)?;
if !spilled_data.is_empty() {
// Split data to `block_size` rows per sub block.
let (sub_blocks, remain_block) = spilled_data.split_by_rows(self.max_block_size);
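On the probe side the merged block is then re-chunked so no downstream block exceeds max_block_size rows. The real code relies on DataBlock::split_by_rows; the sketch below approximates the same full-chunks-plus-remainder split with plain vectors, so the exact return shape is an assumption:

// Split rows into sub-chunks of at most max_block_size rows, returning the
// full chunks plus an optional short remainder, loosely mirroring split_by_rows.
fn split_by_rows(rows: &[u64], max_block_size: usize) -> (Vec<Vec<u64>>, Option<Vec<u64>>) {
    let mut sub_blocks: Vec<Vec<u64>> = Vec::new();
    let mut remain: Option<Vec<u64>> = None;
    for chunk in rows.chunks(max_block_size) {
        if chunk.len() == max_block_size {
            sub_blocks.push(chunk.to_vec());
        } else {
            remain = Some(chunk.to_vec());
        }
    }
    (sub_blocks, remain)
}

fn main() {
    let merged: Vec<u64> = (0..10).collect();
    let (sub_blocks, remain) = split_by_rows(&merged, 4);
    assert_eq!(sub_blocks, vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]]);
    assert_eq!(remain, Some(vec![8, 9]));
}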
@@ -164,7 +164,7 @@ where R: Rows + Send + Sync + 'static
Ok(Event::Async)
} else {
// If we get a memory block at initial state, it means we will never spill data.
debug_assert!(self.spiller.columns_layouts.is_empty());
debug_assert!(self.spiller.columns_layout.is_empty());
self.output_block(block);
self.state = State::NoSpill;
Ok(Event::NeedConsume)
@@ -314,7 +314,7 @@ where R: Rows + Sync + Send + 'static
for _ in 0..num_streams - streams.len() {
let files = self.unmerged_blocks.pop_front().unwrap();
for file in files.iter() {
self.spiller.columns_layouts.remove(file);
self.spiller.columns_layout.remove(file);
}
let stream = BlockStream::Spilled((files, spiller_snapshot.clone()));
streams.push(stream);
137 changes: 35 additions & 102 deletions src/query/service/src/spillers/spiller.rs
@@ -24,7 +24,6 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::arrow::deserialize_column;
use databend_common_expression::arrow::serialize_column;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_hashtable::hash2bucket;
use log::info;
@@ -75,11 +74,10 @@ pub struct Spiller {
operator: Operator,
config: SpillerConfig,
spiller_type: SpillerType,
/// Record the location of the spilled partitions
/// 1 partition -> 1 partition file
pub partition_location: HashMap<u8, String>,
/// Record columns layouts for spilled data, will be used when read data from disk
pub columns_layouts: HashMap<String, Vec<Vec<usize>>>,
/// 1 partition -> N partition files
pub partition_location: HashMap<u8, Vec<String>>,
/// Record columns layout for spilled data, will be used when read data from disk
pub columns_layout: HashMap<String, Vec<usize>>,
}

impl Spiller {
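The heart of the fix is the shape change in these two fields: previously one partition mapped to exactly one file whose layout was a nested Vec<Vec<usize>> (one inner vector per appended block); now one partition maps to a list of files, each with a single flat column layout. A small std-only sketch of the new bookkeeping, with plain maps standing in for the spiller state:

use std::collections::HashMap;

fn main() {
    // 1 partition -> N partition files.
    let mut partition_location: HashMap<u8, Vec<String>> = HashMap::new();
    // 1 file -> the byte length of each serialized column in that file.
    let mut columns_layout: HashMap<String, Vec<usize>> = HashMap::new();

    // Spilling the same partition twice now produces two independent files.
    for (file, layout) in [("spill/a", vec![16, 8]), ("spill/b", vec![32, 24])] {
        partition_location.entry(0).or_default().push(file.to_string());
        columns_layout.insert(file.to_string(), layout);
    }

    assert_eq!(partition_location[&0].len(), 2);
    assert_eq!(columns_layout["spill/a"], vec![16, 8]);
}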
@@ -96,7 +94,7 @@ impl Spiller {
config,
spiller_type,
partition_location: Default::default(),
columns_layouts: Default::default(),
columns_layout: Default::default(),
}
}

@@ -107,75 +105,22 @@ impl Spiller {
/// Read a certain file to a [`DataBlock`].
/// We should guarantee that the file is managed by this spiller.
pub async fn read_spilled(&self, file: &str) -> Result<(DataBlock, u64)> {
debug_assert!(self.columns_layouts.contains_key(file));
debug_assert!(self.columns_layout.contains_key(file));
let data = self.operator.read(file).await?;
let bytes = data.len() as u64;

let mut begin = 0;
let mut columns = vec![];
let columns_layouts = self.columns_layouts.get(file).unwrap();
for (idx, columns_layout) in columns_layouts.iter().enumerate() {
for (col_idx, column_layout) in columns_layout.iter().enumerate() {
if idx == 0 {
columns.push(vec![
deserialize_column(&data[begin..begin + column_layout]).unwrap(),
]);
begin += column_layout;
continue;
}
columns[col_idx]
.push(deserialize_column(&data[begin..begin + column_layout]).unwrap());
begin += column_layout;
}
let mut columns = Vec::with_capacity(self.columns_layout.len());
let columns_layout = self.columns_layout.get(file).unwrap();
for column_layout in columns_layout.iter() {
columns.push(deserialize_column(&data[begin..begin + column_layout]).unwrap());
begin += column_layout;
}
// Concat `columns`
let columns = columns
.into_iter()
.map(|v| Column::concat_columns(v.into_iter()))
.collect::<Result<Vec<_>>>()?;
let block = DataBlock::new_from_columns(columns);
Ok((block, bytes))
}
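With one block per file, the on-disk layout that read_spilled parses becomes simple: every column's bytes are written back to back and columns_layout keeps one byte length per column, so the reader slices the buffer sequentially. A self-contained sketch of that slice-and-rebuild loop, with raw byte vectors standing in for serialized arrow columns:

fn main() {
    // Two "columns" serialized back to back into one spill file.
    let col_a = vec![1u8, 2, 3, 4];
    let col_b = vec![9u8, 9];
    let mut file_bytes = Vec::new();
    let mut columns_layout = Vec::new();
    for col in [&col_a, &col_b] {
        columns_layout.push(col.len()); // record the byte length of each column
        file_bytes.extend_from_slice(col);
    }

    // Read side: walk the layout and slice each column back out,
    // the same way read_spilled advances `begin` per column.
    let mut begin = 0;
    let mut columns = Vec::with_capacity(columns_layout.len());
    for len in &columns_layout {
        columns.push(file_bytes[begin..begin + len].to_vec());
        begin += len;
    }

    assert_eq!(columns, vec![col_a, col_b]);
}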

/// Write a [`DataBlock`] to storage with specific location.
pub async fn spill_block_with_location(
&mut self,
data: DataBlock,
location: &str,
) -> Result<u64> {
let mut write_bytes = 0;
let mut writer = self
.operator
.writer_with(location)
.append(true)
.buffer(8 * 1024 * 1024)
.await?;
let columns = data.columns().to_vec();
let mut columns_data = Vec::with_capacity(columns.len());
let mut columns_layout = Vec::with_capacity(columns.len());
for column in columns.into_iter() {
let column = column.value.as_column().unwrap();
let column_data = serialize_column(column);
columns_layout.push(column_data.len());
write_bytes += column_data.len() as u64;
columns_data.push(column_data);
}
self.columns_layouts
.entry(location.to_string())
.and_modify(|layouts| {
layouts.push(columns_layout.clone());
})
.or_insert(vec![columns_layout]);
for data in columns_data.into_iter() {
writer.write(data).await?;
}
writer.close().await?;

Ok(write_bytes)
}

/// Write a [`DataBlock`] to storage.
/// Todo: change sort call
pub async fn spill_block(&mut self, data: DataBlock) -> Result<(String, u64)> {
let unique_name = GlobalUniqName::unique();
let location = format!("{}/{}", self.config.location_prefix, unique_name);
@@ -187,17 +132,20 @@ impl Spiller {
.buffer(8 * 1024 * 1024)
.await?;
let columns = data.columns().to_vec();
let mut columns_layout = Vec::with_capacity(columns.len());
let mut columns_data = Vec::with_capacity(columns.len());
for column in columns.into_iter() {
let column = column.value.as_column().unwrap();
let column_data = serialize_column(column);
columns_layout.push(column_data.len());
self.columns_layout
.entry(location.to_string())
.and_modify(|layouts| {
layouts.push(column_data.len());
})
.or_insert(vec![column_data.len()]);
write_bytes += column_data.len() as u64;
columns_data.push(column_data);
}
self.columns_layouts
.insert(location.clone(), vec![columns_layout]);

for data in columns_data.into_iter() {
writer.write(data).await?;
}
@@ -206,20 +154,6 @@
Ok((location, write_bytes))
}

#[async_backtrace::framed]
/// Spill partition set
pub async fn spill(
&mut self,
partitions: Vec<(u8, DataBlock)>,
worker_id: usize,
) -> Result<()> {
for (partition_id, partition) in partitions {
self.spill_with_partition(partition_id, partition, worker_id)
.await?;
}
Ok(())
}

#[async_backtrace::framed]
/// Spill data block with location
pub async fn spill_with_partition(
@@ -233,15 +167,13 @@ impl Spiller {
bytes: data.memory_size(),
};

if let Some(location) = self.partition_location.get(&p_id) {
// Append data
let _ = self
.spill_block_with_location(data, location.clone().as_str())
.await?;
} else {
let (location, _) = self.spill_block(data).await?;
self.partition_location.insert(p_id, location);
}
let (location, _) = self.spill_block(data).await?;
self.partition_location
.entry(p_id)
.and_modify(|locs| {
locs.push(location.clone());
})
.or_insert(vec![location.clone()]);

self.ctx.get_join_spill_progress().incr(&progress_val);
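spill_with_partition no longer appends to an existing partition file; each call writes a fresh file via spill_block and pushes its location under the partition id using the map entry API. A minimal sketch of that accumulation step (record_spill is a hypothetical helper, not part of the Spiller API):

use std::collections::HashMap;

// Record a newly spilled file under its partition id,
// mirroring the entry/and_modify/or_insert pattern above.
fn record_spill(partition_location: &mut HashMap<u8, Vec<String>>, p_id: u8, location: String) {
    partition_location
        .entry(p_id)
        .and_modify(|locs| locs.push(location.clone()))
        .or_insert(vec![location.clone()]);
}

fn main() {
    let mut partition_location = HashMap::new();
    record_spill(&mut partition_location, 0, "spill/file-1".to_string());
    record_spill(&mut partition_location, 0, "spill/file-2".to_string());
    assert_eq!(partition_location[&0], vec!["spill/file-1", "spill/file-2"]);
}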

@@ -254,15 +186,21 @@

#[async_backtrace::framed]
/// Read spilled data with partition id
pub async fn read_spilled_data(&self, p_id: &u8, worker_id: usize) -> Result<DataBlock> {
pub async fn read_spilled_data(&self, p_id: &u8, worker_id: usize) -> Result<Vec<DataBlock>> {
debug_assert!(self.partition_location.contains_key(p_id));
let file = self.partition_location.get(p_id).unwrap();
let (block, _) = self.read_spilled(file).await?;
let files = self.partition_location.get(p_id).unwrap();
let mut spilled_data = Vec::with_capacity(files.len());
for file in files.iter() {
let (block, _) = self.read_spilled(file).await?;
if block.num_rows() != 0 {
spilled_data.push(block);
}
}
info!(
"{:?} read partition {:?}, work id: {:?}",
self.spiller_type, p_id, worker_id
);
Ok(block)
Ok(spilled_data)
}

#[async_backtrace::framed]
@@ -307,9 +245,4 @@ impl Spiller {
}
Ok(())
}

#[inline(always)]
pub fn spilled_files_num(&self, pid: u8) -> usize {
self.partition_location[&pid].len()
}
}
10 changes: 2 additions & 8 deletions src/query/service/tests/it/spillers/spiller.rs
@@ -49,16 +49,10 @@ async fn test_spill_with_partition() -> Result<()> {
let res = spiller.spill_with_partition(0_u8, data, 0).await;

assert!(res.is_ok());
assert!(
spiller
.partition_location
.get(&0)
.unwrap()
.starts_with("_query_spill")
);
assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_query_spill"));

// Test read spilled data
let block = spiller.read_spilled_data(&(0_u8), 0).await?;
let block = DataBlock::concat(&spiller.read_spilled_data(&(0_u8), 0).await?)?;
assert_eq!(block.num_rows(), 100);
assert_eq!(block.num_columns(), 2);
for (col_idx, col) in block.columns().iter().enumerate() {
4 changes: 2 additions & 2 deletions tests/sqllogictests/suites/tpch/queries.test
@@ -3337,13 +3337,13 @@ MaterializedCte: 0
├── Build
│ └── CTEScan
│ ├── CTE index: 0, sub index: 2
│ └── estimated rows: 81.00
│ └── estimated rows: 80.00
└── Probe
└── HashJoin: INNER
├── Build
│ └── CTEScan
│ ├── CTE index: 0, sub index: 1
│ └── estimated rows: 81.00
│ └── estimated rows: 80.00
└── Probe
└── Scan: default.tpch_test.supplier (#0) (read rows: 2000)
