Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for non-thrown duplicated pk error #1890

Merged
merged 1 commit into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class CopyNode : public Sink {
storage::ColumnChunk* chunk, common::offset_t startNodeOffset, common::offset_t numNodes);
static void checkNonNullConstraint(
storage::NullColumnChunk* nullChunk, common::offset_t numNodes);
static void appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,

template<typename T>
static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

private:
Expand All @@ -106,5 +108,12 @@ class CopyNode : public Sink {
std::unique_ptr<storage::NodeGroup> localNodeGroup;
};

template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);
template<>
uint64_t CopyNode::appendToPKIndex<common::ku_string_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

} // namespace processor
} // namespace kuzu
75 changes: 53 additions & 22 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,50 @@
void CopyNode::populatePKIndex(
PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) {
checkNonNullConstraint(chunk->getNullChunk(), numNodes);
std::string errorPKValueStr;
pkIndex->lock();
try {
appendToPKIndex(pkIndex, chunk, startOffset, numNodes);
switch (chunk->getDataType().getPhysicalType()) {
case PhysicalTypeID::INT64: {
auto numAppendedNodes = appendToPKIndex<int64_t>(pkIndex, chunk, startOffset, numNodes);
if (numAppendedNodes < numNodes) {
errorPKValueStr =
std::to_string(chunk->getValue<int64_t>(startOffset + numAppendedNodes));
}
} break;
case PhysicalTypeID::STRING: {
auto numAppendedNodes =
appendToPKIndex<ku_string_t>(pkIndex, chunk, startOffset, numNodes);
if (numAppendedNodes < numNodes) {
errorPKValueStr =
chunk->getValue<ku_string_t>(startOffset + numAppendedNodes).getAsString();

Check warning on line 150 in src/processor/operator/copy/copy_node.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/copy/copy_node.cpp#L150

Added line #L150 was not covered by tests
}
} break;
default: {
throw CopyException(StringUtils::string_format(

Check warning on line 154 in src/processor/operator/copy/copy_node.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/copy/copy_node.cpp#L153-L154

Added lines #L153 - L154 were not covered by tests
"Invalid primary key column type {}. Primary key must be "
"either INT64, STRING or SERIAL.",
LogicalTypeUtils::dataTypeToString(chunk->getDataType())));

Check warning on line 157 in src/processor/operator/copy/copy_node.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/copy/copy_node.cpp#L157

Added line #L157 was not covered by tests
}
}
} catch (Exception& e) {
pkIndex->unlock();
throw;
}
pkIndex->unlock();
if (!errorPKValueStr.empty()) {
throw CopyException(
StringUtils::string_format("Found duplicated primary key value {}, which violates the "
"uniqueness constraint of the primary key column.",
errorPKValueStr));
}
}

void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) {
for (auto posInChunk = 0u; posInChunk < numNodes; posInChunk++) {
if (nullChunk->isNull(posInChunk)) {
throw CopyException("Primary key cannot be null.");
throw CopyException(
"Found NULL, which violates the non-null constraint of the primary key column.");
}
}
}
Expand Down Expand Up @@ -175,30 +205,31 @@
sharedState->fTable.get(), outputMsg, context->memoryManager);
}

void CopyNode::appendToPKIndex(
template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(
PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) {
switch (chunk->getDataType().getLogicalTypeID()) {
case LogicalTypeID::INT64: {
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = chunk->getValue<int64_t>(i);
pkIndex->append(value, offset);
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = chunk->getValue<int64_t>(i);
if (!pkIndex->append(value, offset)) {
return i;
}
} break;
case LogicalTypeID::STRING: {
auto varSizedChunk = (StringColumnChunk*)chunk;
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = varSizedChunk->getValue<std::string>(i);
pkIndex->append(value.c_str(), offset);
}
} break;
default: {
StringUtils::string_format("Invalid primary key column type {}. Primary key must be "
"either INT64, STRING or SERIAL.",
LogicalTypeUtils::dataTypeToString(chunk->getDataType()));
}
return numValues;
}

template<>
uint64_t CopyNode::appendToPKIndex<ku_string_t>(
PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) {
auto stringColumnChunk = (StringColumnChunk*)chunk;
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = stringColumnChunk->getValue<std::string>(i);
if (!pkIndex->append(value.c_str(), offset)) {
return i;
}
}
return numValues;
}

} // namespace processor
Expand Down
6 changes: 4 additions & 2 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#include "processor/operator/copy/read_file.h"

using namespace kuzu::common;

namespace kuzu {
namespace processor {

bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) {
bool ReadFile::getNextTuplesInternal(ExecutionContext* context) {
auto morsel = sharedState->getMorsel();
if (morsel == nullptr) {
return false;
}
auto recordBatch = readTuples(std::move(morsel));
for (auto i = 0u; i < dataColumnPoses.size(); i++) {
common::ArrowColumnVector::setArrowColumn(
ArrowColumnVector::setArrowColumn(
resultSet->getValueVector(dataColumnPoses[i]).get(), recordBatch->column((int)i));
}
resultSet->dataChunks[0]->state->setToUnflat();
Expand Down
4 changes: 1 addition & 3 deletions test/test_files/exceptions/copy/duplicated.test
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
-GROUP CopyDuplicateIDTest
-DATASET CSV copy-fault-tests/duplicate-ids
-SKIP
# FIXME: This test is not working due to inconsistent error message.

--

-CASE DuplicateIDsError
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vPerson.csv"
---- error
Copy exception: Duplicated primary key value 10 found around L4 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vPerson.csv violates the uniqueness constraint of the primary key column.
Copy exception: Found duplicated primary key value 10, which violates the uniqueness constraint of the primary key column.
6 changes: 2 additions & 4 deletions test/test_files/exceptions/copy/null_pk.test
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
-GROUP CopyNullPKTest
-DATASET CSV copy-fault-tests/null-pk
-SKIP
# FIXME: This test is not working because the error message is not consistent

--

-CASE NullPrimaryKeyInNodeFile
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv"
---- error
Copy exception: NULL found around L2 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv violates the non-null constraint of the primary key column.
Copy exception: Found NULL, which violates the non-null constraint of the primary key column.

-CASE NullPrimaryKeyInMultiNodeFiles
-STATEMENT COPY person FROM ["${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv", "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson2.csv"]
---- error
Copy exception: NULL found around L2 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson.csv violates the non-null constraint of the primary key column.
Copy exception: Found NULL, which violates the non-null constraint of the primary key column.

-CASE NullPrimaryKeyInRelFile
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/vPerson-valid.csv"
Expand Down
Loading