Skip to content

Commit

Permalink
Rework copy table transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jun 9, 2023
1 parent 976d035 commit 5f1baf4
Show file tree
Hide file tree
Showing 24 changed files with 146 additions and 189 deletions.
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "processor/operator/sink.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"

Expand Down Expand Up @@ -31,6 +30,7 @@ class CopyNodeSharedState {
uint64_t& numRows;
std::mutex mtx;
std::shared_ptr<FactorizedTable> table;
bool hasLoggedWAL;
};

struct CopyNodeLocalState {
Expand Down
15 changes: 9 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@

#include "processor/operator/filtering_operator.h"
#include "processor/operator/scan/scan_rel_table.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace processor {

class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter {
public:
ScanRelTableColumns(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableColumns(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_COLUMNS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::COLUMNS);
relStats, std::move(propertyIds), storage::RelTableDataType::COLUMNS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTableColumns>(tableData, scanState->propertyIds,
inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, paramsString);
return std::make_unique<ScanRelTableColumns>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
14 changes: 8 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ namespace processor {

class ScanRelTableLists : public ScanRelTable {
public:
ScanRelTableLists(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableLists(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_LISTS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::LISTS);
relStats, std::move(propertyIds), storage::RelTableDataType::LISTS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanRelTableLists>(tableData, scanState->propertyIds, inNodeIDVectorPos,
outputVectorsPos, children[0]->clone(), id, paramsString);
return make_unique<ScanRelTableLists>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DirectedInMemRelData {

class RelCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);

Expand All @@ -58,6 +58,7 @@ class RelCopyExecutor {

private:
common::CopyDescription& copyDescription;
WAL* wal;
std::string outputDirectory;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
common::TaskScheduler& taskScheduler;
Expand Down
10 changes: 5 additions & 5 deletions src/include/storage/storage_structure/lists/lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,27 @@ class AdjLists : public Lists {
bufferManager, false /* hasNullBytes */, wal, listsUpdatesStore},
nbrTableID{nbrTableID} {};

inline bool mayContainNulls() const override { return false; }
inline bool mayContainNulls() const final { return false; }

void readValues(transaction::Transaction* transaction, common::ValueVector* valueVector,
ListHandle& listHandle) override;
ListHandle& listHandle) final;

// Currently, used only in copyCSV tests.
std::unique_ptr<std::vector<common::nodeID_t>> readAdjacencyListOfNode(
common::offset_t nodeOffset);

void checkpointInMemoryIfNecessary() override {
inline void checkpointInMemoryIfNecessary() final {
headers->checkpointInMemoryIfNecessary();
Lists::checkpointInMemoryIfNecessary();
}

void rollbackInMemoryIfNecessary() override {
inline void rollbackInMemoryIfNecessary() final {
headers->rollbackInMemoryIfNecessary();
Lists::rollbackInMemoryIfNecessary();
}

private:
void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) override;
void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) final;
void readFromListsUpdatesStore(ListHandle& listHandle, common::ValueVector* valueVector);
void readFromPersistentStore(ListHandle& listHandle, common::ValueVector* valueVector);

Expand Down
2 changes: 0 additions & 2 deletions src/include/storage/storage_structure/storage_structure.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ class BaseColumnOrList : public StorageStructure {
uint16_t pagePosOfFirstElement, uint64_t numValuesToRead, common::table_id_t commonTableID,
bool hasNoNullGuarantee);

void setNullBitOfAPosInFrame(const uint8_t* frame, uint16_t elementPos, bool isNull) const;

void readNullBitsFromAPage(common::ValueVector* valueVector, const uint8_t* frame,
uint64_t posInPage, uint64_t posInVector, uint64_t numBitsToRead) const;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class NodeTable {
BufferManager& bufferManager, WAL* wal, catalog::NodeTableSchema* nodeTableSchema);

void initializeData(catalog::NodeTableSchema* nodeTableSchema);

void resetColumns(catalog::NodeTableSchema* nodeTableSchema);
static std::unordered_map<common::property_id_t, std::unique_ptr<Column>> initializeColumns(
WAL* wal, BufferManager* bm, catalog::NodeTableSchema* nodeTableSchema);

inline common::offset_t getMaxNodeOffset(transaction::Transaction* trx) const {
return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(trx, tableID);
Expand Down
10 changes: 8 additions & 2 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "storage/storage_structure/column.h"
#include "storage/storage_structure/lists/lists.h"
#include "storage/storage_utils.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -40,9 +41,10 @@ class ListsUpdateIteratorsForDirection {

struct RelTableScanState {
public:
RelTableScanState(
RelTableScanState(storage::RelStatistics* relStats,
std::vector<common::property_id_t> propertyIds, RelTableDataType relTableDataType)
: relTableDataType{relTableDataType}, propertyIds{std::move(propertyIds)} {
: relStats{relStats}, relTableDataType{relTableDataType}, propertyIds{
std::move(propertyIds)} {
if (relTableDataType == RelTableDataType::LISTS) {
syncState = std::make_unique<ListSyncState>();
// The first listHandle is for adj lists.
Expand All @@ -58,6 +60,7 @@ struct RelTableScanState {
syncState->hasMoreAndSwitchSourceIfNecessary();
}

RelStatistics* relStats;
RelTableDataType relTableDataType;
std::vector<common::property_id_t> propertyIds;
// sync state between adj and property lists
Expand Down Expand Up @@ -98,6 +101,9 @@ class DirectedRelTableData {
inline void scan(transaction::Transaction* transaction, RelTableScanState& scanState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) {
if (scanState.relStats->getNumTuples() == 0) {
return;
}
if (scanState.relTableDataType == RelTableDataType::COLUMNS) {
scanColumns(transaction, scanState, inNodeIDVector, outputVectors);
} else {
Expand Down
18 changes: 0 additions & 18 deletions src/include/storage/wal_replayer_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,6 @@ namespace storage {

class WALReplayerUtils {
public:
static inline void replaceNodeFilesWithVersionFromWALIfExists(
catalog::NodeTableSchema* nodeTableSchema, const std::string& directory) {
fileOperationOnNodeFiles(nodeTableSchema, directory,
replaceOriginalColumnFilesWithWALVersionIfExists,
replaceOriginalListFilesWithWALVersionIfExists);
}

static inline void replaceRelPropertyFilesWithVersionFromWALIfExists(
catalog::RelTableSchema* relTableSchema, const std::string& directory) {
fileOperationOnRelFiles(relTableSchema, directory,
replaceOriginalColumnFilesWithWALVersionIfExists,
replaceOriginalListFilesWithWALVersionIfExists);
}

static inline void removeDBFilesForNodeTable(
catalog::NodeTableSchema* tableSchema, const std::string& directory) {
fileOperationOnNodeFiles(
Expand Down Expand Up @@ -63,10 +49,6 @@ class WALReplayerUtils {
static void renameDBFilesForRelProperty(const std::string& directory,
catalog::RelTableSchema* relTableSchema, common::property_id_t propertyID);

static void replaceListsHeadersFilesWithVersionFromWALIfExists(
const std::unordered_set<catalog::RelTableSchema*>& relTableSchemas,
common::table_id_t boundTableID, const std::string& directory);

private:
static inline void removeColumnFilesForPropertyIfExists(const std::string& directory,
common::table_id_t relTableID, common::table_id_t boundTableID,
Expand Down
12 changes: 3 additions & 9 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,9 @@ void Database::rollbackAndClearWAL() {

void Database::recoverIfNecessary() {
if (!wal->isEmptyWAL()) {
if (wal->isLastLoggedRecordCommit()) {
logger->info("Starting up StorageManager and found a non-empty WAL with a committed "
"transaction. Replaying to checkpointInMemory.");
checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT);
} else {
logger->info("Starting up StorageManager and found a non-empty WAL but last record is "
"not commit. Clearing the WAL.");
wal->clearWAL();
}
logger->info("Starting up StorageManager and found a non-empty WAL with a committed "
"transaction. Replaying to checkpointInMemory.");
checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT);
}
}

Expand Down
25 changes: 14 additions & 11 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static std::vector<property_id_t> populatePropertyIds(

static std::pair<DirectedRelTableData*, std::unique_ptr<RelTableScanState>>
getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema* relTableSchema,
table_id_t boundNodeTableID, const RelsStore& relsStore, table_id_t relTableID,
table_id_t boundNodeTableID, RelsStore& relsStore, table_id_t relTableID,
const expression_vector& properties) {
if (relTableSchema->getBoundTableID(direction) != boundNodeTableID) {
// No data stored for given direction and boundNode.
Expand All @@ -40,17 +40,17 @@ getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema*
propertyExpression->getPropertyID(relTableID) :
INVALID_PROPERTY_ID);
}
auto scanState = make_unique<RelTableScanState>(
std::move(propertyIds), relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
auto relStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
auto scanState = make_unique<RelTableScanState>(relStats, std::move(propertyIds),
relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
return std::make_pair(relData, std::move(scanState));
}

static std::unique_ptr<RelTableDataCollection> populateRelTableDataCollection(
table_id_t boundNodeTableID, const RelExpression& rel, ExtendDirection extendDirection,
const expression_vector& properties, const RelsStore& relsStore,
const catalog::Catalog& catalog) {
const expression_vector& properties, RelsStore& relsStore, const catalog::Catalog& catalog) {
std::vector<DirectedRelTableData*> relTableDatas;
std::vector<std::unique_ptr<RelTableScanState>> tableScanStates;
for (auto relTableID : rel.getTableIDs()) {
Expand Down Expand Up @@ -120,19 +120,22 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
extendDirection != planner::ExtendDirection::BOTH) {
auto relDataDirection = ExtendDirectionUtils::getRelDataDirection(extendDirection);
auto relTableID = rel->getSingleTableID();
auto relTableStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
if (relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID)) {
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableColumns>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
} else {
assert(!relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID));
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableLists>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
}
} else { // map to generic extend
std::unordered_map<table_id_t, std::unique_ptr<RelTableDataCollection>>
Expand Down
20 changes: 14 additions & 6 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace kuzu {
namespace processor {

CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, storage::MemoryManager* memoryManager)
: numRows{numRows}, pkColumnID{0} {
: numRows{numRows}, pkColumnID{0}, hasLoggedWAL{false} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
Expand All @@ -20,7 +20,7 @@ void CopyNodeSharedState::initializePrimaryKey(
common::LogicalTypeID::SERIAL) {
pkIndex = std::make_unique<storage::PrimaryKeyIndexBuilder>(
storage::StorageUtils::getNodeIndexFName(
directory, nodeTableSchema->tableID, common::DBFileType::WAL_VERSION),
directory, nodeTableSchema->tableID, common::DBFileType::ORIGINAL),
nodeTableSchema->getPrimaryKey().dataType);
pkIndex->bulkReserve(numRows);
}
Expand All @@ -40,13 +40,22 @@ void CopyNodeSharedState::initializeColumns(
// Skip SERIAL, as it is not physically stored.
continue;
}
auto fPath = storage::StorageUtils::getNodePropertyColumnFName(directory,
nodeTableSchema->tableID, property.propertyID, common::DBFileType::WAL_VERSION);
auto fPath = storage::StorageUtils::getNodePropertyColumnFName(
directory, nodeTableSchema->tableID, property.propertyID, common::DBFileType::ORIGINAL);
columns.push_back(std::make_unique<storage::InMemColumn>(fPath, property.dataType));
}
}

void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
{
std::unique_lock xLck{sharedState->mtx};
if (!sharedState->hasLoggedWAL) {
localState->wal->logCopyNodeRecord(localState->table->getTableID());
localState->wal->flushAllPages();
sharedState->hasLoggedWAL = true;
}
}
if (sharedState->hasLoggedWAL) {}
while (children[0]->getNextTuple(context)) {
std::vector<std::unique_ptr<storage::InMemColumnChunk>> columnChunks;
columnChunks.reserve(sharedState->columns.size());
Expand All @@ -69,21 +78,20 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
}

void CopyNode::finalize(kuzu::processor::ExecutionContext* context) {
auto tableID = localState->table->getTableID();
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
}
for (auto& column : sharedState->columns) {
column->saveToFile();
}
auto tableID = localState->table->getTableID();
for (auto& relTableSchema :
localState->catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
localState->relsStore->getRelTable(relTableSchema->tableID)
->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows);
}
localState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(
tableID, sharedState->numRows);
localState->wal->logCopyNodeRecord(tableID);
auto outputMsg = common::StringUtils::string_format(
"{} number of tuples has been copied to table: {}.", sharedState->numRows,
localState->catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
Expand Down
5 changes: 2 additions & 3 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ namespace processor {

uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCopier = make_unique<RelCopyExecutor>(copyDescription, wal->getDirectory(),
*taskScheduler, *catalog, nodesStore, table, relsStatistics);
auto relCopier = std::make_unique<RelCopyExecutor>(
copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, relsStatistics);
auto numRelsCopied = relCopier->copy(executionContext);
wal->logCopyRelRecord(tableID);
return numRelsCopied;
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/scan/generic_scan_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ bool RelTableDataCollection::scan(ValueVector* inVector,
std::unique_ptr<RelTableDataCollection> RelTableDataCollection::clone() const {
std::vector<std::unique_ptr<RelTableScanState>> clonedScanStates;
for (auto& scanState : tableScanStates) {
clonedScanStates.push_back(
make_unique<RelTableScanState>(scanState->propertyIds, scanState->relTableDataType));
clonedScanStates.push_back(make_unique<RelTableScanState>(
scanState->relStats, scanState->propertyIds, scanState->relTableDataType));
}
return make_unique<RelTableDataCollection>(relTableDatas, std::move(clonedScanStates));
}
Expand Down
Loading

0 comments on commit 5f1baf4

Please sign in to comment.