Skip to content

Commit

Permalink
Merge pull request #1890 from kuzudb/fix-pk-errors
Browse files Browse the repository at this point in the history
Fix for non-thrown duplicated pk error
  • Loading branch information
ray6080 committed Aug 5, 2023
2 parents 3c40a45 + 4b777b5 commit 9af550a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 32 deletions.
11 changes: 10 additions & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class CopyNode : public Sink {
storage::ColumnChunk* chunk, common::offset_t startNodeOffset, common::offset_t numNodes);
static void checkNonNullConstraint(
storage::NullColumnChunk* nullChunk, common::offset_t numNodes);
static void appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,

// Appends the primary key values stored in `chunk` to `pkIndex`.
// T selects the key representation; specializations are declared for int64_t and
// common::ku_string_t.
// startOffset is the node offset paired with the first value of the chunk, and numNodes is
// the number of values to append.
// Returns the number of values appended before the index rejected one; the result equals
// numNodes when every value was appended.
template<typename T>
static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

private:
Expand All @@ -106,5 +108,12 @@ class CopyNode : public Sink {
std::unique_ptr<storage::NodeGroup> localNodeGroup;
};

// Explicit specialization for INT64 primary keys. Declared here so that every translation
// unit including this header uses the out-of-line definition instead of trying to
// instantiate the primary template.
template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);
// Explicit specialization for STRING primary keys.
template<>
uint64_t CopyNode::appendToPKIndex<common::ku_string_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

} // namespace processor
} // namespace kuzu
75 changes: 53 additions & 22 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,50 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
// Inserts the primary keys held in `chunk` into `pkIndex` while holding the index lock.
//
// pkIndex     - index builder receiving the keys; locked for the duration of the inserts.
// chunk       - primary key column chunk; values are addressed by position within the chunk.
// startOffset - node offset paired with the first value of the chunk.
// numNodes    - number of values to insert.
//
// Throws CopyException when a key is NULL (raised by checkNonNullConstraint), when the
// physical type of the column is neither INT64 nor STRING, or when appendToPKIndex stops
// early because the index rejected a key (reported as a duplicated primary key).
void CopyNode::populatePKIndex(
    PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) {
    checkNonNullConstraint(chunk->getNullChunk(), numNodes);
    // Kept separate from the message text: an empty string is a legal key value, so the
    // emptiness of errorPKValueStr cannot signal whether a duplicate was found.
    bool foundDuplicate = false;
    std::string errorPKValueStr;
    pkIndex->lock();
    try {
        switch (chunk->getDataType().getPhysicalType()) {
        case PhysicalTypeID::INT64: {
            auto numAppendedNodes = appendToPKIndex<int64_t>(pkIndex, chunk, startOffset, numNodes);
            if (numAppendedNodes < numNodes) {
                foundDuplicate = true;
                // numAppendedNodes is the position in the chunk of the rejected key. The
                // chunk is indexed by position, not by node offset, so startOffset must not
                // be added here.
                errorPKValueStr = std::to_string(chunk->getValue<int64_t>(numAppendedNodes));
            }
        } break;
        case PhysicalTypeID::STRING: {
            auto numAppendedNodes =
                appendToPKIndex<ku_string_t>(pkIndex, chunk, startOffset, numNodes);
            if (numAppendedNodes < numNodes) {
                foundDuplicate = true;
                // Read the key through the same accessor the append path uses so that the
                // reported value is exactly the one the index rejected.
                errorPKValueStr = static_cast<StringColumnChunk*>(chunk)->getValue<std::string>(
                    numAppendedNodes);
            }
        } break;
        default: {
            throw CopyException(StringUtils::string_format(
                "Invalid primary key column type {}. Primary key must be "
                "either INT64, STRING or SERIAL.",
                LogicalTypeUtils::dataTypeToString(chunk->getDataType())));
        }
        }
    } catch (...) {
        // Release the lock no matter which exception type escapes, then propagate it.
        pkIndex->unlock();
        throw;
    }
    pkIndex->unlock();
    // The duplicate is reported only after the lock has been released.
    if (foundDuplicate) {
        throw CopyException(
            StringUtils::string_format("Found duplicated primary key value {}, which violates the "
                                       "uniqueness constraint of the primary key column.",
                errorPKValueStr));
    }
}

// Verifies that none of the first `numNodes` entries of `nullChunk` is marked NULL.
// Throws CopyException at the first NULL entry, because a primary key column must not
// contain NULL values.
void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) {
    // The counter uses the same type as the bound so the comparison never mixes widths.
    for (offset_t posInChunk = 0; posInChunk < numNodes; posInChunk++) {
        if (nullChunk->isNull(posInChunk)) {
            throw CopyException(
                "Found NULL, which violates the non-null constraint of the primary key column.");
        }
    }
}
Expand Down Expand Up @@ -175,30 +205,31 @@ void CopyNode::finalize(ExecutionContext* context) {
sharedState->fTable.get(), outputMsg, context->memoryManager);
}

void CopyNode::appendToPKIndex(
template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(
PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) {
switch (chunk->getDataType().getLogicalTypeID()) {
case LogicalTypeID::INT64: {
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = chunk->getValue<int64_t>(i);
pkIndex->append(value, offset);
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = chunk->getValue<int64_t>(i);
if (!pkIndex->append(value, offset)) {
return i;
}
} break;
case LogicalTypeID::STRING: {
auto varSizedChunk = (StringColumnChunk*)chunk;
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = varSizedChunk->getValue<std::string>(i);
pkIndex->append(value.c_str(), offset);
}
} break;
default: {
StringUtils::string_format("Invalid primary key column type {}. Primary key must be "
"either INT64, STRING or SERIAL.",
LogicalTypeUtils::dataTypeToString(chunk->getDataType()));
}
return numValues;
}

// STRING specialization: appends the first `numValues` values of `chunk` to `pkIndex`.
// `chunk` is accessed as a StringColumnChunk so each key can be read as a std::string.
// The value at position i of the chunk is paired with node offset startOffset + i.
// Returns the position of the first value the index refused to append, or numValues when
// every value was appended. The caller turns a short count into an error.
template<>
uint64_t CopyNode::appendToPKIndex<ku_string_t>(
    PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) {
    auto stringColumnChunk = static_cast<StringColumnChunk*>(chunk);
    // 64-bit counter: a 32-bit one would wrap before reaching a bound above UINT32_MAX.
    for (uint64_t i = 0; i < numValues; i++) {
        auto offset = i + startOffset;
        auto value = stringColumnChunk->getValue<std::string>(i);
        if (!pkIndex->append(value.c_str(), offset)) {
            return i;
        }
    }
    return numValues;
}

} // namespace processor
Expand Down
6 changes: 4 additions & 2 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#include "processor/operator/copy/read_file.h"

using namespace kuzu::common;

namespace kuzu {
namespace processor {

bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) {
bool ReadFile::getNextTuplesInternal(ExecutionContext* context) {
auto morsel = sharedState->getMorsel();
if (morsel == nullptr) {
return false;
}
auto recordBatch = readTuples(std::move(morsel));
for (auto i = 0u; i < dataColumnPoses.size(); i++) {
common::ArrowColumnVector::setArrowColumn(
ArrowColumnVector::setArrowColumn(
resultSet->getValueVector(dataColumnPoses[i]).get(), recordBatch->column((int)i));
}
resultSet->dataChunks[0]->state->setToUnflat();
Expand Down
4 changes: 1 addition & 3 deletions test/test_files/exceptions/copy/duplicated.test
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
-GROUP CopyDuplicateIDTest
-DATASET CSV copy-fault-tests/duplicate-ids
-SKIP
# FIXME: This test is not working due to inconsistent error message.

--

-CASE DuplicateIDsError
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vPerson.csv"
---- error
Copy exception: Duplicated primary key value 10 found around L4 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vPerson.csv violates the uniqueness constraint of the primary key column.
Copy exception: Found duplicated primary key value 10, which violates the uniqueness constraint of the primary key column.
6 changes: 2 additions & 4 deletions test/test_files/exceptions/copy/null_pk.test
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
-GROUP CopyNullPKTest
-DATASET CSV copy-fault-tests/null-pk
-SKIP
# FIXME: This test is not working because the error message is not consistent

--

-CASE NullPrimaryKeyInNodeFile
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv"
---- error
Copy exception: NULL found around L2 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv violates the non-null constraint of the primary key column.
Copy exception: Found NULL, which violates the non-null constraint of the primary key column.

-CASE NullPrimaryKeyInMultiNodeFiles
-STATEMENT COPY person FROM ["${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv", "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson2.csv"]
---- error
Copy exception: NULL found around L2 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv violates the non-null constraint of the primary key column.
Copy exception: Found NULL, which violates the non-null constraint of the primary key column.

-CASE NullPrimaryKeyInRelFile
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson-valid.csv"
Expand Down

0 comments on commit 9af550a

Please sign in to comment.