Skip to content

Commit

Permalink
add read state to string column (#3381)
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 26, 2024
1 parent 9ad863a commit 1b64173
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Column {
uint64_t numValuesPerPage = UINT64_MAX;
common::node_group_idx_t nodeGroupIdx = common::INVALID_NODE_GROUP_IDX;
std::unique_ptr<ReadState> nullState = nullptr;
// Used for struct columns.
// Used for struct/list/string columns.
std::vector<ReadState> childrenStates;
};

Expand Down
9 changes: 8 additions & 1 deletion src/include/storage/store/dictionary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ namespace storage {

class DictionaryColumn {
public:
static constexpr common::vector_idx_t DATA_COLUMN_CHILD_READ_STATE_IDX = 0;
static constexpr common::vector_idx_t OFFSET_COLUMN_CHILD_READ_STATE_IDX = 1;

DictionaryColumn(const std::string& name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, Column::ReadState& columnReadState);

void append(common::node_group_idx_t nodeGroupIdx, const DictionaryChunk& dictChunk);
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
DictionaryChunk& dictChunk);
// offsetsToScan should be a sorted list of pairs, each mapping the index of an entry in the
// string dictionary (as read from the index column) to the position in the result vector where
// that string should be stored.
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
void scan(transaction::Transaction* transaction, const Column::ReadState& offsetState,
const Column::ReadState& dataState,
std::vector<std::pair<DictionaryChunk::string_index_t, uint64_t>>& offsetsToScan,
common::ValueVector* resultVector, const ColumnChunkMetadata& indexMeta);

Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class StringColumn final : public Column {
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, ReadState& columnReadState) override;

void scan(transaction::Transaction* transaction, ReadState& readState,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) override;
Expand Down
18 changes: 13 additions & 5 deletions src/storage/store/dictionary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using string_offset_t = DictionaryChunk::string_offset_t;

DictionaryColumn::DictionaryColumn(const std::string& name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression) {
Transaction* transaction, RWPropertyStats stats, bool enableCompression) {
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
dataColumn = std::make_unique<Column>(dataColName, *LogicalType::UINT8(),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
Expand All @@ -24,6 +24,16 @@ DictionaryColumn::DictionaryColumn(const std::string& name, const MetadataDAHInf
stats, enableCompression, false /*requireNullColumn*/);
}

// Initializes the read states of the dictionary's two underlying columns for the given node
// group. Both states are kept in `readState.childrenStates`: the data column's state at
// DATA_COLUMN_CHILD_READ_STATE_IDX and the offset column's state at
// OFFSET_COLUMN_CHILD_READ_STATE_IDX.
void DictionaryColumn::initReadState(Transaction* transaction, node_group_idx_t nodeGroupIdx,
    offset_t startOffsetInChunk, Column::ReadState& readState) {
    auto& childStates = readState.childrenStates;
    // One slot per underlying column (data + offset).
    childStates.resize(2);

    auto& dataState = childStates[DATA_COLUMN_CHILD_READ_STATE_IDX];
    dataColumn->initReadState(transaction, nodeGroupIdx, startOffsetInChunk, dataState);

    auto& offsetState = childStates[OFFSET_COLUMN_CHILD_READ_STATE_IDX];
    offsetColumn->initReadState(transaction, nodeGroupIdx, startOffsetInChunk, offsetState);
}

void DictionaryColumn::append(node_group_idx_t nodeGroupIdx, const DictionaryChunk& dictChunk) {
KU_ASSERT(dictChunk.sanityCheck());
dataColumn->append(dictChunk.getStringDataChunk(), nodeGroupIdx);
Expand All @@ -49,12 +59,10 @@ void DictionaryColumn::scan(Transaction* transaction, node_group_idx_t nodeGroup
offsetColumn->scan(transaction, nodeGroupIdx, offsetChunk);
}

void DictionaryColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
void DictionaryColumn::scan(Transaction* transaction, const Column::ReadState& offsetState,
const Column::ReadState& dataState,
std::vector<std::pair<string_index_t, uint64_t>>& offsetsToScan, ValueVector* resultVector,
const ColumnChunkMetadata& indexMeta) {
auto offsetState = offsetColumn->getReadState(transaction->getType(), nodeGroupIdx);
auto dataState = dataColumn->getReadState(transaction->getType(), nodeGroupIdx);

string_index_t firstOffsetToScan, lastOffsetToScan;
auto comp = [](auto pair1, auto pair2) { return pair1.first < pair2.first; };
auto duplicationFactor = (double)offsetState.metadata.numValues / indexMeta.numValues;
Expand Down
24 changes: 18 additions & 6 deletions src/storage/store/string_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ StringColumn::StringColumn(std::string name, LogicalType dataType,
dictionary{name, metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, stats,
enableCompression} {}

// Initializes `readState` for reading this string column from `nodeGroupIdx`: the base Column
// state is set up first, then the dictionary adds the states of its data and offset columns to
// the same `readState`.
void StringColumn::initReadState(Transaction* transaction, node_group_idx_t nodeGroupIdx,
offset_t startOffsetInChunk, ReadState& readState) {
// Base-class initialization of the column's own read state.
Column::initReadState(transaction, nodeGroupIdx, startOffsetInChunk, readState);
// The dictionary fills readState.childrenStates with its data/offset column states.
// NOTE(review): presumably this must run after Column::initReadState so the base call cannot
// overwrite the child states -- confirm before reordering.
dictionary.initReadState(transaction, nodeGroupIdx, startOffsetInChunk, readState);
}

void StringColumn::scan(Transaction* transaction, ReadState& readState, offset_t startOffsetInGroup,
offset_t endOffsetInGroup, ValueVector* resultVector, uint64_t offsetInVector) {
nullColumn->scan(transaction, *readState.nullState, startOffsetInGroup, endOffsetInGroup,
Expand Down Expand Up @@ -121,8 +127,10 @@ void StringColumn::scanUnfiltered(transaction::Transaction* transaction, ReadSta
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

void StringColumn::scanFiltered(transaction::Transaction* transaction, ReadState& readState,
Expand All @@ -143,8 +151,10 @@ void StringColumn::scanFiltered(transaction::Transaction* transaction, ReadState
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

void StringColumn::lookupInternal(Transaction* transaction, ReadState& readState,
Expand All @@ -166,8 +176,10 @@ void StringColumn::lookupInternal(Transaction* transaction, ReadState& readState
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

bool StringColumn::canCommitInPlace(transaction::Transaction* transaction,
Expand Down

0 comments on commit 1b64173

Please sign in to comment.