Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add read state to string column #3381

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Column {
uint64_t numValuesPerPage = UINT64_MAX;
common::node_group_idx_t nodeGroupIdx = common::INVALID_NODE_GROUP_IDX;
std::unique_ptr<ReadState> nullState = nullptr;
// Used for struct columns.
// Used for struct/list/string columns.
std::vector<ReadState> childrenStates;
};

Expand Down
9 changes: 8 additions & 1 deletion src/include/storage/store/dictionary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ namespace storage {

class DictionaryColumn {
public:
static constexpr common::vector_idx_t DATA_COLUMN_CHILD_READ_STATE_IDX = 0;
static constexpr common::vector_idx_t OFFSET_COLUMN_CHILD_READ_STATE_IDX = 1;

DictionaryColumn(const std::string& name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, Column::ReadState& columnReadState);

void append(common::node_group_idx_t nodeGroupIdx, const DictionaryChunk& dictChunk);
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
DictionaryChunk& dictChunk);
// Offsets to scan should be a sorted list of pairs, each mapping the index of an entry in the
// string dictionary (as read from the index column) to the output index in the result vector
// where that string should be stored.
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
void scan(transaction::Transaction* transaction, const Column::ReadState& offsetState,
const Column::ReadState& dataState,
std::vector<std::pair<DictionaryChunk::string_index_t, uint64_t>>& offsetsToScan,
common::ValueVector* resultVector, const ColumnChunkMetadata& indexMeta);

Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class StringColumn final : public Column {
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, ReadState& columnReadState) override;

void scan(transaction::Transaction* transaction, ReadState& readState,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) override;
Expand Down
18 changes: 13 additions & 5 deletions src/storage/store/dictionary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using string_offset_t = DictionaryChunk::string_offset_t;

DictionaryColumn::DictionaryColumn(const std::string& name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression) {
Transaction* transaction, RWPropertyStats stats, bool enableCompression) {
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
dataColumn = std::make_unique<Column>(dataColName, *LogicalType::UINT8(),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
Expand All @@ -24,6 +24,16 @@ DictionaryColumn::DictionaryColumn(const std::string& name, const MetadataDAHInf
stats, enableCompression, false /*requireNullColumn*/);
}

// Prepares the per-child read states the dictionary needs for a later scan.
//
// readState.childrenStates is sized to exactly two entries: one for the data column (slot
// DATA_COLUMN_CHILD_READ_STATE_IDX) and one for the offset column (slot
// OFFSET_COLUMN_CHILD_READ_STATE_IDX). Each slot is then filled by the corresponding column's own
// initReadState, using the same transaction, node group and start offset passed in here.
void DictionaryColumn::initReadState(Transaction* transaction, node_group_idx_t nodeGroupIdx,
    offset_t startOffsetInChunk, Column::ReadState& readState) {
    auto& childStates = readState.childrenStates;
    childStates.resize(2);

    // The vector is not resized again below, so both references stay valid.
    auto& dataChildState = childStates[DATA_COLUMN_CHILD_READ_STATE_IDX];
    auto& offsetChildState = childStates[OFFSET_COLUMN_CHILD_READ_STATE_IDX];

    // Data column first, then offset column.
    dataColumn->initReadState(transaction, nodeGroupIdx, startOffsetInChunk, dataChildState);
    offsetColumn->initReadState(transaction, nodeGroupIdx, startOffsetInChunk, offsetChildState);
}

void DictionaryColumn::append(node_group_idx_t nodeGroupIdx, const DictionaryChunk& dictChunk) {
KU_ASSERT(dictChunk.sanityCheck());
dataColumn->append(dictChunk.getStringDataChunk(), nodeGroupIdx);
Expand All @@ -49,12 +59,10 @@ void DictionaryColumn::scan(Transaction* transaction, node_group_idx_t nodeGroup
offsetColumn->scan(transaction, nodeGroupIdx, offsetChunk);
}

void DictionaryColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
void DictionaryColumn::scan(Transaction* transaction, const Column::ReadState& offsetState,
const Column::ReadState& dataState,
std::vector<std::pair<string_index_t, uint64_t>>& offsetsToScan, ValueVector* resultVector,
const ColumnChunkMetadata& indexMeta) {
auto offsetState = offsetColumn->getReadState(transaction->getType(), nodeGroupIdx);
auto dataState = dataColumn->getReadState(transaction->getType(), nodeGroupIdx);

string_index_t firstOffsetToScan, lastOffsetToScan;
auto comp = [](auto pair1, auto pair2) { return pair1.first < pair2.first; };
auto duplicationFactor = (double)offsetState.metadata.numValues / indexMeta.numValues;
Expand Down
24 changes: 18 additions & 6 deletions src/storage/store/string_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ StringColumn::StringColumn(std::string name, LogicalType dataType,
dictionary{name, metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, stats,
enableCompression} {}

// Initializes the read state for a string column in two ordered steps:
//   1. the base Column::initReadState runs on readState;
//   2. the dictionary's initReadState runs on the same readState, where it sets up the child
//      states for its data and offset columns.
// Both steps receive the same transaction, node group index and start offset.
void StringColumn::initReadState(
    Transaction* transaction,
    node_group_idx_t nodeGroupIdx,
    offset_t startOffsetInChunk,
    ReadState& readState) {
    // Step 1: base-class initialization (must run before the dictionary step).
    Column::initReadState(transaction, nodeGroupIdx, startOffsetInChunk, readState);

    // Step 2: dictionary child states.
    dictionary.initReadState(transaction, nodeGroupIdx, startOffsetInChunk, readState);
}

void StringColumn::scan(Transaction* transaction, ReadState& readState, offset_t startOffsetInGroup,
offset_t endOffsetInGroup, ValueVector* resultVector, uint64_t offsetInVector) {
nullColumn->scan(transaction, *readState.nullState, startOffsetInGroup, endOffsetInGroup,
Expand Down Expand Up @@ -121,8 +127,10 @@ void StringColumn::scanUnfiltered(transaction::Transaction* transaction, ReadSta
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

void StringColumn::scanFiltered(transaction::Transaction* transaction, ReadState& readState,
Expand All @@ -143,8 +151,10 @@ void StringColumn::scanFiltered(transaction::Transaction* transaction, ReadState
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

void StringColumn::lookupInternal(Transaction* transaction, ReadState& readState,
Expand All @@ -166,8 +176,10 @@ void StringColumn::lookupInternal(Transaction* transaction, ReadState& readState
// All scanned values are null
return;
}
dictionary.scan(transaction, readState.nodeGroupIdx, offsetsToScan, resultVector,
readState.metadata);
dictionary.scan(transaction,
readState.childrenStates[DictionaryColumn::OFFSET_COLUMN_CHILD_READ_STATE_IDX],
readState.childrenStates[DictionaryColumn::DATA_COLUMN_CHILD_READ_STATE_IDX], offsetsToScan,
resultVector, readState.metadata);
}

bool StringColumn::canCommitInPlace(transaction::Transaction* transaction,
Expand Down
Loading