Skip to content

Commit

Permalink
Merge pull request #1424 from kuzudb/copy-by-column
Browse files Browse the repository at this point in the history
Populate by column
  • Loading branch information
acquamarin committed Mar 30, 2023
2 parents e307fbc + 7e0368c commit 6675cf1
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 117 deletions.
70 changes: 55 additions & 15 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
namespace kuzu {
namespace storage {

using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

class NodeCopier : public TableCopier {

public:
NodeCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {}
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs} {}

protected:
inline void updateTableStatistics() override {
nodesStatisticsAndDeletedIDs->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand All @@ -46,13 +45,10 @@ class NodeCopier : public TableCopier {
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>&
propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, common::offset_t nodeOffset,
uint64_t blockOffset, common::CopyDescription& copyDescription);
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
Expand All @@ -77,8 +73,52 @@ class NodeCopier : public TableCopier {
assert(false);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
static set_element_func_t getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor);

template<typename T>
inline static void setNumericElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertStringToNumber<T>(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setBoolElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertToBoolean(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

template<typename T>
inline static void setTimeElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = T::FromCString(data.c_str(), data.length());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setStringElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data, PageByteCursor& overflowCursor) {
auto val = column->getInMemOverflowFile()->copyString(
data.substr(0, common::BufferPoolConstants::PAGE_4KB_SIZE).c_str(), overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
}

inline static void setVarListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto varListVal =
getArrowVarList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
}

inline static void setFixedListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription) {
auto fixedListVal =
getArrowFixedList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get());
}
};

} // namespace storage
Expand Down
5 changes: 0 additions & 5 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ class RelCopier : public TableCopier {
private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);

inline void updateTableStatistics() override {
relsStatistics->setNumRelsForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand Down Expand Up @@ -145,7 +141,6 @@ class RelCopier : public TableCopier {

private:
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
RelsStatistics* relsStatistics;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>> pkIndexes;
std::atomic<uint64_t> numRels = 0;
Expand Down
22 changes: 13 additions & 9 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/task_system/task_scheduler.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_lists.h"
#include "storage/store/table_statistics.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
Expand Down Expand Up @@ -35,16 +36,14 @@ class TableCopier {

public:
TableCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
TablesStatistics* tableStatisticsAndDeletedIDs);

virtual ~TableCopier() = default;

uint64_t copy();

protected:
virtual void updateTableStatistics() = 0;

virtual void initializeColumnsAndLists() = 0;

virtual void populateColumnsAndLists() = 0;
Expand All @@ -53,6 +52,10 @@ class TableCopier {

virtual void populateInMemoryStructures();

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void countNumLines(const std::vector<std::string>& filePath);

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);
Expand All @@ -78,13 +81,13 @@ class TableCopier {
std::unique_ptr<parquet::arrow::FileReader>& reader, const std::string& filePath);

static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);

static std::unique_ptr<common::Value> getArrowVarList(std::string& l, int64_t from, int64_t to,
const common::DataType& dataType, common::CopyDescription& copyDescription);
static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(std::string& l, int64_t from, int64_t to,
const common::DataType& dataType, common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

Expand All @@ -105,6 +108,7 @@ class TableCopier {
catalog::Catalog& catalog;
catalog::TableSchema* tableSchema;
uint64_t numRows;
TablesStatistics* tablesStatistics;
};

} // namespace storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NodesStatisticsAndDeletedIDs : public TablesStatistics {
directory, common::DBFileType::ORIGINAL, transaction::TransactionType::READ_ONLY);
}

inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) {
inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) override {
initTableStatisticPerTableForWriteTrxIfNecessary();
((NodeStatisticsAndDeletedIDs*)tablesStatisticsContentForWriteTrx
->tableStatisticPerTable[tableID]
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rels_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RelsStatistics : public TablesStatistics {
return (RelStatistics*)tableStatisticPerTable[tableID].get();
}

void setNumRelsForTable(common::table_id_t relTableID, uint64_t numRels);
void setNumTuplesForTable(common::table_id_t relTableID, uint64_t numRels) override;

void updateNumRelsByValue(common::table_id_t relTableID, int64_t value);

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class TablesStatistics {

virtual ~TablesStatistics() = default;

virtual void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) = 0;

inline void writeTablesStatisticsFileForWALRecord(const std::string& directory) {
saveToFile(directory, common::DBFileType::WAL_VERSION, transaction::TransactionType::WRITE);
}
Expand Down
125 changes: 49 additions & 76 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
template<typename T1, typename T2>
arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx,
uint64_t blockIdx, uint64_t startOffset, HashIndexBuilder<T1>* pkIndex, NodeCopier* copier,
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath) {
const std::vector<std::shared_ptr<T2>>& batchArrays, std::string filePath) {
copier->logger->trace("Start: path={0} blkIdx={1}", filePath, blockIdx);
auto numLinesInCurBlock = copier->fileBlockInfos.at(filePath).numLinesPerBlock[blockIdx];

Expand All @@ -129,9 +129,10 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
column->getNumBytesForElement(), column->getNumElementsInAPage());
}
std::vector<PageByteCursor> overflowCursors(copier->tableSchema->getNumProperties());
for (auto blockOffset = 0u; blockOffset < numLinesInCurBlock; ++blockOffset) {
putPropsOfLineIntoColumns(chunks, copier->columns, overflowCursors, batchColumns,
startOffset + blockOffset, blockOffset, copier->copyDescription);
for (auto& [propertyIdx, column] : copier->columns) {
putPropsOfLinesIntoColumns(chunks.at(propertyIdx).get(), column.get(),
batchArrays[propertyIdx], startOffset, numLinesInCurBlock, copier->copyDescription,
overflowCursors[propertyIdx]);
}
// Flush each page within the [StartOffset, endOffset] range.
for (auto& [propertyIdx, column] : copier->columns) {
Expand All @@ -147,80 +148,17 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
}

template<typename T>
void NodeCopier::putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>& propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, uint64_t nodeOffset,
uint64_t bufferOffset, CopyDescription& copyDescription) {
for (auto columnIdx = 0u; columnIdx < propertyColumns.size(); columnIdx++) {
auto column = propertyColumns.at(columnIdx).get();
auto chunk = chunks.at(columnIdx).get();
auto currentToken = arrow_columns[columnIdx]->GetScalar(bufferOffset);
void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset, uint64_t numLinesInCurBlock,
CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto setElementFunc =
getSetElementFunc(column->getDataType().typeID, copyDescription, overflowCursor);
for (auto i = 0u; i < numLinesInCurBlock; i++) {
auto nodeOffset = startNodeOffset + i;
auto currentToken = arrowArray->GetScalar(i);
if ((*currentToken)->is_valid) {
auto stringToken = currentToken->get()->ToString();
const char* data = stringToken.c_str();
switch (column->getDataType().typeID) {
case INT64: {
auto val = TypeUtils::convertStringToNumber<int64_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT32: {
auto val = TypeUtils::convertStringToNumber<int32_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT16: {
auto val = TypeUtils::convertStringToNumber<int16_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DOUBLE: {
auto val = TypeUtils::convertStringToNumber<double_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case FLOAT: {
auto val = TypeUtils::convertStringToNumber<float_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case BOOL: {
auto val = TypeUtils::convertToBoolean(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DATE: {
date_t val = Date::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case TIMESTAMP: {
timestamp_t val = Timestamp::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INTERVAL: {
interval_t val = Interval::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case STRING: {
stringToken = stringToken.substr(0, BufferPoolConstants::PAGE_4KB_SIZE);
data = stringToken.c_str();
auto val = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyString(data, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case VAR_LIST: {
auto varListVal = getArrowVarList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
auto kuList = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyList(*varListVal, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
} break;
case FIXED_LIST: {
auto fixedListVal = getArrowFixedList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
column->setElementInChunk(chunk, nodeOffset, fixedListVal.get());
} break;
default:
break;
}
setElementFunc(column, columnChunk, nodeOffset, stringToken);
}
}
}
Expand Down Expand Up @@ -300,5 +238,40 @@ void NodeCopier::appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf
}
}

set_element_func_t NodeCopier::getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor) {
switch (typeID) {
case common::DataTypeID::INT64:
return setNumericElement<int64_t>;
case common::DataTypeID::INT32:
return setNumericElement<int32_t>;
case common::DataTypeID::INT16:
return setNumericElement<int16_t>;
case common::DataTypeID::DOUBLE:
return setNumericElement<double_t>;
case common::DataTypeID::FLOAT:
return setNumericElement<float_t>;
case common::DataTypeID::BOOL:
return setBoolElement;
case common::DataTypeID::DATE:
return setTimeElement<common::Date>;
case common::DataTypeID::TIMESTAMP:
return setTimeElement<common::Timestamp>;
case common::DataTypeID::INTERVAL:
return setTimeElement<common::Interval>;
case common::DataTypeID::STRING:
return std::bind(setStringElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, pageByteCursor);
case common::DataTypeID::VAR_LIST:
return std::bind(setVarListElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, copyDescription, pageByteCursor);
case common::DataTypeID::FIXED_LIST:
return std::bind(setFixedListElement, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, copyDescription);
default:
throw common::RuntimeException("Unsupported data type.");
}
}

} // namespace storage
} // namespace kuzu
6 changes: 3 additions & 3 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ RelCopier::RelCopier(CopyDescription& copyDescription, std::string outputDirecto
TaskScheduler& taskScheduler, Catalog& catalog,
std::map<table_id_t, offset_t> maxNodeOffsetsPerNodeTable, BufferManager* bufferManager,
table_id_t tableID, RelsStatistics* relsStatistics)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)}, relsStatistics{
relsStatistics} {
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
relsStatistics},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)} {
dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx();
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
initializePkIndexes(relTableSchema->srcTableID, *bufferManager);
Expand Down
Loading

0 comments on commit 6675cf1

Please sign in to comment.