Skip to content

Commit

Permalink
use stats
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jun 8, 2023
1 parent feb9781 commit 4d7df5f
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 71 deletions.
15 changes: 9 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@

#include "processor/operator/filtering_operator.h"
#include "processor/operator/scan/scan_rel_table.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace processor {

class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter {
public:
ScanRelTableColumns(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableColumns(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_COLUMNS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::COLUMNS);
relStats, std::move(propertyIds), storage::RelTableDataType::COLUMNS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTableColumns>(tableData, scanState->propertyIds,
inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, paramsString);
return std::make_unique<ScanRelTableColumns>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
14 changes: 8 additions & 6 deletions src/include/processor/operator/scan/scan_rel_table_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ namespace processor {

class ScanRelTableLists : public ScanRelTable {
public:
ScanRelTableLists(storage::DirectedRelTableData* tableData, std::vector<uint32_t> propertyIds,
const DataPos& inNodeIDVectorPos, std::vector<DataPos> outputVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
ScanRelTableLists(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats,
std::vector<uint32_t> propertyIds, const DataPos& inNodeIDVectorPos,
std::vector<DataPos> outputVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos),
PhysicalOperatorType::SCAN_REL_TABLE_LISTS, std::move(child), id, paramsString},
tableData{tableData} {
scanState = std::make_unique<storage::RelTableScanState>(
std::move(propertyIds), storage::RelTableDataType::LISTS);
relStats, std::move(propertyIds), storage::RelTableDataType::LISTS);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanRelTableLists>(tableData, scanState->propertyIds, inNodeIDVectorPos,
outputVectorsPos, children[0]->clone(), id, paramsString);
return make_unique<ScanRelTableLists>(tableData, scanState->relStats,
scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id,
paramsString);
}

private:
Expand Down
4 changes: 1 addition & 3 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ class NodeTable {

void initializeData(catalog::NodeTableSchema* nodeTableSchema);

void resetColumns(catalog::NodeTableSchema* nodeTableSchema);

inline common::offset_t getMaxNodeOffset(transaction::Transaction* trx) const {
return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(trx, tableID);
}
Expand Down Expand Up @@ -58,7 +56,7 @@ class NodeTable {
void prepareRollback();
inline void checkpointInMemory() { pkIndex->checkpointInMemory(); }
inline void rollback() { pkIndex->rollback(); }
inline void revertCopy() { // truncate all columns to empty.
inline void revertCopy() { // Truncate all columns to empty.
for (auto& [_, column] : propertyColumns) {
column->truncateToEmpty();

Check warning on line 61 in src/include/storage/store/node_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/store/node_table.h#L59-L61

Added lines #L59 - L61 were not covered by tests
}
Expand Down
10 changes: 8 additions & 2 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "storage/storage_structure/column.h"
#include "storage/storage_structure/lists/lists.h"
#include "storage/storage_utils.h"
#include "storage/store/rels_statistics.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -40,9 +41,10 @@ class ListsUpdateIteratorsForDirection {

struct RelTableScanState {
public:
RelTableScanState(
RelTableScanState(storage::RelStatistics* relStats,
std::vector<common::property_id_t> propertyIds, RelTableDataType relTableDataType)
: relTableDataType{relTableDataType}, propertyIds{std::move(propertyIds)} {
: relStats{relStats}, relTableDataType{relTableDataType}, propertyIds{
std::move(propertyIds)} {
if (relTableDataType == RelTableDataType::LISTS) {
syncState = std::make_unique<ListSyncState>();
// The first listHandle is for adj lists.
Expand All @@ -58,6 +60,7 @@ struct RelTableScanState {
syncState->hasMoreAndSwitchSourceIfNecessary();
}

RelStatistics* relStats;
RelTableDataType relTableDataType;
std::vector<common::property_id_t> propertyIds;
// sync state between adj and property lists
Expand Down Expand Up @@ -98,6 +101,9 @@ class DirectedRelTableData {
inline void scan(transaction::Transaction* transaction, RelTableScanState& scanState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) {
if (scanState.relStats->getNumTuples() == 0) {
return;
}
if (scanState.relTableDataType == RelTableDataType::COLUMNS) {
scanColumns(transaction, scanState, inNodeIDVector, outputVectors);
} else {
Expand Down
25 changes: 14 additions & 11 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static std::vector<property_id_t> populatePropertyIds(

static std::pair<DirectedRelTableData*, std::unique_ptr<RelTableScanState>>
getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema* relTableSchema,
table_id_t boundNodeTableID, const RelsStore& relsStore, table_id_t relTableID,
table_id_t boundNodeTableID, RelsStore& relsStore, table_id_t relTableID,
const expression_vector& properties) {
if (relTableSchema->getBoundTableID(direction) != boundNodeTableID) {
// No data stored for given direction and boundNode.
Expand All @@ -41,17 +41,17 @@ getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema*
propertyExpression->getPropertyID(relTableID) :
INVALID_PROPERTY_ID);
}
auto scanState = make_unique<RelTableScanState>(
std::move(propertyIds), relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
auto relStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
auto scanState = make_unique<RelTableScanState>(relStats, std::move(propertyIds),
relsStore.isSingleMultiplicityInDirection(direction, relTableID) ?
RelTableDataType::COLUMNS :
RelTableDataType::LISTS);
return std::make_pair(relData, std::move(scanState));
}

static std::unique_ptr<RelTableDataCollection> populateRelTableDataCollection(
table_id_t boundNodeTableID, const RelExpression& rel, ExtendDirection extendDirection,
const expression_vector& properties, const RelsStore& relsStore,
const catalog::Catalog& catalog) {
const expression_vector& properties, RelsStore& relsStore, const catalog::Catalog& catalog) {
std::vector<DirectedRelTableData*> relTableDatas;
std::vector<std::unique_ptr<RelTableScanState>> tableScanStates;
for (auto relTableID : rel.getTableIDs()) {
Expand Down Expand Up @@ -121,19 +121,22 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
extendDirection != planner::ExtendDirection::BOTH) {
auto relDataDirection = ExtendDirectionUtils::getRelDataDirection(extendDirection);
auto relTableID = rel->getSingleTableID();
auto relTableStats = relsStore.getRelsStatistics().getRelStatistics(relTableID);
if (relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID)) {
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableColumns>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
} else {
assert(!relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID));
auto propertyIds = populatePropertyIds(relTableID, extend->getProperties());
return make_unique<ScanRelTableLists>(
relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection),
std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
relTableStats, std::move(propertyIds), inNodeIDVectorPos,
std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
}
} else { // map to generic extend
std::unordered_map<table_id_t, std::unique_ptr<RelTableDataCollection>>
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,22 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
}

void CopyNode::finalize(kuzu::processor::ExecutionContext* context) {
auto tableID = localState->table->getTableID();
localState->wal->logCopyNodeRecord(tableID);
localState->wal->flushAllPages();
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
}
for (auto& column : sharedState->columns) {
column->saveToFile();
}
auto tableID = localState->table->getTableID();
for (auto& relTableSchema :
localState->catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
localState->relsStore->getRelTable(relTableSchema->tableID)
->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows);
}
localState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(
tableID, sharedState->numRows);
localState->wal->logCopyNodeRecord(tableID);
auto outputMsg = common::StringUtils::string_format(
"{} number of tuples has been copied to table: {}.", sharedState->numRows,
localState->catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCopier = make_unique<RelCopyExecutor>(copyDescription, wal->getDirectory(),
*taskScheduler, *catalog, nodesStore, table, relsStatistics);
auto numRelsCopied = relCopier->copy(executionContext);
wal->logCopyRelRecord(tableID);
wal->flushAllPages();
auto numRelsCopied = relCopier->copy(executionContext);
return numRelsCopied;
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/scan/generic_scan_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ bool RelTableDataCollection::scan(ValueVector* inVector,
std::unique_ptr<RelTableDataCollection> RelTableDataCollection::clone() const {
std::vector<std::unique_ptr<RelTableScanState>> clonedScanStates;
for (auto& scanState : tableScanStates) {
clonedScanStates.push_back(
make_unique<RelTableScanState>(scanState->propertyIds, scanState->relTableDataType));
clonedScanStates.push_back(make_unique<RelTableScanState>(
scanState->relStats, scanState->propertyIds, scanState->relTableDataType));
}
return make_unique<RelTableDataCollection>(relTableDatas, std::move(clonedScanStates));
}
Expand Down
7 changes: 0 additions & 7 deletions src/storage/store/node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ void NodeTable::initializeData(NodeTableSchema* nodeTableSchema) {
}
}

void NodeTable::resetColumns(catalog::NodeTableSchema* nodeTableSchema) {
for (auto& property : nodeTableSchema->getAllNodeProperties()) {
propertyColumns[property.propertyID].reset();
}
pkIndex.reset();
}

void NodeTable::scan(transaction::Transaction* transaction, ValueVector* inputIDVector,
const std::vector<uint32_t>& columnIds, std::vector<ValueVector*> outputVectors) {
assert(columnIds.size() == outputVectors.size());
Expand Down
35 changes: 4 additions & 31 deletions src/storage/wal_replayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,29 +288,9 @@ void WALReplayer::replayCopyNodeRecord(const kuzu::storage::WALRecord& walRecord
}
} else {
// RECOVERY.
auto catalogForRecovery = getCatalogForRecovery(DBFileType::ORIGINAL);
if (wal->isLastLoggedRecordCommit()) {
// Already committed. Nothing to redo or undo.
return;
}
// Not committed, we need to undo the changes.
// Truncate node table files.
auto nodeTable =
storageManager->getNodesStore().getNodeTable(walRecord.copyNodeRecord.tableID);
nodeTable->rollback();
nodeTable->initializeData(
catalogForRecovery->getReadOnlyVersion()->getNodeTableSchema(tableID));
// Truncate rel table files.
auto relTableSchemas = catalog->getAllRelTableSchemasContainBoundTable(tableID);
for (auto relTableSchema : relTableSchemas) {
auto relDirection = relTableSchema->getBoundTableID(FWD) == tableID ? FWD : BWD;
storageManager->getRelsStore()
.getRelTable(relTableSchema->tableID)
->revertCopy(relDirection);
}
}
} else {
// Rollback. Truncate files.
// ROLLBACK.
auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(tableID);
auto nodeTable = storageManager->getNodesStore().getNodeTable(tableID);
nodeTable->revertCopy();
Expand All @@ -329,6 +309,7 @@ void WALReplayer::replayCopyRelRecord(const kuzu::storage::WALRecord& walRecord)
auto tableID = walRecord.copyRelRecord.tableID;
if (isCheckpoint) {
if (!isRecovering) {
// CHECKPOINT.
storageManager->getRelsStore().getRelTable(tableID)->resetColumnsAndLists(
catalog->getReadOnlyVersion()->getRelTableSchema(tableID));
// See comments for COPY_NODE_RECORD.
Expand All @@ -337,18 +318,10 @@ void WALReplayer::replayCopyRelRecord(const kuzu::storage::WALRecord& walRecord)
storageManager->getNodesStore().getNodesStatisticsAndDeletedIDs().setAdjListsAndColumns(
&storageManager->getRelsStore());
} else {
// Recovery.
if (wal->isLastLoggedRecordCommit()) {
return;
}
// Not COMMIT, so we need to revert the changes.
// Truncate rel table files.
auto relTable = storageManager->getRelsStore().getRelTable(tableID);
relTable->revertCopy(FWD);
relTable->revertCopy(BWD);
// RECOVERY.
}
} else {
// Rollback. Truncate rel table files.
// ROLLBACK.
auto relTable = storageManager->getRelsStore().getRelTable(tableID);
relTable->revertCopy(FWD);
relTable->revertCopy(BWD);
Expand Down

0 comments on commit 4d7df5f

Please sign in to comment.