Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate by column #1424

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
namespace kuzu {
namespace storage {

using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

class NodeCopier : public TableCopier {

public:
NodeCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {}
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs} {}

protected:
inline void updateTableStatistics() override {
nodesStatisticsAndDeletedIDs->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand All @@ -46,13 +45,10 @@ class NodeCopier : public TableCopier {
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>&
propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, common::offset_t nodeOffset,
uint64_t blockOffset, common::CopyDescription& copyDescription);
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
Expand All @@ -77,8 +73,52 @@ class NodeCopier : public TableCopier {
assert(false);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
static set_element_func_t getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor);

template<typename T>
inline static void setNumericElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertStringToNumber<T>(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setBoolElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertToBoolean(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

template<typename T>
inline static void setTimeElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = T::FromCString(data.c_str(), data.length());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setStringElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data, PageByteCursor& overflowCursor) {
auto val = column->getInMemOverflowFile()->copyString(
data.substr(0, common::BufferPoolConstants::PAGE_4KB_SIZE).c_str(), overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
}

inline static void setVarListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto varListVal =
getArrowVarList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
}

inline static void setFixedListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription) {
auto fixedListVal =
getArrowFixedList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get());
}
};

} // namespace storage
Expand Down
5 changes: 0 additions & 5 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ class RelCopier : public TableCopier {
private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);

inline void updateTableStatistics() override {
relsStatistics->setNumRelsForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand Down Expand Up @@ -145,7 +141,6 @@ class RelCopier : public TableCopier {

private:
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
RelsStatistics* relsStatistics;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>> pkIndexes;
std::atomic<uint64_t> numRels = 0;
Expand Down
22 changes: 13 additions & 9 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/task_system/task_scheduler.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_lists.h"
#include "storage/store/table_statistics.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
Expand Down Expand Up @@ -35,16 +36,14 @@ class TableCopier {

public:
TableCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
TablesStatistics* tableStatisticsAndDeletedIDs);

virtual ~TableCopier() = default;

uint64_t copy();

protected:
virtual void updateTableStatistics() = 0;

virtual void initializeColumnsAndLists() = 0;

virtual void populateColumnsAndLists() = 0;
Expand All @@ -53,6 +52,10 @@ class TableCopier {

virtual void populateInMemoryStructures();

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void countNumLines(const std::vector<std::string>& filePath);

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);
Expand All @@ -78,13 +81,13 @@ class TableCopier {
std::unique_ptr<parquet::arrow::FileReader>& reader, const std::string& filePath);

static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);

static std::unique_ptr<common::Value> getArrowVarList(std::string& l, int64_t from, int64_t to,
const common::DataType& dataType, common::CopyDescription& copyDescription);
static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(std::string& l, int64_t from, int64_t to,
const common::DataType& dataType, common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

Expand All @@ -105,6 +108,7 @@ class TableCopier {
catalog::Catalog& catalog;
catalog::TableSchema* tableSchema;
uint64_t numRows;
TablesStatistics* tablesStatistics;
};

} // namespace storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NodesStatisticsAndDeletedIDs : public TablesStatistics {
directory, common::DBFileType::ORIGINAL, transaction::TransactionType::READ_ONLY);
}

inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) {
inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) override {
initTableStatisticPerTableForWriteTrxIfNecessary();
((NodeStatisticsAndDeletedIDs*)tablesStatisticsContentForWriteTrx
->tableStatisticPerTable[tableID]
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rels_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RelsStatistics : public TablesStatistics {
return (RelStatistics*)tableStatisticPerTable[tableID].get();
}

void setNumRelsForTable(common::table_id_t relTableID, uint64_t numRels);
void setNumTuplesForTable(common::table_id_t relTableID, uint64_t numRels) override;

void updateNumRelsByValue(common::table_id_t relTableID, int64_t value);

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class TablesStatistics {

virtual ~TablesStatistics() = default;

virtual void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) = 0;

inline void writeTablesStatisticsFileForWALRecord(const std::string& directory) {
saveToFile(directory, common::DBFileType::WAL_VERSION, transaction::TransactionType::WRITE);
}
Expand Down
125 changes: 49 additions & 76 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
template<typename T1, typename T2>
arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx,
uint64_t blockIdx, uint64_t startOffset, HashIndexBuilder<T1>* pkIndex, NodeCopier* copier,
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath) {
const std::vector<std::shared_ptr<T2>>& batchArrays, std::string filePath) {
copier->logger->trace("Start: path={0} blkIdx={1}", filePath, blockIdx);
auto numLinesInCurBlock = copier->fileBlockInfos.at(filePath).numLinesPerBlock[blockIdx];

Expand All @@ -129,9 +129,10 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
column->getNumBytesForElement(), column->getNumElementsInAPage());
}
std::vector<PageByteCursor> overflowCursors(copier->tableSchema->getNumProperties());
for (auto blockOffset = 0u; blockOffset < numLinesInCurBlock; ++blockOffset) {
putPropsOfLineIntoColumns(chunks, copier->columns, overflowCursors, batchColumns,
startOffset + blockOffset, blockOffset, copier->copyDescription);
for (auto& [propertyIdx, column] : copier->columns) {
putPropsOfLinesIntoColumns(chunks.at(propertyIdx).get(), column.get(),
batchArrays[propertyIdx], startOffset, numLinesInCurBlock, copier->copyDescription,
overflowCursors[propertyIdx]);
}
// Flush each page within the [StartOffset, endOffset] range.
for (auto& [propertyIdx, column] : copier->columns) {
Expand All @@ -147,80 +148,17 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
}

template<typename T>
void NodeCopier::putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>& propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, uint64_t nodeOffset,
uint64_t bufferOffset, CopyDescription& copyDescription) {
for (auto columnIdx = 0u; columnIdx < propertyColumns.size(); columnIdx++) {
auto column = propertyColumns.at(columnIdx).get();
auto chunk = chunks.at(columnIdx).get();
auto currentToken = arrow_columns[columnIdx]->GetScalar(bufferOffset);
void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset, uint64_t numLinesInCurBlock,
CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto setElementFunc =
getSetElementFunc(column->getDataType().typeID, copyDescription, overflowCursor);
for (auto i = 0u; i < numLinesInCurBlock; i++) {
auto nodeOffset = startNodeOffset + i;
auto currentToken = arrowArray->GetScalar(i);
if ((*currentToken)->is_valid) {
auto stringToken = currentToken->get()->ToString();
const char* data = stringToken.c_str();
switch (column->getDataType().typeID) {
case INT64: {
auto val = TypeUtils::convertStringToNumber<int64_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT32: {
auto val = TypeUtils::convertStringToNumber<int32_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT16: {
auto val = TypeUtils::convertStringToNumber<int16_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DOUBLE: {
auto val = TypeUtils::convertStringToNumber<double_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case FLOAT: {
auto val = TypeUtils::convertStringToNumber<float_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case BOOL: {
auto val = TypeUtils::convertToBoolean(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DATE: {
date_t val = Date::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case TIMESTAMP: {
timestamp_t val = Timestamp::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INTERVAL: {
interval_t val = Interval::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case STRING: {
stringToken = stringToken.substr(0, BufferPoolConstants::PAGE_4KB_SIZE);
data = stringToken.c_str();
auto val = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyString(data, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case VAR_LIST: {
auto varListVal = getArrowVarList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
auto kuList = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyList(*varListVal, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
} break;
case FIXED_LIST: {
auto fixedListVal = getArrowFixedList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
column->setElementInChunk(chunk, nodeOffset, fixedListVal.get());
} break;
default:
break;
}
setElementFunc(column, columnChunk, nodeOffset, stringToken);
}
}
}
Expand Down Expand Up @@ -300,5 +238,40 @@ void NodeCopier::appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf
}
}

set_element_func_t NodeCopier::getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor) {
switch (typeID) {
case common::DataTypeID::INT64:
return setNumericElement<int64_t>;
case common::DataTypeID::INT32:
return setNumericElement<int32_t>;
case common::DataTypeID::INT16:
return setNumericElement<int16_t>;
case common::DataTypeID::DOUBLE:
return setNumericElement<double_t>;
case common::DataTypeID::FLOAT:
return setNumericElement<float_t>;
case common::DataTypeID::BOOL:
return setBoolElement;
case common::DataTypeID::DATE:
return setTimeElement<common::Date>;
case common::DataTypeID::TIMESTAMP:
return setTimeElement<common::Timestamp>;
case common::DataTypeID::INTERVAL:
return setTimeElement<common::Interval>;
case common::DataTypeID::STRING:
return std::bind(setStringElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, pageByteCursor);
case common::DataTypeID::VAR_LIST:
return std::bind(setVarListElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, copyDescription, pageByteCursor);
case common::DataTypeID::FIXED_LIST:
return std::bind(setFixedListElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, copyDescription);
default:
throw common::RuntimeException("Unsupported data type.");
}
}

} // namespace storage
} // namespace kuzu
6 changes: 3 additions & 3 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ RelCopier::RelCopier(CopyDescription& copyDescription, std::string outputDirecto
TaskScheduler& taskScheduler, Catalog& catalog,
std::map<table_id_t, offset_t> maxNodeOffsetsPerNodeTable, BufferManager* bufferManager,
table_id_t tableID, RelsStatistics* relsStatistics)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)}, relsStatistics{
relsStatistics} {
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
relsStatistics},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)} {
dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx();
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
initializePkIndexes(relTableSchema->srcTableID, *bufferManager);
Expand Down
Loading