Skip to content

Commit

Permalink
Fix ASan issue during multi copy (#3431)
Browse files Browse the repository at this point in the history
* fix asan issue during multi copy

* add more multi copy in ldbc sf01
  • Loading branch information
ray6080 committed May 2, 2024
1 parent 3b0ce25 commit 82b4a5b
Show file tree
Hide file tree
Showing 12 changed files with 391,564 additions and 391,556 deletions.
107,802 changes: 1 addition & 107,801 deletions dataset/ldbc-sf01/Comment_hasCreator_Person.csv

Large diffs are not rendered by default.

107,800 changes: 107,800 additions & 0 deletions dataset/ldbc-sf01/Comment_hasCreator_Person_1.csv

Large diffs are not rendered by default.

171,711 changes: 0 additions & 171,711 deletions dataset/ldbc-sf01/Comment_hasTag_Tag.csv

Large diffs are not rendered by default.

171,711 changes: 171,711 additions & 0 deletions dataset/ldbc-sf01/Comment_hasTag_Tag_1.csv

Large diffs are not rendered by default.

84,636 changes: 0 additions & 84,636 deletions dataset/ldbc-sf01/Comment_isLocatedIn_Place.csv

Large diffs are not rendered by default.

84,636 changes: 84,636 additions & 0 deletions dataset/ldbc-sf01/Comment_isLocatedIn_Place_1.csv

Large diffs are not rendered by default.

27,385 changes: 0 additions & 27,385 deletions dataset/ldbc-sf01/Person_likes_Post.csv

Large diffs are not rendered by default.

27,385 changes: 27,385 additions & 0 deletions dataset/ldbc-sf01/Person_likes_Post_1.csv

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions dataset/ldbc-sf01/copy.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ COPY Post FROM "dataset/ldbc-sf01/Post.csv" (HEADER=true, DELIM='|');
// Node tables must be loaded before the relationship tables below reference them.
COPY Tag FROM "dataset/ldbc-sf01/Tag.csv" (HEADER=true, DELIM='|');
COPY TagClass FROM "dataset/ldbc-sf01/TagClass.csv" (HEADER=true, DELIM='|');
// Multi-copy pattern: each rel table is loaded from a base CSV (with a header row)
// followed by a continuation "_1" CSV, which carries no header (HEADER=false).
COPY Comment_hasCreator FROM "dataset/ldbc-sf01/Comment_hasCreator_Person.csv" (HEADER=true, DELIM='|');
COPY Comment_hasCreator FROM "dataset/ldbc-sf01/Comment_hasCreator_Person_1.csv" (HEADER=false, DELIM='|');
COPY Comment_hasTag FROM "dataset/ldbc-sf01/Comment_hasTag_Tag.csv" (HEADER=true, DELIM='|');
COPY Comment_hasTag FROM "dataset/ldbc-sf01/Comment_hasTag_Tag_1.csv" (HEADER=false, DELIM='|');
COPY Comment_isLocatedIn FROM "dataset/ldbc-sf01/Comment_isLocatedIn_Place.csv" (HEADER=true, DELIM='|');
COPY Comment_isLocatedIn FROM "dataset/ldbc-sf01/Comment_isLocatedIn_Place_1.csv" (HEADER=false, DELIM='|');
COPY replyOf_Comment FROM "dataset/ldbc-sf01/Comment_replyOf_Comment.csv" (HEADER=true, DELIM='|');
COPY replyOf_Post FROM "dataset/ldbc-sf01/Comment_replyOf_Post.csv" (HEADER=true, DELIM='|');
COPY containerOf FROM "dataset/ldbc-sf01/Forum_containerOf_Post.csv" (HEADER=true, DELIM='|');
Expand All @@ -23,6 +26,7 @@ COPY knows FROM "dataset/ldbc-sf01/Person_knows_Person_1.csv" (HEADER=true, DELI
// NOTE(review): Person_likes_Comment_1.csv is loaded with HEADER=true while the
// other "_1" continuation files in this script use HEADER=false — verify whether
// this file really contains a header row, otherwise its first data row is dropped.
COPY likes_Comment FROM "dataset/ldbc-sf01/Person_likes_Comment.csv" (HEADER=true, DELIM='|');
COPY likes_Comment FROM "dataset/ldbc-sf01/Person_likes_Comment_1.csv" (HEADER=true, DELIM='|');
COPY likes_Post FROM "dataset/ldbc-sf01/Person_likes_Post.csv" (HEADER=true, DELIM='|');
COPY likes_Post FROM "dataset/ldbc-sf01/Person_likes_Post_1.csv" (HEADER=false, DELIM='|');
COPY studyAt FROM "dataset/ldbc-sf01/Person_studyAt_Organisation.csv" (HEADER=true, DELIM='|');
COPY workAt FROM "dataset/ldbc-sf01/Person_workAt_Organisation.csv" (HEADER=true, DELIM='|');
COPY isPartOf FROM "dataset/ldbc-sf01/Place_isPartOf_Place.csv" (HEADER=true, DELIM='|');
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <unordered_map>
#include <map>

#include "common/types/internal_id_t.h"
#include "common/vector/value_vector.h"
Expand All @@ -10,9 +10,8 @@
namespace kuzu {
namespace storage {

using offset_to_row_idx_t = std::unordered_map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t =
std::unordered_map<common::offset_t, std::vector<common::row_idx_t>>;
// Ordered maps (std::map, not std::unordered_map): callers rely on key ordering,
// e.g. Column::getMaxOffset reads the largest offset via rbegin() in O(log n).
using offset_to_row_idx_t = std::map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t = std::map<common::offset_t, std::vector<common::row_idx_t>>;
// Deleted/seen offsets need only membership tests, so hashing is fine here.
using offset_set_t = std::unordered_set<common::offset_t>;

static constexpr common::column_id_t NBR_ID_COLUMN_ID = 0;
Expand Down
35 changes: 17 additions & 18 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ class Column {
// Append column chunk in a new node group.
virtual void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx);

inline common::LogicalType& getDataType() { return dataType; }
inline const common::LogicalType& getDataType() const { return dataType; }
inline uint32_t getNumBytesPerValue() const { return numBytesPerFixedSizedValue; }
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
common::LogicalType& getDataType() { return dataType; }
const common::LogicalType& getDataType() const { return dataType; }
uint32_t getNumBytesPerValue() const { return numBytesPerFixedSizedValue; }
uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
return metadataDA->getNumElements(transaction->getType());
}

Expand Down Expand Up @@ -115,13 +115,13 @@ class Column {
void populateWithDefaultVal(transaction::Transaction* transaction,
InMemDiskArray<ColumnChunkMetadata>* metadataDA, common::ValueVector* defaultValueVector);

inline ColumnChunkMetadata getMetadata(common::node_group_idx_t nodeGroupIdx,
ColumnChunkMetadata getMetadata(common::node_group_idx_t nodeGroupIdx,
transaction::TransactionType transaction) const {
return metadataDA->get(nodeGroupIdx, transaction);
}
inline InMemDiskArray<ColumnChunkMetadata>* getMetadataDA() const { return metadataDA.get(); }
InMemDiskArray<ColumnChunkMetadata>* getMetadataDA() const { return metadataDA.get(); }

inline std::string getName() const { return name; }
std::string getName() const { return name; }

virtual void scan(transaction::Transaction* transaction, const ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result);
Expand Down Expand Up @@ -170,12 +170,11 @@ class Column {
void updatePageWithCursor(PageCursor cursor,
const std::function<void(uint8_t*, common::offset_t)>& writeOp);

inline common::offset_t getMaxOffset(const std::vector<common::offset_t>& offsets) {
common::offset_t maxOffset = 0u;
for (auto offset : offsets) {
maxOffset = std::max(maxOffset, offset);
}
return maxOffset;
// Returns the largest offset in `offsets`, or 0 when the vector is empty.
common::offset_t getMaxOffset(const std::vector<common::offset_t>& offsets) {
    common::offset_t largest = 0;
    for (const auto offset : offsets) {
        largest = std::max(largest, offset);
    }
    return largest;
}
// Returns the largest key in `offsets`, or 0 when the map is empty.
common::offset_t getMaxOffset(const offset_to_row_idx_t& offsets) {
    if (offsets.empty()) {
        return 0;
    }
    // offset_to_row_idx_t is an ordered std::map, so the last entry holds the max key.
    return std::prev(offsets.end())->first;
}

static ChunkCollection getNullChunkCollection(const ChunkCollection& chunkCollection);
Expand Down Expand Up @@ -217,7 +216,7 @@ class Column {
const offset_to_row_idx_t& info);

// check if val is in range [start, end)
static inline bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
// Returns true iff val lies in the half-open interval [start, end).
static bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
    return start <= val && val < end;
}

Expand Down Expand Up @@ -249,28 +248,28 @@ class InternalIDColumn : public Column {
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

inline void scan(transaction::Transaction* transaction, ChunkState& state,
void scan(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::scan(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

inline void scan(transaction::Transaction* transaction, ChunkState& state,
void scan(transaction::Transaction* transaction, ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) override {
Column::scan(transaction, state, startOffsetInGroup, endOffsetInGroup, resultVector,
offsetInVector);
populateCommonTableID(resultVector);
}

inline void lookup(transaction::Transaction* transaction, ChunkState& state,
void lookup(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::lookup(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

// TODO(Guodong): This function should be removed through rewriting INTERNAL_ID as STRUCT.
inline void setCommonTableID(common::table_id_t tableID) { commonTableID = tableID; }
void setCommonTableID(common::table_id_t tableID) { commonTableID = tableID; }

private:
void populateCommonTableID(common::ValueVector* resultVector) const;
Expand Down
8 changes: 7 additions & 1 deletion src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,16 +759,22 @@ std::unique_ptr<ColumnChunk> Column::getEmptyChunkForCommit(uint64_t capacity) {
return ColumnChunkFactory::createColumnChunk(*dataType.copy(), enableCompression, capacity);
}

// TODO: Pass state in to avoid accessing the metadata.
void Column::commitLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx,
bool isNewNodeGroup, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
auto columnChunk = getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE);
std::unique_ptr<ColumnChunk> columnChunk;
if (isNewNodeGroup) {
KU_ASSERT(updateInfo.empty() && deleteInfo.empty());
columnChunk = getEmptyChunkForCommit(getMaxOffset(insertInfo) + 1);
// Apply inserts from the local chunk.
applyLocalChunkToColumnChunk(localInsertChunks, columnChunk.get(), insertInfo);
} else {
auto maxNodeOffset = std::max(getMaxOffset(insertInfo), getMaxOffset(updateInfo));
auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
maxNodeOffset = std::max(maxNodeOffset, chunkMeta.numValues);
columnChunk = getEmptyChunkForCommit(maxNodeOffset + 1);
// First, scan the whole column chunk from persistent storage.
scan(transaction, nodeGroupIdx, columnChunk.get());
// Then, apply updates from the local chunk.
Expand Down

0 comments on commit 82b4a5b

Please sign in to comment.