Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix asan issue during multi copy #3431

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107,802 changes: 1 addition & 107,801 deletions dataset/ldbc-sf01/Comment_hasCreator_Person.csv

Large diffs are not rendered by default.

107,800 changes: 107,800 additions & 0 deletions dataset/ldbc-sf01/Comment_hasCreator_Person_1.csv

Large diffs are not rendered by default.

171,711 changes: 0 additions & 171,711 deletions dataset/ldbc-sf01/Comment_hasTag_Tag.csv

Large diffs are not rendered by default.

171,711 changes: 171,711 additions & 0 deletions dataset/ldbc-sf01/Comment_hasTag_Tag_1.csv

Large diffs are not rendered by default.

84,636 changes: 0 additions & 84,636 deletions dataset/ldbc-sf01/Comment_isLocatedIn_Place.csv

Large diffs are not rendered by default.

84,636 changes: 84,636 additions & 0 deletions dataset/ldbc-sf01/Comment_isLocatedIn_Place_1.csv

Large diffs are not rendered by default.

27,385 changes: 0 additions & 27,385 deletions dataset/ldbc-sf01/Person_likes_Post.csv

Large diffs are not rendered by default.

27,385 changes: 27,385 additions & 0 deletions dataset/ldbc-sf01/Person_likes_Post_1.csv

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions dataset/ldbc-sf01/copy.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ COPY Post FROM "dataset/ldbc-sf01/Post.csv" (HEADER=true, DELIM='|');
COPY Tag FROM "dataset/ldbc-sf01/Tag.csv" (HEADER=true, DELIM='|');
COPY TagClass FROM "dataset/ldbc-sf01/TagClass.csv" (HEADER=true, DELIM='|');
COPY Comment_hasCreator FROM "dataset/ldbc-sf01/Comment_hasCreator_Person.csv" (HEADER=true, DELIM='|');
COPY Comment_hasCreator FROM "dataset/ldbc-sf01/Comment_hasCreator_Person_1.csv" (HEADER=false, DELIM='|');
COPY Comment_hasTag FROM "dataset/ldbc-sf01/Comment_hasTag_Tag.csv" (HEADER=true, DELIM='|');
COPY Comment_hasTag FROM "dataset/ldbc-sf01/Comment_hasTag_Tag_1.csv" (HEADER=false, DELIM='|');
COPY Comment_isLocatedIn FROM "dataset/ldbc-sf01/Comment_isLocatedIn_Place.csv" (HEADER=true, DELIM='|');
COPY Comment_isLocatedIn FROM "dataset/ldbc-sf01/Comment_isLocatedIn_Place_1.csv" (HEADER=false, DELIM='|');
COPY replyOf_Comment FROM "dataset/ldbc-sf01/Comment_replyOf_Comment.csv" (HEADER=true, DELIM='|');
COPY replyOf_Post FROM "dataset/ldbc-sf01/Comment_replyOf_Post.csv" (HEADER=true, DELIM='|');
COPY containerOf FROM "dataset/ldbc-sf01/Forum_containerOf_Post.csv" (HEADER=true, DELIM='|');
Expand All @@ -23,6 +26,7 @@ COPY knows FROM "dataset/ldbc-sf01/Person_knows_Person_1.csv" (HEADER=true, DELI
COPY likes_Comment FROM "dataset/ldbc-sf01/Person_likes_Comment.csv" (HEADER=true, DELIM='|');
COPY likes_Comment FROM "dataset/ldbc-sf01/Person_likes_Comment_1.csv" (HEADER=true, DELIM='|');
COPY likes_Post FROM "dataset/ldbc-sf01/Person_likes_Post.csv" (HEADER=true, DELIM='|');
COPY likes_Post FROM "dataset/ldbc-sf01/Person_likes_Post_1.csv" (HEADER=false, DELIM='|');
COPY studyAt FROM "dataset/ldbc-sf01/Person_studyAt_Organisation.csv" (HEADER=true, DELIM='|');
COPY workAt FROM "dataset/ldbc-sf01/Person_workAt_Organisation.csv" (HEADER=true, DELIM='|');
COPY isPartOf FROM "dataset/ldbc-sf01/Place_isPartOf_Place.csv" (HEADER=true, DELIM='|');
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <unordered_map>
#include <map>

#include "common/types/internal_id_t.h"
#include "common/vector/value_vector.h"
Expand All @@ -10,9 +10,8 @@
namespace kuzu {
namespace storage {

using offset_to_row_idx_t = std::unordered_map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t =
std::unordered_map<common::offset_t, std::vector<common::row_idx_t>>;
using offset_to_row_idx_t = std::map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t = std::map<common::offset_t, std::vector<common::row_idx_t>>;
using offset_set_t = std::unordered_set<common::offset_t>;

static constexpr common::column_id_t NBR_ID_COLUMN_ID = 0;
Expand Down
35 changes: 17 additions & 18 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ class Column {
// Append column chunk in a new node group.
virtual void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx);

inline common::LogicalType& getDataType() { return dataType; }
inline const common::LogicalType& getDataType() const { return dataType; }
inline uint32_t getNumBytesPerValue() const { return numBytesPerFixedSizedValue; }
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
common::LogicalType& getDataType() { return dataType; }
const common::LogicalType& getDataType() const { return dataType; }
uint32_t getNumBytesPerValue() const { return numBytesPerFixedSizedValue; }
uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
return metadataDA->getNumElements(transaction->getType());
}

Expand Down Expand Up @@ -115,13 +115,13 @@ class Column {
void populateWithDefaultVal(transaction::Transaction* transaction,
InMemDiskArray<ColumnChunkMetadata>* metadataDA, common::ValueVector* defaultValueVector);

inline ColumnChunkMetadata getMetadata(common::node_group_idx_t nodeGroupIdx,
ColumnChunkMetadata getMetadata(common::node_group_idx_t nodeGroupIdx,
transaction::TransactionType transaction) const {
return metadataDA->get(nodeGroupIdx, transaction);
}
inline InMemDiskArray<ColumnChunkMetadata>* getMetadataDA() const { return metadataDA.get(); }
InMemDiskArray<ColumnChunkMetadata>* getMetadataDA() const { return metadataDA.get(); }

inline std::string getName() const { return name; }
std::string getName() const { return name; }

virtual void scan(transaction::Transaction* transaction, const ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result);
Expand Down Expand Up @@ -170,12 +170,11 @@ class Column {
void updatePageWithCursor(PageCursor cursor,
const std::function<void(uint8_t*, common::offset_t)>& writeOp);

inline common::offset_t getMaxOffset(const std::vector<common::offset_t>& offsets) {
common::offset_t maxOffset = 0u;
for (auto offset : offsets) {
maxOffset = std::max(maxOffset, offset);
}
return maxOffset;
common::offset_t getMaxOffset(const std::vector<common::offset_t>& offsets) {
return offsets.empty() ? 0 : *std::max_element(offsets.begin(), offsets.end());
}
// Returns the largest offset key stored in `offsets`, or 0 when the map is empty.
// Taking the last element via rbegin() yields the maximum key provided
// offset_to_row_idx_t is an ordered map (std::map).
common::offset_t getMaxOffset(const offset_to_row_idx_t& offsets) {
    if (offsets.empty()) {
        return 0;
    }
    const auto lastEntry = offsets.rbegin();
    return lastEntry->first;
}

static ChunkCollection getNullChunkCollection(const ChunkCollection& chunkCollection);
Expand Down Expand Up @@ -217,7 +216,7 @@ class Column {
const offset_to_row_idx_t& info);

// check if val is in range [start, end)
static inline bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
static bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
return val >= start && val < end;
}

Expand Down Expand Up @@ -249,28 +248,28 @@ class InternalIDColumn : public Column {
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

inline void scan(transaction::Transaction* transaction, ChunkState& state,
void scan(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::scan(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

inline void scan(transaction::Transaction* transaction, ChunkState& state,
void scan(transaction::Transaction* transaction, ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) override {
Column::scan(transaction, state, startOffsetInGroup, endOffsetInGroup, resultVector,
offsetInVector);
populateCommonTableID(resultVector);
}

inline void lookup(transaction::Transaction* transaction, ChunkState& state,
void lookup(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::lookup(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

// TODO(Guodong): This function should be removed through rewriting INTERNAL_ID as STRUCT.
inline void setCommonTableID(common::table_id_t tableID) { commonTableID = tableID; }
void setCommonTableID(common::table_id_t tableID) { commonTableID = tableID; }

private:
void populateCommonTableID(common::ValueVector* resultVector) const;
Expand Down
8 changes: 7 additions & 1 deletion src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,16 +759,22 @@ std::unique_ptr<ColumnChunk> Column::getEmptyChunkForCommit(uint64_t capacity) {
return ColumnChunkFactory::createColumnChunk(*dataType.copy(), enableCompression, capacity);
}

// TODO: Pass state in to avoid accessing metadata.
void Column::commitLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx,
bool isNewNodeGroup, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
auto columnChunk = getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE);
std::unique_ptr<ColumnChunk> columnChunk;
if (isNewNodeGroup) {
KU_ASSERT(updateInfo.empty() && deleteInfo.empty());
columnChunk = getEmptyChunkForCommit(getMaxOffset(insertInfo) + 1);
// Apply inserts from the local chunk.
applyLocalChunkToColumnChunk(localInsertChunks, columnChunk.get(), insertInfo);
} else {
auto maxNodeOffset = std::max(getMaxOffset(insertInfo), getMaxOffset(updateInfo));
auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
maxNodeOffset = std::max(maxNodeOffset, chunkMeta.numValues);
columnChunk = getEmptyChunkForCommit(maxNodeOffset + 1);
// First, scan the whole column chunk from persistent storage.
scan(transaction, nodeGroupIdx, columnChunk.get());
// Then, apply updates from the local chunk.
Expand Down
Loading