Skip to content

Commit

Permalink
Merge pull request #1491 from kuzudb/copy-rework
Browse files Browse the repository at this point in the history
Encapsulate templatizing of `HashIndexBuilder` inside `PrimaryKeyIndexBuilder`
  • Loading branch information
ray6080 committed Apr 26, 2023
2 parents 3f5d5a6 + db49f79 commit 14e1277
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 176 deletions.
11 changes: 6 additions & 5 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ class ConversionException : public Exception {
explicit ConversionException(const std::string& msg) : Exception(msg){};
};

class ReaderException : public Exception {
public:
explicit ReaderException(const std::string& msg) : Exception("Reader exception: " + msg){};
};

class CopyException : public Exception {
public:
explicit CopyException(const std::string& msg) : Exception("Copy exception: " + msg){};
Expand All @@ -55,6 +50,12 @@ class CatalogException : public Exception {
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg){};
};

class HashIndexException : public Exception {
public:
explicit HashIndexException(const std::string& msg)
: Exception("HashIndex exception: " + msg){};
};

class StorageException : public Exception {
public:
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg){};
Expand Down
87 changes: 42 additions & 45 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class NodeCopySharedState {
common::offset_t nodeOffset;

protected:
// todo: do we need this?
std::unordered_map<std::string, TableCopyExecutor::FileBlockInfo> fileBlockInfos;
common::block_idx_t blockIdx;
std::mutex mtx;
Expand All @@ -87,15 +86,14 @@ class CSVNodeCopySharedState : public NodeCopySharedState {
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

template<typename T>
class NodeCopier {
public:
NodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, HashIndexBuilder<T>* pkIndex,
NodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc},
columns{columns}, pkColumnID{pkColumnID} {
overflowCursors.resize(columns.size());
columns{std::move(columns)}, pkColumnID{pkColumnID} {
overflowCursors.resize(this->columns.size());
}
virtual ~NodeCopier() = default;

Expand All @@ -109,64 +107,67 @@ class NodeCopier {

protected:
virtual void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) {
throw std::runtime_error("Not implemented");
throw common::CopyException("executeInternal not implemented");
}

static void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, InMemNodeColumn* column,
void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, common::column_id_t columnID,
arrow::Array& arrowArray, common::offset_t startNodeOffset,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor);
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
common::CopyDescription& copyDescription);
void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, common::offset_t startOffset, uint64_t numValues);

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);

template<typename T, typename... Args>
void appendToPKIndex(
InMemColumnChunk* chunk, common::offset_t startOffset, uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex not implemented");
}

protected:
std::shared_ptr<NodeCopySharedState> sharedState;
HashIndexBuilder<T>* pkIndex;
PrimaryKeyIndexBuilder* pkIndex;
common::CopyDescription copyDesc;
std::vector<InMemNodeColumn*> columns;
common::column_id_t pkColumnID;
std::vector<PageByteCursor> overflowCursors;
};

template<>
void NodeCopier<int64_t>::appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<int64_t>* pkIndex);
void NodeCopier::appendToPKIndex<int64_t>(
kuzu::storage::InMemColumnChunk* chunk, common::offset_t startOffset, uint64_t numValues);
template<>
void NodeCopier<common::ku_string_t>::appendPKIndex(InMemColumnChunk* chunk,
InMemOverflowFile* overflowFile, common::offset_t offset,
HashIndexBuilder<common::ku_string_t>* pkIndex);
void NodeCopier::appendToPKIndex<common::ku_string_t, storage::InMemOverflowFile*>(
kuzu::storage::InMemColumnChunk* chunk, common::offset_t startOffset, uint64_t numValues,
storage::InMemOverflowFile* overflowFile);

template<typename T>
class CSVNodeCopier : public NodeCopier<T> {
class CSVNodeCopier : public NodeCopier {
public:
CSVNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, HashIndexBuilder<T>* pkIndex,
CSVNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: NodeCopier<T>{std::move(sharedState), pkIndex, copyDesc, columns, pkColumnID} {}
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}

inline std::unique_ptr<NodeCopier<T>> clone() const override {
return std::make_unique<CSVNodeCopier<T>>(
inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<CSVNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;
};

template<typename T>
class ParquetNodeCopier : public NodeCopier<T> {
class ParquetNodeCopier : public NodeCopier {
public:
ParquetNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState,
HashIndexBuilder<T>* pkIndex, const common::CopyDescription& copyDesc,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
std::vector<InMemNodeColumn*> columns, common::column_id_t pkColumnID)
: NodeCopier<T>{std::move(sharedState), pkIndex, copyDesc, columns, pkColumnID} {}
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}

inline std::unique_ptr<NodeCopier<T>> clone() const override {
return std::make_unique<ParquetNodeCopier<T>>(
inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<ParquetNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
}

Expand All @@ -178,18 +179,16 @@ class ParquetNodeCopier : public NodeCopier<T> {
std::string filePath;
};

template<typename T>
class NPYNodeCopier : public NodeCopier<T> {
class NPYNodeCopier : public NodeCopier {
public:
NPYNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, HashIndexBuilder<T>* pkIndex,
NPYNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t columnID, common::column_id_t pkColumnID)
: NodeCopier<T>{std::move(sharedState), pkIndex, copyDesc, columns, pkColumnID},
columnID{columnID} {}
common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}

inline std::unique_ptr<NodeCopier<T>> clone() const override {
return std::make_unique<NPYNodeCopier<T>>(this->sharedState, this->pkIndex, this->copyDesc,
this->columns, columnID, this->pkColumnID);
inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<NPYNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
}

inline void finalize() override {
Expand All @@ -203,16 +202,14 @@ class NPYNodeCopier : public NodeCopier<T> {

private:
std::unique_ptr<NpyReader> reader;
common::column_id_t columnID;
};

// Note: This is a temporary class used specially in the node copier. NodeCopyTask mimics the
// behaviour of ProcessorTask. Eventually, we shouldn't have both CopyTask and NodeCopyTask.
template<typename T>
class NodeCopyTask : public common::Task {
public:
NodeCopyTask(
std::unique_ptr<NodeCopier<T>> nodeCopier, processor::ExecutionContext* executionContext)
std::unique_ptr<NodeCopier> nodeCopier, processor::ExecutionContext* executionContext)
: Task{executionContext->numThreads}, nodeCopier{std::move(nodeCopier)},
executionContext{executionContext} {};

Expand All @@ -226,7 +223,7 @@ class NodeCopyTask : public common::Task {

private:
std::mutex mtx;
std::unique_ptr<NodeCopier<T>> nodeCopier;
std::unique_ptr<NodeCopier> nodeCopier;
processor::ExecutionContext* executionContext;
};

Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/copier/node_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ class NodeCopyExecutor : public TableCopyExecutor {

void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

// todo: do we need this? should go to finalize.
// TODO(Guodong): do we need this? should go to finalize.
void saveToFile() override;

std::unordered_map<common::property_id_t, common::column_id_t> propertyIDToColumnIDMap;
std::vector<std::unique_ptr<InMemNodeColumn>> columns;

private:
template<typename T>
void populateColumns(processor::ExecutionContext* executionContext);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class InMemColumnChunk {
auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement;
return getPage(cursor.pageIdx) + elemPosInPageInBytes;
}
inline common::DataType getDataType() const { return dataType; }

template<typename T, typename... Args>
void templateCopyValuesToPage(const PageElementCursor& pageCursor, arrow::Array& array,
Expand Down
61 changes: 61 additions & 0 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,66 @@ class HashIndexBuilder : public BaseHashIndex {
std::atomic<uint64_t> numEntries;
};

class PrimaryKeyIndexBuilder {
public:
PrimaryKeyIndexBuilder(const std::string& fName, const common::DataType& keyDataType)
: keyDataTypeID{keyDataType.typeID} {
switch (keyDataTypeID) {
case common::INT64: {
hashIndexBuilderForInt64 =
std::make_unique<HashIndexBuilder<int64_t>>(fName, keyDataType);
} break;
case common::STRING: {
hashIndexBuilderForString =
std::make_unique<HashIndexBuilder<common::ku_string_t>>(fName, keyDataType);
} break;
default: {
throw common::Exception(
"Unsupported data type for primary key index: " + std::to_string(keyDataTypeID));
}
}
}

inline void bulkReserve(uint32_t numEntries) {
keyDataTypeID == common::INT64 ? hashIndexBuilderForInt64->bulkReserve(numEntries) :
hashIndexBuilderForString->bulkReserve(numEntries);
}
// Note: append assumes that bulkRserve has been called before it and the index has reserved
// enough space already.
inline void append(int64_t key, common::offset_t value) {
auto retVal = keyDataTypeID == common::INT64 ?
hashIndexBuilderForInt64->append(key, value) :
hashIndexBuilderForString->append(key, value);
if (!retVal) {
throw common::HashIndexException(
common::Exception::getExistedPKExceptionMsg(std::to_string(key)));
}
}
inline void append(const char* key, common::offset_t value) {
auto retVal = keyDataTypeID == common::INT64 ?
hashIndexBuilderForInt64->append(key, value) :
hashIndexBuilderForString->append(key, value);
if (!retVal) {
throw common::HashIndexException(
common::Exception::getExistedPKExceptionMsg(std::string(key)));
}
}
inline bool lookup(int64_t key, common::offset_t& result) {
return keyDataTypeID == common::INT64 ? hashIndexBuilderForInt64->lookup(key, result) :
hashIndexBuilderForString->lookup(key, result);
}

// Non-thread safe. This should only be called in the copyCSV and never be called in parallel.
inline void flush() {
keyDataTypeID == common::INT64 ? hashIndexBuilderForInt64->flush() :
hashIndexBuilderForString->flush();
}

private:
common::DataTypeID keyDataTypeID;
std::unique_ptr<HashIndexBuilder<int64_t>> hashIndexBuilderForInt64;
std::unique_ptr<HashIndexBuilder<common::ku_string_t>> hashIndexBuilderForString;
};

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit 14e1277

Please sign in to comment.