Skip to content

Commit

Permalink
fix overflow cursor concurrency bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 15, 2023
1 parent deb1734 commit e37f41f
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 161 deletions.
70 changes: 37 additions & 33 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"

using namespace kuzu::storage;

namespace kuzu {
namespace processor {

Expand Down Expand Up @@ -33,62 +35,53 @@ class CopyNodeSharedState {
bool hasLoggedWAL;
};

struct CopyNodeLocalState {
CopyNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
const DataPos& offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
offsetVectorPos{offsetVectorPos}, offsetVector{nullptr}, arrowColumnPoses{std::move(
arrowColumnPoses)} {}

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
common::vector_idx_t columnIdx);

common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
struct CopyNodeDataInfo {
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
std::vector<DataPos> arrowColumnPoses;
std::vector<common::ValueVector*> arrowColumnVectors;
};

class CopyNode : public Sink {
public:
CopyNode(std::unique_ptr<CopyNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
common::CopyDescription copyDesc, storage::NodeTable* table, storage::RelsStore* relsStore,
catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
auto& property = tableSchema->properties[i];
copyStates[i] = std::make_unique<PropertyCopyState>(property.dataType);
}
}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
localState->offsetVector = resultSet->getValueVector(localState->offsetVectorPos).get();
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

inline void initGlobalStateInternal(ExecutionContext* context) override {
if (!isCopyAllowed()) {
throw common::CopyException("COPY commands can only be executed once on a table.");
}
auto nodeTableSchema = localState->catalog->getReadOnlyVersion()->getNodeTableSchema(
localState->table->getTableID());
sharedState->initialize(nodeTableSchema, localState->wal->getDirectory());
auto nodeTableSchema =
catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
sharedState->initialize(nodeTableSchema, wal->getDirectory());
}

void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(std::make_unique<CopyNodeLocalState>(*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
return std::make_unique<CopyNode>(sharedState, copyNodeDataInfo, copyDesc, table, relsStore,
catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

protected:
Expand All @@ -97,10 +90,13 @@ class CopyNode : public Sink {

void logCopyWALRecord();

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
common::vector_idx_t columnIdx);

private:
inline bool isCopyAllowed() {
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(localState->table->getTableID())
auto nodesStatistics = table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(table->getTableID())
->getNumTuples() == 0;
}

Expand All @@ -115,8 +111,16 @@ class CopyNode : public Sink {
}

protected:
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeDataInfo copyNodeDataInfo;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
common::ValueVector* offsetVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<std::unique_ptr<storage::PropertyCopyState>> copyStates;
};

} // namespace processor
Expand Down
46 changes: 17 additions & 29 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,24 @@
namespace kuzu {
namespace processor {

struct CopyNPYNodeLocalState : public CopyNodeLocalState {
public:
CopyNPYNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
const DataPos& offsetVectorPos, const DataPos& columnIdxPos,
std::vector<DataPos> arrowColumnPoses)
: CopyNodeLocalState{copyDesc, table, relsStore, catalog, wal, offsetVectorPos,
std::move(arrowColumnPoses)},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::unique_ptr<CopyNPYNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
CopyNPYNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
DataPos columnIdxPos, const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(localState), std::move(sharedState), std::move(resultSetDescriptor),
std::move(child), id, paramsString} {}
: CopyNode{std::move(sharedState), std::move(copyNodeDataInfo), copyDesc, table, relsStore,
catalog, wal, std::move(resultSetDescriptor), std::move(child), id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
auto npyLocalState = (CopyNPYNodeLocalState*)localState.get();
npyLocalState->offsetVector =
resultSet->getValueVector(npyLocalState->offsetVectorPos).get();
npyLocalState->columnIdxVector =
resultSet->getValueVector(npyLocalState->columnIdxPos).get();
for (auto& arrowColumnPos : npyLocalState->arrowColumnPoses) {
npyLocalState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
columnIdxVector = resultSet->getValueVector(columnIdxPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

Expand All @@ -48,10 +32,14 @@ class CopyNPYNode : public CopyNode {
common::vector_idx_t columnToCopy);

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNPYNode>(
std::make_unique<CopyNPYNodeLocalState>((CopyNPYNodeLocalState&)*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
return std::make_unique<CopyNPYNode>(sharedState, copyNodeDataInfo, columnIdxPos, copyDesc,
table, relsStore, catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id,
paramsString);
}

private:
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

} // namespace processor
Expand Down
14 changes: 13 additions & 1 deletion src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@ class RelCopier {
catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData,
DirectedInMemRelData* bwdRelData, std::vector<PrimaryKeyIndex*> pkIndexes)
: sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema},
fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} {}
fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} {
fwdCopyStates.resize(schema->getNumProperties());
for (auto i = 0u; i < schema->getNumProperties(); i++) {
fwdCopyStates[i] = std::make_unique<PropertyCopyState>(schema->properties[i].dataType);
}
bwdCopyStates.resize(schema->getNumProperties());
for (auto i = 0u; i < schema->getNumProperties(); i++) {
bwdCopyStates[i] = std::make_unique<PropertyCopyState>(schema->properties[i].dataType);
}
}

virtual ~RelCopier() = default;

void execute(processor::ExecutionContext* executionContext);
Expand Down Expand Up @@ -60,6 +70,8 @@ class RelCopier {
DirectedInMemRelData* fwdRelData;
DirectedInMemRelData* bwdRelData;
std::vector<PrimaryKeyIndex*> pkIndexes;
std::vector<std::unique_ptr<PropertyCopyState>> fwdCopyStates;
std::vector<std::unique_ptr<PropertyCopyState>> bwdCopyStates;
};

class RelListsCounterAndColumnCopier : public RelCopier {
Expand Down
33 changes: 22 additions & 11 deletions src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
namespace kuzu {
namespace storage {

// State carried alongside a single property while its values are being copied. A pointer to
// this struct is what the copyArrowArray overloads accept as their `copyState` argument.
// NOTE(review): presumably each copying operator/thread owns its own instance so that overflow
// cursors are not shared between concurrent writers — confirm against the callers.
struct PropertyCopyState {
// Constructs the state for a property of the given logical type; the definition is out of line.
PropertyCopyState(common::LogicalType& dataType);

// Cursor position used when writing this property's values that need overflow storage.
PageByteCursor overflowCursor;
// States of nested properties; presumably one entry per child of a nested type such as a
// struct field — TODO confirm against the constructor definition.
std::vector<std::unique_ptr<PropertyCopyState>> childStates;
};

struct StructFieldIdxAndValue {
StructFieldIdxAndValue(common::struct_field_idx_t fieldIdx, std::string fieldValue)
: fieldIdx{fieldIdx}, fieldValue{std::move(fieldValue)} {}
Expand Down Expand Up @@ -46,7 +53,8 @@ class InMemColumnChunk {
inline uint64_t getNumBytes() const { return numBytes; }
inline InMemColumnChunk* getNullChunk() { return nullChunk.get(); }
void copyArrowBatch(std::shared_ptr<arrow::RecordBatch> batch);
virtual void copyArrowArray(arrow::Array& arrowArray, arrow::Array* nodeOffsets = nullptr);
virtual void copyArrowArray(arrow::Array& arrowArray, PropertyCopyState* copyState,
arrow::Array* nodeOffsets = nullptr);
virtual void flush(common::FileInfo* walFileInfo);

template<typename T>
Expand Down Expand Up @@ -92,23 +100,24 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk {
inMemOverflowFile{inMemOverflowFile}, blobBuffer{std::make_unique<uint8_t[]>(
common::BufferPoolConstants::PAGE_4KB_SIZE)} {}

void copyArrowArray(arrow::Array& array, arrow::Array* nodeOffsets = nullptr) final;
void copyArrowArray(arrow::Array& array, PropertyCopyState* copyState,
arrow::Array* nodeOffsets = nullptr) final;

template<typename T>
void templateCopyValuesAsStringToPageWithOverflow(
arrow::Array& array, arrow::Array* nodeOffsets);
arrow::Array& array, PropertyCopyState* copyState, arrow::Array* nodeOffsets);

void copyValuesToPageWithOverflow(arrow::Array& array, arrow::Array* nodeOffsets);
void copyValuesToPageWithOverflow(
arrow::Array& array, PropertyCopyState* copyState, arrow::Array* nodeOffsets);

template<typename T>
void setValWithOverflow(const char* value, uint64_t length, uint64_t pos) {
void setValWithOverflow(
PageByteCursor& overflowCursor, const char* value, uint64_t length, uint64_t pos) {
assert(false);
}

private:
storage::InMemOverflowFile* inMemOverflowFile;
// TODO(Ziyi/Guodong): Fix this for rel columns.
PageByteCursor overflowCursor;
std::unique_ptr<uint8_t[]> blobBuffer;
};

Expand All @@ -125,13 +134,15 @@ class InMemStructColumnChunk : public InMemColumnChunk {
fieldChunks.push_back(std::move(fieldChunk));
}

void copyArrowArray(arrow::Array& array, arrow::Array* nodeOffsets = nullptr) final;
void copyArrowArray(arrow::Array& array, PropertyCopyState* copyState = nullptr,
arrow::Array* nodeOffsets = nullptr) final;

private:
void setStructFields(const char* value, uint64_t length, uint64_t pos);
void setStructFields(
PropertyCopyState* copyState, const char* value, uint64_t length, uint64_t pos);

void setValueToStructField(common::offset_t pos, const std::string& structFieldValue,
common::struct_field_idx_t structFiledIdx);
void setValueToStructField(PropertyCopyState* copyState, common::offset_t pos,
const std::string& structFieldValue, common::struct_field_idx_t structFiledIdx);

std::vector<StructFieldIdxAndValue> parseStructFieldNameAndValues(
common::LogicalType& type, const std::string& structString);
Expand Down
25 changes: 13 additions & 12 deletions src/include/storage/in_mem_storage_structure/in_mem_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ typedef std::vector<std::atomic<uint64_t>> atomic_uint64_vec_t;

class InMemLists;
class AdjLists;
struct PropertyCopyState;

using fill_in_mem_lists_function_t =
std::function<void(InMemLists* inMemLists, uint8_t* defaultVal, PageByteCursor& pageByteCursor,
Expand Down Expand Up @@ -57,8 +58,8 @@ class InMemLists {
inline common::LogicalType getDataType() { return dataType; }
inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; }

virtual void copyArrowArray(
arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists, arrow::Array* array);
virtual void copyArrowArray(arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists,
arrow::Array* array, PropertyCopyState* copyState);
template<typename T>
void templateCopyArrayToRelLists(
arrow::Array* boundNodeOffsets, arrow::Array* posInRelList, arrow::Array* array);
Expand Down Expand Up @@ -124,14 +125,14 @@ class InMemListsWithOverflow : public InMemLists {
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
const common::CopyDescription* copyDescription);

void copyArrowArray(
arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists, arrow::Array* array) final;
void copyArrowArray(arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists,
arrow::Array* array, PropertyCopyState* copyState) final;
template<typename T>
void templateCopyArrayAsStringToRelListsWithOverflow(
arrow::Array* boundNodeOffsets, arrow::Array* posInRelList, arrow::Array* array);
void templateCopyArrayAsStringToRelListsWithOverflow(arrow::Array* boundNodeOffsets,
arrow::Array* posInRelList, arrow::Array* array, PageByteCursor& overflowCursor);
template<typename T>
void setValueFromStringWithOverflow(
common::offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length) {
void setValueFromStringWithOverflow(PageByteCursor& overflowCursor, common::offset_t nodeOffset,
uint64_t pos, const char* val, uint64_t length) {
assert(false);
}

Expand All @@ -140,8 +141,6 @@ class InMemListsWithOverflow : public InMemLists {

protected:
std::unique_ptr<InMemOverflowFile> overflowInMemFile;
// TODO(Guodong/Ziyi): Fix for concurrent writes.
PageByteCursor overflowCursor;
std::unique_ptr<uint8_t[]> blobBuffer;
};

Expand Down Expand Up @@ -216,10 +215,12 @@ void InMemLists::setValueFromString<common::timestamp_t>(

template<>
void InMemListsWithOverflow::setValueFromStringWithOverflow<common::ku_string_t>(
common::offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length);
PageByteCursor& overflowCursor, common::offset_t nodeOffset, uint64_t pos, const char* val,
uint64_t length);
template<>
void InMemListsWithOverflow::setValueFromStringWithOverflow<common::ku_list_t>(
common::offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length);
PageByteCursor& overflowCursor, common::offset_t nodeOffset, uint64_t pos, const char* val,
uint64_t length);

} // namespace storage
} // namespace kuzu
15 changes: 7 additions & 8 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,19 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
auto ftSharedState = std::make_shared<FTableSharedState>(
copyNodeSharedState->table, common::DEFAULT_VECTOR_CAPACITY);
std::unique_ptr<CopyNode> copyNode;
CopyNodeDataInfo copyNodeDataInfo{offsetVectorPos, arrowColumnPoses};
if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::NPY) {
auto localState = std::make_unique<CopyNPYNodeLocalState>(copy->getCopyDescription(),
copyNode = std::make_unique<CopyNPYNode>(copyNodeSharedState, copyNodeDataInfo,
columnIdxPos, copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), offsetVectorPos,
columnIdxPos, arrowColumnPoses);
copyNode = std::make_unique<CopyNPYNode>(std::move(localState), copyNodeSharedState,
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
} else {
auto localState = std::make_unique<CopyNodeLocalState>(copy->getCopyDescription(),
copyNode = std::make_unique<CopyNode>(copyNodeSharedState, copyNodeDataInfo,
copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), offsetVectorPos,
arrowColumnPoses);
copyNode = std::make_unique<CopyNode>(std::move(localState), copyNodeSharedState,
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
}
Expand Down
Loading

0 comments on commit e37f41f

Please sign in to comment.