Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache DiskArray write header in-memory #3109

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 26 additions & 68 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct DiskArrayHeader {

void readFromFile(FileHandle& fileHandle, uint64_t headerPageIdx);

bool operator==(const DiskArrayHeader& other) const = default;

// We do not need to store numElementsPerPageLog2, elementPageOffsetMask, and numArrayPages, or
// save them on disk, as they are functions of elementSize and numElements; we nonetheless
// store them (and save them to disk) for simplicity.
Expand Down Expand Up @@ -133,12 +135,18 @@ class BaseDiskArrayInternal {
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

virtual void prepareCommit();

protected:
uint64_t pushBackNoLock(std::span<uint8_t> val);

uint64_t getNumElementsNoLock(transaction::TransactionType trxType);
// Reads numElements from the header that getDiskArrayHeader selects for trxType.
// Takes no lock itself.
inline uint64_t getNumElementsNoLock(transaction::TransactionType trxType) {
    const auto& selectedHeader = getDiskArrayHeader(trxType);
    return selectedHeader.numElements;
}

uint64_t getNumAPsNoLock(transaction::TransactionType trxType);
// Reads numAPs from the header that getDiskArrayHeader selects for trxType.
// Takes no lock itself.
inline uint64_t getNumAPsNoLock(transaction::TransactionType trxType) {
    const auto& selectedHeader = getDiskArrayHeader(trxType);
    return selectedHeader.numAPs;
}

void setNextPIPPageIDxOfPIPNoLock(DiskArrayHeader* updatedDiskArrayHeader,
uint64_t pipIdxOfPreviousPIP, common::page_idx_t nextPIPPageIdx);
Expand Down Expand Up @@ -169,8 +177,13 @@ class BaseDiskArrayInternal {
bool checkOutOfBoundAccess(transaction::TransactionType trxType, uint64_t idx);
bool hasPIPUpdatesNoLock(uint64_t pipIdx);

uint64_t readUInt64HeaderFieldNoLock(
transaction::TransactionType trxType, std::function<uint64_t(DiskArrayHeader*)> readOp);
// Selects the header a transaction of the given type should read: `header` for READ_ONLY
// transactions, `headerForWriteTrx` for every other transaction type.
inline const DiskArrayHeader& getDiskArrayHeader(transaction::TransactionType trxType) {
    const bool isReadOnly = (trxType == transaction::TransactionType::READ_ONLY);
    if (!isReadOnly) {
        return headerForWriteTrx;
    }
    return header;
}

// Returns the apPageIdx of the AP with idx apIdx and a bool indicating whether the apPageIdx is
// a newly inserted page.
Expand All @@ -184,6 +197,7 @@ class BaseDiskArrayInternal {
FileHandle& fileHandle;
DBFileID dbFileID;
common::page_idx_t headerPageIdx;
DiskArrayHeader headerForWriteTrx;
bool hasTransactionalUpdates;
BufferManager* bufferManager;
WAL* wal;
Expand Down Expand Up @@ -234,6 +248,7 @@ class BaseDiskArray {

// Delegates to the wrapped `diskArray`.
void checkpointInMemoryIfNecessary() {
    diskArray.checkpointInMemoryIfNecessary();
}
// Delegates to the wrapped `diskArray`.
void rollbackInMemoryIfNecessary() {
    diskArray.rollbackInMemoryIfNecessary();
}
// Delegates to the wrapped `diskArray`.
void prepareCommit() {
    diskArray.prepareCommit();
}

private:
BaseDiskArrayInternal diskArray;
Expand Down Expand Up @@ -271,77 +286,20 @@ class BaseInMemDiskArray : public BaseDiskArrayInternal {
std::vector<std::unique_ptr<uint8_t[]>> inMemArrayPages;
};

/**
* Stores an array of type U's page by page in memory, using OS memory and not the buffer manager.
* Designed currently to be used by lists headers and metadata, where we want to avoid using
* pins/unpins when accessing data through the buffer manager.
*/
class InMemDiskArrayInternal : public BaseInMemDiskArray {
public:
InMemDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction);

static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal, size_t size) {
DiskArrayHeader daHeader(size);
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
}
inline void rollbackInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

private:
void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint) override;
};

template<typename U>
class InMemDiskArray {
class InMemDiskArray : public BaseDiskArray<U> {
public:
// Used when loading from file
InMemDiskArray(FileHandle& fileHandle, DBFileID dbFileID, common::page_idx_t headerPageIdx,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction)
: diskArray(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}

inline U& operator[](uint64_t idx) { return *(U*)diskArray[idx]; }

: BaseDiskArray<U>(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}
static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal) {
return InMemDiskArrayInternal::addDAHPageToFile(fileHandle, bufferManager, wal, sizeof(U));
}

// Note: This function is to be used only by the WRITE trx.
inline void update(uint64_t idx, U val) { diskArray.update(idx, getSpan(val)); }

inline U get(uint64_t idx, transaction::TransactionType trxType) {
U val;
diskArray.get(idx, trxType, getSpan(val));
return val;
}

// Note: Currently, this function doesn't support shrinking the size of the array.
inline uint64_t resize(uint64_t newNumElements) {
U defaultVal;
return diskArray.resize(newNumElements, getSpan(defaultVal));
}

inline uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY) {
return diskArray.getNumElements(trxType);
DiskArrayHeader daHeader(sizeof(U));
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() { diskArray.checkpointInMemoryIfNecessary(); }
inline void rollbackInMemoryIfNecessary() { diskArray.rollbackInMemoryIfNecessary(); }

private:
InMemDiskArrayInternal diskArray;
};

class InMemDiskArrayBuilderInternal : public BaseInMemDiskArray {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Column {

Column* getNullColumn();

virtual void prepareCommit();
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/dictionary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DictionaryColumn {
uint64_t getNumValuesInOffsets(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void prepareCommit();
void checkpointInMemory();
void rollbackInMemory();

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class NodeTable final : public Table {
// Hands nodeGroup to tableData's append.
void append(ChunkedNodeGroup* nodeGroup) {
    tableData->append(nodeGroup);
}

void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class RelTable final : public Table {
}

void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class RelTableData final : public TableData {
void prepareCommitNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalRelNG* localRelNG);

void prepareCommit() override;
void checkpointInMemory() override;
void rollbackInMemory() override;

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StringColumn final : public Column {
void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
ColumnChunk* data, common::offset_t dataOffset, common::length_t numValues) override;

void prepareCommit() override;
void checkpointInMemory() override;
void rollbackInMemory() override;

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class StructColumn final : public Column {

void checkpointInMemory() override;
void rollbackInMemory() override;
void prepareCommit() override;

inline Column* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childColumns.size());
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class Table {
virtual void dropColumn(common::column_id_t columnID) = 0;

virtual void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) = 0;
// For metadata-only updates
virtual void prepareCommit() = 0;
virtual void prepareRollback(LocalTable* localTable) = 0;
virtual void checkpointInMemory() = 0;
virtual void rollbackInMemory() = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TableData {
transaction::Transaction* transaction, LocalTableData* localTable) = 0;
virtual void checkpointInMemory();
virtual void rollbackInMemory();
virtual void prepareCommit();

virtual common::node_group_idx_t getNumNodeGroups(
transaction::Transaction* transaction) const = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class VarListColumn : public Column {
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

void prepareCommit() final;
void checkpointInMemory() final;
void rollbackInMemory() final;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ void HashIndex<T>::prepareCommit() {
[this](
Key key, offset_t value) -> void { this->insertIntoPersistentIndex(key, value); });
headerArray->update(INDEX_HEADER_IDX_IN_ARRAY, *indexHeaderForWriteTrx);
headerArray->prepareCommit();
pSlots->prepareCommit();
oSlots->prepareCommit();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ void StorageManager::dropTable(table_id_t tableID) {

void StorageManager::prepareCommit(transaction::Transaction* transaction) {
transaction->getLocalStorage()->prepareCommit();
// Tables which are created but not inserted into may have pending writes
// which need to be flushed (specifically, the metadata disk array header)
// TODO(bmwinger): wal->getUpdatedTables isn't the ideal place to store this information
for (auto tableID : wal->getUpdatedTables()) {
benjaminwinger marked this conversation as resolved.
Show resolved Hide resolved
if (transaction->getLocalStorage()->getLocalTable(tableID) == nullptr) {
getTable(tableID)->prepareCommit();
}
}
if (nodesStatisticsAndDeletedIDs->hasUpdates()) {
wal->logTableStatisticsRecord(true /* isNodeTable */);
nodesStatisticsAndDeletedIDs->writeTablesStatisticsFileForWALRecord(wal->getDirectory());
Expand Down
Loading
Loading