Skip to content

Commit

Permalink
populate column chunks one at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin authored and ray6080 committed Mar 29, 2023
1 parent e307fbc commit 6be982e
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 65 deletions.
22 changes: 6 additions & 16 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ class NodeCopier : public TableCopier {
NodeCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {}
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs} {}

protected:
inline void updateTableStatistics() override {
nodesStatisticsAndDeletedIDs->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand All @@ -46,13 +42,10 @@ class NodeCopier : public TableCopier {
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>&
propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, common::offset_t nodeOffset,
uint64_t blockOffset, common::CopyDescription& copyDescription);
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
Expand All @@ -76,9 +69,6 @@ class NodeCopier : public TableCopier {
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
};

} // namespace storage
Expand Down
5 changes: 0 additions & 5 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ class RelCopier : public TableCopier {
private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);

inline void updateTableStatistics() override {
relsStatistics->setNumRelsForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
Expand Down Expand Up @@ -145,7 +141,6 @@ class RelCopier : public TableCopier {

private:
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
RelsStatistics* relsStatistics;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>> pkIndexes;
std::atomic<uint64_t> numRels = 0;
Expand Down
12 changes: 8 additions & 4 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/task_system/task_scheduler.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_lists.h"
#include "storage/store/table_statistics.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
Expand Down Expand Up @@ -35,16 +36,14 @@ class TableCopier {

public:
TableCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
TablesStatistics* tableStatisticsAndDeletedIDs);

virtual ~TableCopier() = default;

uint64_t copy();

protected:
virtual void updateTableStatistics() = 0;

virtual void initializeColumnsAndLists() = 0;

virtual void populateColumnsAndLists() = 0;
Expand All @@ -53,6 +52,10 @@ class TableCopier {

virtual void populateInMemoryStructures();

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void countNumLines(const std::vector<std::string>& filePath);

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);
Expand Down Expand Up @@ -105,6 +108,7 @@ class TableCopier {
catalog::Catalog& catalog;
catalog::TableSchema* tableSchema;
uint64_t numRows;
TablesStatistics* tablesStatistics;
};

} // namespace storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NodesStatisticsAndDeletedIDs : public TablesStatistics {
directory, common::DBFileType::ORIGINAL, transaction::TransactionType::READ_ONLY);
}

inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) {
inline void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) override {
initTableStatisticPerTableForWriteTrxIfNecessary();
((NodeStatisticsAndDeletedIDs*)tablesStatisticsContentForWriteTrx
->tableStatisticPerTable[tableID]
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rels_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RelsStatistics : public TablesStatistics {
return (RelStatistics*)tableStatisticPerTable[tableID].get();
}

void setNumRelsForTable(common::table_id_t relTableID, uint64_t numRels);
void setNumTuplesForTable(common::table_id_t relTableID, uint64_t numRels) override;

void updateNumRelsByValue(common::table_id_t relTableID, int64_t value);

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class TablesStatistics {

virtual ~TablesStatistics() = default;

virtual void setNumTuplesForTable(common::table_id_t tableID, uint64_t numTuples) = 0;

inline void writeTablesStatisticsFileForWALRecord(const std::string& directory) {
saveToFile(directory, common::DBFileType::WAL_VERSION, transaction::TransactionType::WRITE);
}
Expand Down
68 changes: 36 additions & 32 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
template<typename T1, typename T2>
arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx,
uint64_t blockIdx, uint64_t startOffset, HashIndexBuilder<T1>* pkIndex, NodeCopier* copier,
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath) {
const std::vector<std::shared_ptr<T2>>& batchArrays, std::string filePath) {
copier->logger->trace("Start: path={0} blkIdx={1}", filePath, blockIdx);
auto numLinesInCurBlock = copier->fileBlockInfos.at(filePath).numLinesPerBlock[blockIdx];

Expand All @@ -129,9 +129,10 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
column->getNumBytesForElement(), column->getNumElementsInAPage());
}
std::vector<PageByteCursor> overflowCursors(copier->tableSchema->getNumProperties());
for (auto blockOffset = 0u; blockOffset < numLinesInCurBlock; ++blockOffset) {
putPropsOfLineIntoColumns(chunks, copier->columns, overflowCursors, batchColumns,
startOffset + blockOffset, blockOffset, copier->copyDescription);
for (auto& [propertyIdx, column] : copier->columns) {
putPropsOfLinesIntoColumns(chunks.at(propertyIdx).get(), column.get(),
batchArrays[propertyIdx], startOffset, numLinesInCurBlock, copier->copyDescription,
overflowCursors[propertyIdx]);
}
// Flush each page within the [StartOffset, endOffset] range.
for (auto& [propertyIdx, column] : copier->columns) {
Expand All @@ -147,76 +148,79 @@ arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeyPropertyId
}

template<typename T>
void NodeCopier::putPropsOfLineIntoColumns(
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>& propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, uint64_t nodeOffset,
uint64_t bufferOffset, CopyDescription& copyDescription) {
for (auto columnIdx = 0u; columnIdx < propertyColumns.size(); columnIdx++) {
auto column = propertyColumns.at(columnIdx).get();
auto chunk = chunks.at(columnIdx).get();
auto currentToken = arrow_columns[columnIdx]->GetScalar(bufferOffset);
void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset, uint64_t numLinesInCurBlock,
CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
for (auto i = 0u; i < numLinesInCurBlock; i++) {
auto nodeOffset = startNodeOffset + i;
auto currentToken = arrowArray->GetScalar(i);
if ((*currentToken)->is_valid) {
auto stringToken = currentToken->get()->ToString();
const char* data = stringToken.c_str();
switch (column->getDataType().typeID) {
case INT64: {
auto val = TypeUtils::convertStringToNumber<int64_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT32: {
auto val = TypeUtils::convertStringToNumber<int32_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INT16: {
auto val = TypeUtils::convertStringToNumber<int16_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DOUBLE: {
auto val = TypeUtils::convertStringToNumber<double_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case FLOAT: {
auto val = TypeUtils::convertStringToNumber<float_t>(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case BOOL: {
auto val = TypeUtils::convertToBoolean(data);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case DATE: {
date_t val = Date::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case TIMESTAMP: {
timestamp_t val = Timestamp::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case INTERVAL: {
interval_t val = Interval::FromCString(data, stringToken.length());
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case STRING: {
stringToken = stringToken.substr(0, BufferPoolConstants::PAGE_4KB_SIZE);
data = stringToken.c_str();
auto val = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyString(data, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
auto val = column->getInMemOverflowFile()->copyString(data, overflowCursor);
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
} break;
case VAR_LIST: {
auto varListVal = getArrowVarList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
auto kuList = reinterpret_cast<NodeInMemColumnWithOverflow*>(column)
->getInMemOverflowFile()
->copyList(*varListVal, overflowCursors[columnIdx]);
column->setElementInChunk(chunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor);
column->setElementInChunk(
columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
} break;
case FIXED_LIST: {
auto fixedListVal = getArrowFixedList(stringToken, 1, stringToken.length() - 2,
column->getDataType(), copyDescription);
column->setElementInChunk(chunk, nodeOffset, fixedListVal.get());
column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get());
} break;
default:
break;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ RelCopier::RelCopier(CopyDescription& copyDescription, std::string outputDirecto
TaskScheduler& taskScheduler, Catalog& catalog,
std::map<table_id_t, offset_t> maxNodeOffsetsPerNodeTable, BufferManager* bufferManager,
table_id_t tableID, RelsStatistics* relsStatistics)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)}, relsStatistics{
relsStatistics} {
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
relsStatistics},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)} {
dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx();
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
initializePkIndexes(relTableSchema->srcTableID, *bufferManager);
Expand Down
6 changes: 4 additions & 2 deletions src/storage/copier/table_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ namespace kuzu {
namespace storage {

TableCopier::TableCopier(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, Catalog& catalog, common::table_id_t tableID)
TaskScheduler& taskScheduler, Catalog& catalog, common::table_id_t tableID,
TablesStatistics* tablesStatistics)
: logger{LoggerUtils::getLogger(LoggerConstants::LoggerEnum::LOADER)},
copyDescription{copyDescription}, outputDirectory{std::move(outputDirectory)},
taskScheduler{taskScheduler}, catalog{catalog}, numRows{0},
tableSchema{catalog.getReadOnlyVersion()->getTableSchema(tableID)} {}
tableSchema{catalog.getReadOnlyVersion()->getTableSchema(tableID)}, tablesStatistics{
tablesStatistics} {}

uint64_t TableCopier::copy() {
logger->info(StringUtils::string_format("Copying {} file to table {}.",
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/rels_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace kuzu {
namespace storage {

// We should only call this function after we call setNumRelsPerDirectionBoundTableID.
void RelsStatistics::setNumRelsForTable(table_id_t relTableID, uint64_t numRels) {
void RelsStatistics::setNumTuplesForTable(table_id_t relTableID, uint64_t numRels) {
lock_t lck{mtx};
initTableStatisticPerTableForWriteTrxIfNecessary();
assert(tablesStatisticsContentForWriteTrx->tableStatisticPerTable.contains(relTableID));
Expand Down

0 comments on commit 6be982e

Please sign in to comment.