Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework copy transaction to not rely on file renaming #1649

Merged
merged 1 commit into from
Jun 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "processor/operator/sink.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"

Expand Down Expand Up @@ -31,6 +30,7 @@ class CopyNodeSharedState {
uint64_t& numRows;
std::mutex mtx;
std::shared_ptr<FactorizedTable> table;
bool hasLoggedWAL;
};

struct CopyNodeLocalState {
Expand Down
15 changes: 9 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@

#include "processor/operator/filtering_operator.h"
#include "processor/operator/scan/scan_rel_table.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace processor {

class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter {
public:
ScanRelTableColumns(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableColumns(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_COLUMNS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::COLUMNS);
relStats, std::move(propertyIds), storage::RelTableDataType::COLUMNS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTableColumns>(tableData, scanState->propertyIds,
inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, paramsString);
return std::make_unique<ScanRelTableColumns>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
14 changes: 8 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ namespace processor {

class ScanRelTableLists : public ScanRelTable {
public:
ScanRelTableLists(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableLists(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_LISTS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::LISTS);
relStats, std::move(propertyIds), storage::RelTableDataType::LISTS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanRelTableLists>(tableData, scanState->propertyIds, inNodeIDVectorPos,
outputVectorsPos, children[0]->clone(), id, paramsString);
return make_unique<ScanRelTableLists>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DirectedInMemRelData {

class RelCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);

Expand All @@ -58,6 +58,7 @@ class RelCopyExecutor {

private:
common::CopyDescription& copyDescription;
WAL* wal;
std::string outputDirectory;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
common::TaskScheduler& taskScheduler;
Expand Down
10 changes: 5 additions & 5 deletions src/include/storage/storage_structure/lists/lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,27 @@ class AdjLists : public Lists {
bufferManager, false /* hasNullBytes */, wal, listsUpdatesStore},
nbrTableID{nbrTableID} {};

inline bool mayContainNulls() const override { return false; }
inline bool mayContainNulls() const final { return false; }

void readValues(transaction::Transaction* transaction, common::ValueVector* valueVector,
ListHandle& listHandle) override;
ListHandle& listHandle) final;

// Currently, used only in copyCSV tests.
std::unique_ptr<std::vector<common::nodeID_t>> readAdjacencyListOfNode(
common::offset_t nodeOffset);

void checkpointInMemoryIfNecessary() override {
inline void checkpointInMemoryIfNecessary() final {
headers->checkpointInMemoryIfNecessary();
Lists::checkpointInMemoryIfNecessary();
}

void rollbackInMemoryIfNecessary() override {
inline void rollbackInMemoryIfNecessary() final {
headers->rollbackInMemoryIfNecessary();
Lists::rollbackInMemoryIfNecessary();
}

private:
void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) override;
void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) final;
void readFromListsUpdatesStore(ListHandle& listHandle, common::ValueVector* valueVector);
void readFromPersistentStore(ListHandle& listHandle, common::ValueVector* valueVector);

Expand Down
2 changes: 0 additions & 2 deletions src/include/storage/storage_structure/storage_structure.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ class BaseColumnOrList : public StorageStructure {
uint16_t pagePosOfFirstElement, uint64_t numValuesToRead, common::table_id_t commonTableID,
bool hasNoNullGuarantee);

void setNullBitOfAPosInFrame(const uint8_t* frame, uint16_t elementPos, bool isNull) const;

void readNullBitsFromAPage(common::ValueVector* valueVector, const uint8_t* frame,
uint64_t posInPage, uint64_t posInVector, uint64_t numBitsToRead) const;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class NodeTable {
BufferManager& bufferManager, WAL* wal, catalog::NodeTableSchema* nodeTableSchema);

void initializeData(catalog::NodeTableSchema* nodeTableSchema);

void resetColumns(catalog::NodeTableSchema* nodeTableSchema);
static std::unordered_map<common::property_id_t, std::unique_ptr<Column>> initializeColumns(
WAL* wal, BufferManager* bm, catalog::NodeTableSchema* nodeTableSchema);

inline common::offset_t getMaxNodeOffset(transaction::Transaction* trx) const {
return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(trx, tableID);
Expand Down
10 changes: 8 additions & 2 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "storage/storage_structure/column.h"
#include "storage/storage_structure/lists/lists.h"
#include "storage/storage_utils.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -40,9 +41,10 @@ class ListsUpdateIteratorsForDirection {

struct RelTableScanState {
public:
RelTableScanState(
RelTableScanState(storage::RelStatistics* relStats,
std::vector<common::property_id_t> propertyIds, RelTableDataType relTableDataType)
: relTableDataType{relTableDataType}, propertyIds{std::move(propertyIds)} {
: relStats{relStats}, relTableDataType{relTableDataType}, propertyIds{
std::move(propertyIds)} {
if (relTableDataType == RelTableDataType::LISTS) {
syncState = std::make_unique<ListSyncState>();
// The first listHandle is for adj lists.
Expand All @@ -58,6 +60,7 @@ struct RelTableScanState {
syncState->hasMoreAndSwitchSourceIfNecessary();
}

RelStatistics* relStats;
RelTableDataType relTableDataType;
std::vector<common::property_id_t> propertyIds;
// sync state between adj and property lists
Expand Down Expand Up @@ -98,6 +101,9 @@ class DirectedRelTableData {
inline void scan(transaction::Transaction* transaction, RelTableScanState& scanState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) {
if (scanState.relStats->getNumTuples() == 0) {
return;
}
if (scanState.relTableDataType == RelTableDataType::COLUMNS) {
scanColumns(transaction, scanState, inNodeIDVector, outputVectors);
} else {
Expand Down
18 changes: 0 additions & 18 deletions src/include/storage/wal_replayer_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,6 @@ namespace storage {

class WALReplayerUtils {
public:
static inline void replaceNodeFilesWithVersionFromWALIfExists(
catalog::NodeTableSchema* nodeTableSchema, const std::string& directory) {
fileOperationOnNodeFiles(nodeTableSchema, directory,
replaceOriginalColumnFilesWithWALVersionIfExists,
replaceOriginalListFilesWithWALVersionIfExists);
}

static inline void replaceRelPropertyFilesWithVersionFromWALIfExists(
catalog::RelTableSchema* relTableSchema, const std::string& directory) {
fileOperationOnRelFiles(relTableSchema, directory,
replaceOriginalColumnFilesWithWALVersionIfExists,
replaceOriginalListFilesWithWALVersionIfExists);
}

static inline void removeDBFilesForNodeTable(
catalog::NodeTableSchema* tableSchema, const std::string& directory) {
fileOperationOnNodeFiles(
Expand Down Expand Up @@ -63,10 +49,6 @@ class WALReplayerUtils {
static void renameDBFilesForRelProperty(const std::string& directory,
catalog::RelTableSchema* relTableSchema, common::property_id_t propertyID);

static void replaceListsHeadersFilesWithVersionFromWALIfExists(
const std::unordered_set<catalog::RelTableSchema*>& relTableSchemas,
common::table_id_t boundTableID, const std::string& directory);

private:
static inline void removeColumnFilesForPropertyIfExists(const std::string& directory,
common::table_id_t relTableID, common::table_id_t boundTableID,
Expand Down
12 changes: 3 additions & 9 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,9 @@ void Database::rollbackAndClearWAL() {

void Database::recoverIfNecessary() {
if (!wal->isEmptyWAL()) {
if (wal->isLastLoggedRecordCommit()) {
logger->info("Starting up StorageManager and found a non-empty WAL with a committed "
"transaction. Replaying to checkpointInMemory.");
checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT);
} else {
logger->info("Starting up StorageManager and found a non-empty WAL but last record is "
"not commit. Clearing the WAL.");
wal->clearWAL();
}
logger->info("Starting up StorageManager and found a non-empty WAL with a committed "
"transaction. Replaying to checkpointInMemory.");
checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT);
}
}

Expand Down
25 changes: 14 additions & 11 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static std::vector<property_id_t> populatePropertyIds(

static std::pair<DirectedRelTableData*, std::unique_ptr<RelTableScanState>>
getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema* relTableSchema,
table_id_t boundNodeTableID, const RelsStore& relsStore, table_id_t relTableID,
table_id_t boundNodeTableID, RelsStore& relsStore, table_id_t relTableID,
const expression_vector& properties) {
if (relTableSchema->getBoundTableID(direction) != boundNodeTableID) {
// No data stored for given direction and boundNode.
Expand All @@ -40,17 +40,17 @@ getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema*
propertyExpression->getPropertyID(relTableID) :
INVALID_PROPERTY_ID);
}
auto scanState = make_unique<RelTableScanState>(
std::move(propertyIds), relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
auto relStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
auto scanState = make_unique<RelTableScanState>(relStats, std::move(propertyIds),
relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
return std::make_pair(relData, std::move(scanState));
}

static std::unique_ptr<RelTableDataCollection> populateRelTableDataCollection(
table_id_t boundNodeTableID, const RelExpression& rel, ExtendDirection extendDirection,
const expression_vector& properties, const RelsStore& relsStore,
const catalog::Catalog& catalog) {
const expression_vector& properties, RelsStore& relsStore, const catalog::Catalog& catalog) {
std::vector<DirectedRelTableData*> relTableDatas;
std::vector<std::unique_ptr<RelTableScanState>> tableScanStates;
for (auto relTableID : rel.getTableIDs()) {
Expand Down Expand Up @@ -120,19 +120,22 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
extendDirection != planner::ExtendDirection::BOTH) {
auto relDataDirection = ExtendDirectionUtils::getRelDataDirection(extendDirection);
auto relTableID = rel->getSingleTableID();
auto relTableStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
if (relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID)) {
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableColumns>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
} else {
assert(!relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID));
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableLists>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
}
} else { // map to generic extend
std::unordered_map<table_id_t, std::unique_ptr<RelTableDataCollection>>
Expand Down
20 changes: 14 additions & 6 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace kuzu {
namespace processor {

CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, storage::MemoryManager* memoryManager)
: numRows{numRows}, pkColumnID{0} {
: numRows{numRows}, pkColumnID{0}, hasLoggedWAL{false} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
Expand All @@ -20,7 +20,7 @@ void CopyNodeSharedState::initializePrimaryKey(
common::LogicalTypeID::SERIAL) {
pkIndex = std::make_unique<storage::PrimaryKeyIndexBuilder>(
storage::StorageUtils::getNodeIndexFName(
directory, nodeTableSchema->tableID, common::DBFileType::WAL_VERSION),
directory, nodeTableSchema->tableID, common::DBFileType::ORIGINAL),
nodeTableSchema->getPrimaryKey().dataType);
pkIndex->bulkReserve(numRows);
}
Expand All @@ -40,13 +40,22 @@ void CopyNodeSharedState::initializeColumns(
// Skip SERIAL, as it is not physically stored.
continue;
}
auto fPath = storage::StorageUtils::getNodePropertyColumnFName(directory,
nodeTableSchema->tableID, property.propertyID, common::DBFileType::WAL_VERSION);
auto fPath = storage::StorageUtils::getNodePropertyColumnFName(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the dbfile type parameter from the getNodePropertyColumnFName, since we are always getting the original version of the node file

directory, nodeTableSchema->tableID, property.propertyID, common::DBFileType::ORIGINAL);
columns.push_back(std::make_unique<storage::InMemColumn>(fPath, property.dataType));
}
}

void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
{
std::unique_lock xLck{sharedState->mtx};
if (!sharedState->hasLoggedWAL) {
localState->wal->logCopyNodeRecord(localState->table->getTableID());
localState->wal->flushAllPages();
sharedState->hasLoggedWAL = true;
}
}
if (sharedState->hasLoggedWAL) {}
while (children[0]->getNextTuple(context)) {
std::vector<std::unique_ptr<storage::InMemColumnChunk>> columnChunks;
columnChunks.reserve(sharedState->columns.size());
Expand All @@ -69,21 +78,20 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
}

void CopyNode::finalize(kuzu::processor::ExecutionContext* context) {
auto tableID = localState->table->getTableID();
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
}
for (auto& column : sharedState->columns) {
column->saveToFile();
}
auto tableID = localState->table->getTableID();
for (auto& relTableSchema :
localState->catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
localState->relsStore->getRelTable(relTableSchema->tableID)
->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows);
}
localState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(
tableID, sharedState->numRows);
localState->wal->logCopyNodeRecord(tableID);
auto outputMsg = common::StringUtils::string_format(
"{} number of tuples has been copied to table: {}.", sharedState->numRows,
localState->catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
Expand Down
5 changes: 2 additions & 3 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ namespace processor {

uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCopier = make_unique<RelCopyExecutor>(copyDescription, wal->getDirectory(),
*taskScheduler, *catalog, nodesStore, table, relsStatistics);
auto relCopier = std::make_unique<RelCopyExecutor>(
copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, relsStatistics);
auto numRelsCopied = relCopier->copy(executionContext);
wal->logCopyRelRecord(tableID);
return numRelsCopied;
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/scan/generic_scan_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ bool RelTableDataCollection::scan(ValueVector* inVector,
std::unique_ptr<RelTableDataCollection> RelTableDataCollection::clone() const {
std::vector<std::unique_ptr<RelTableScanState>> clonedScanStates;
for (auto& scanState : tableScanStates) {
clonedScanStates.push_back(
make_unique<RelTableScanState>(scanState->propertyIds, scanState->relTableDataType));
clonedScanStates.push_back(make_unique<RelTableScanState>(
scanState->relStats, scanState->propertyIds, scanState->relTableDataType));
}
return make_unique<RelTableDataCollection>(relTableDatas, std::move(clonedScanStates));
}
Expand Down
Loading
Loading