Skip to content

Commit

Permalink
Cache DiskArray write header in-memory (#3109)
Browse files Browse the repository at this point in the history
* Cache DiskArray write header in-memory

* Replace InMemDiskArray with BaseDiskArray
  • Loading branch information
benjaminwinger committed Mar 26, 2024
1 parent de72fc9 commit a85f4fe
Show file tree
Hide file tree
Showing 23 changed files with 148 additions and 186 deletions.
94 changes: 26 additions & 68 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct DiskArrayHeader {

void readFromFile(FileHandle& fileHandle, uint64_t headerPageIdx);

bool operator==(const DiskArrayHeader& other) const = default;

// We do not need to store numElementsPerPageLog2, elementPageOffsetMask, and numArrayPages or
// save them on disk as they are functions of elementSize and numElements but we
// nonetheless store them (and save them to disk) for simplicity.
Expand Down Expand Up @@ -133,12 +135,18 @@ class BaseDiskArrayInternal {
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

virtual void prepareCommit();

protected:
uint64_t pushBackNoLock(std::span<uint8_t> val);

uint64_t getNumElementsNoLock(transaction::TransactionType trxType);
inline uint64_t getNumElementsNoLock(transaction::TransactionType trxType) {
return getDiskArrayHeader(trxType).numElements;
}

uint64_t getNumAPsNoLock(transaction::TransactionType trxType);
inline uint64_t getNumAPsNoLock(transaction::TransactionType trxType) {
return getDiskArrayHeader(trxType).numAPs;
}

void setNextPIPPageIDxOfPIPNoLock(DiskArrayHeader* updatedDiskArrayHeader,
uint64_t pipIdxOfPreviousPIP, common::page_idx_t nextPIPPageIdx);
Expand Down Expand Up @@ -169,8 +177,13 @@ class BaseDiskArrayInternal {
bool checkOutOfBoundAccess(transaction::TransactionType trxType, uint64_t idx);
bool hasPIPUpdatesNoLock(uint64_t pipIdx);

uint64_t readUInt64HeaderFieldNoLock(
transaction::TransactionType trxType, std::function<uint64_t(DiskArrayHeader*)> readOp);
inline const DiskArrayHeader& getDiskArrayHeader(transaction::TransactionType trxType) {
if (trxType == transaction::TransactionType::READ_ONLY) {
return header;
} else {
return headerForWriteTrx;
}
}

// Returns the apPageIdx of the AP with idx apIdx and a bool indicating whether the apPageIdx is
// a newly inserted page.
Expand All @@ -184,6 +197,7 @@ class BaseDiskArrayInternal {
FileHandle& fileHandle;
DBFileID dbFileID;
common::page_idx_t headerPageIdx;
DiskArrayHeader headerForWriteTrx;
bool hasTransactionalUpdates;
BufferManager* bufferManager;
WAL* wal;
Expand Down Expand Up @@ -234,6 +248,7 @@ class BaseDiskArray {

inline void checkpointInMemoryIfNecessary() { diskArray.checkpointInMemoryIfNecessary(); }
inline void rollbackInMemoryIfNecessary() { diskArray.rollbackInMemoryIfNecessary(); }
inline void prepareCommit() { diskArray.prepareCommit(); }

private:
BaseDiskArrayInternal diskArray;
Expand Down Expand Up @@ -271,77 +286,20 @@ class BaseInMemDiskArray : public BaseDiskArrayInternal {
std::vector<std::unique_ptr<uint8_t[]>> inMemArrayPages;
};

/**
* Stores an array of type U's page by page in memory, using OS memory and not the buffer manager.
* Designed currently to be used by lists headers and metadata, where we want to avoid using
* pins/unpins when accessing data through the buffer manager.
*/
class InMemDiskArrayInternal : public BaseInMemDiskArray {
public:
InMemDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction);

static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal, size_t size) {
DiskArrayHeader daHeader(size);
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
}
inline void rollbackInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

private:
void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint) override;
};

template<typename U>
class InMemDiskArray {
class InMemDiskArray : public BaseDiskArray<U> {
public:
// Used when loading from file
InMemDiskArray(FileHandle& fileHandle, DBFileID dbFileID, common::page_idx_t headerPageIdx,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction)
: diskArray(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}

inline U& operator[](uint64_t idx) { return *(U*)diskArray[idx]; }

: BaseDiskArray<U>(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}
static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal) {
return InMemDiskArrayInternal::addDAHPageToFile(fileHandle, bufferManager, wal, sizeof(U));
}

// Note: This function is to be used only by the WRITE trx.
inline void update(uint64_t idx, U val) { diskArray.update(idx, getSpan(val)); }

inline U get(uint64_t idx, transaction::TransactionType trxType) {
U val;
diskArray.get(idx, trxType, getSpan(val));
return val;
}

// Note: Currently, this function doesn't support shrinking the size of the array.
inline uint64_t resize(uint64_t newNumElements) {
U defaultVal;
return diskArray.resize(newNumElements, getSpan(defaultVal));
}

inline uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY) {
return diskArray.getNumElements(trxType);
DiskArrayHeader daHeader(sizeof(U));
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() { diskArray.checkpointInMemoryIfNecessary(); }
inline void rollbackInMemoryIfNecessary() { diskArray.rollbackInMemoryIfNecessary(); }

private:
InMemDiskArrayInternal diskArray;
};

class InMemDiskArrayBuilderInternal : public BaseInMemDiskArray {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Column {

Column* getNullColumn();

virtual void prepareCommit();
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/dictionary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DictionaryColumn {
uint64_t getNumValuesInOffsets(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void prepareCommit();
void checkpointInMemory();
void rollbackInMemory();

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class NodeTable final : public Table {
inline void append(ChunkedNodeGroup* nodeGroup) { tableData->append(nodeGroup); }

void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class RelTable final : public Table {
}

void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class RelTableData final : public TableData {
void prepareCommitNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalRelNG* localRelNG);

void prepareCommit() override;
void checkpointInMemory() override;
void rollbackInMemory() override;

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StringColumn final : public Column {
void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
ColumnChunk* data, common::offset_t dataOffset, common::length_t numValues) override;

void prepareCommit() override;
void checkpointInMemory() override;
void rollbackInMemory() override;

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class StructColumn final : public Column {

void checkpointInMemory() override;
void rollbackInMemory() override;
void prepareCommit() override;

inline Column* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childColumns.size());
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class Table {
virtual void dropColumn(common::column_id_t columnID) = 0;

virtual void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) = 0;
// For metadata-only updates
virtual void prepareCommit() = 0;
virtual void prepareRollback(LocalTable* localTable) = 0;
virtual void checkpointInMemory() = 0;
virtual void rollbackInMemory() = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TableData {
transaction::Transaction* transaction, LocalTableData* localTable) = 0;
virtual void checkpointInMemory();
virtual void rollbackInMemory();
virtual void prepareCommit();

virtual common::node_group_idx_t getNumNodeGroups(
transaction::Transaction* transaction) const = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class VarListColumn : public Column {
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

void prepareCommit() final;
void checkpointInMemory() final;
void rollbackInMemory() final;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ void HashIndex<T>::prepareCommit() {
[this](
Key key, offset_t value) -> void { this->insertIntoPersistentIndex(key, value); });
headerArray->update(INDEX_HEADER_IDX_IN_ARRAY, *indexHeaderForWriteTrx);
headerArray->prepareCommit();
pSlots->prepareCommit();
oSlots->prepareCommit();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ void StorageManager::dropTable(table_id_t tableID) {

void StorageManager::prepareCommit(transaction::Transaction* transaction) {
transaction->getLocalStorage()->prepareCommit();
// Tables which are created but not inserted into may have pending writes
// which need to be flushed (specifically, the metadata disk array header)
// TODO(bmwinger): wal->getUpdatedTables isn't the ideal place to store this information
for (auto tableID : wal->getUpdatedTables()) {
if (transaction->getLocalStorage()->getLocalTable(tableID) == nullptr) {
getTable(tableID)->prepareCommit();
}
}
if (nodesStatisticsAndDeletedIDs->hasUpdates()) {
wal->logTableStatisticsRecord(true /* isNodeTable */);
nodesStatisticsAndDeletedIDs->writeTablesStatisticsFileForWALRecord(wal->getDirectory());
Expand Down
Loading

0 comments on commit a85f4fe

Please sign in to comment.