Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add copy-to-csv header option #2436

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CopyToLocalState {

virtual void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) = 0;

virtual void sink(CopyToSharedState* sharedState) = 0;
virtual void sink(CopyToSharedState* sharedState, CopyToInfo* info) = 0;

virtual void finalize(CopyToSharedState* sharedState) = 0;
};
Expand Down
32 changes: 18 additions & 14 deletions src/include/processor/operator/persistent/copy_to_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace kuzu {
namespace processor {

struct CopyToCSVInfo : public CopyToInfo {
struct CopyToCSVInfo final : public CopyToInfo {

Check warning on line 11 in src/include/processor/operator/persistent/copy_to_csv.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_csv.h#L11

Added line #L11 was not covered by tests
std::vector<bool> isFlat;
std::unique_ptr<common::CSVOption> copyToOption;

Expand All @@ -26,22 +26,24 @@
}
};

class CopyToCSVLocalState : public CopyToLocalState {
class CopyToCSVLocalState final : public CopyToLocalState {

Check warning on line 29 in src/include/processor/operator/persistent/copy_to_csv.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_csv.h#L29

Added line #L29 was not covered by tests
public:
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final;
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override;
acquamarin marked this conversation as resolved.
Show resolved Hide resolved

void sink(CopyToSharedState* sharedState) final;
void sink(CopyToSharedState* sharedState, CopyToInfo* info) override;

void finalize(CopyToSharedState* sharedState) final;
void finalize(CopyToSharedState* sharedState) override;

private:
bool requireQuotes(const uint8_t* str, uint64_t len);
bool requireQuotes(CopyToCSVInfo* info, const uint8_t* str, uint64_t len);

std::string addEscapes(char toEscape, char escape, const std::string& val);

void writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote);
void writeString(CopyToCSVInfo* info, const uint8_t* strData, uint64_t strLen, bool forceQuote);

void writeRows();
void writeRows(CopyToCSVInfo* info);

void writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info);

private:
std::unique_ptr<common::BufferedSerializer> serializer;
Expand All @@ -50,24 +52,26 @@
std::vector<common::ValueVector*> castVectors;
std::vector<function::scalar_exec_func> castFuncs;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToCast;
common::CSVOption* copyToOption;
};

class CopyToCSVSharedState : public CopyToSharedState {
class CopyToCSVSharedState final : public CopyToSharedState {
public:
void init(CopyToInfo* info, storage::MemoryManager* mm) final;
void init(CopyToInfo* info, storage::MemoryManager* mm) override;

void finalize() final {}
void finalize() override {}

void writeRows(const uint8_t* data, uint64_t size);

bool writeHeader();

private:
std::mutex mtx;
std::unique_ptr<common::FileInfo> fileInfo;
common::offset_t offset = 0;
bool outputHeader;
};

class CopyToCSV : public CopyTo {
class CopyToCSV final : public CopyTo {
public:
CopyToCSV(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<CopyToInfo> info, std::shared_ptr<CopyToSharedState> sharedState,
Expand All @@ -76,7 +80,7 @@
std::make_unique<CopyToCSVLocalState>(), std::move(sharedState),
PhysicalOperatorType::COPY_TO_CSV, std::move(child), id, paramsString} {}

inline std::unique_ptr<PhysicalOperator> clone() final {
inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyToCSV>(resultSetDescriptor->copy(), info->copy(), sharedState,
children[0]->clone(), id, paramsString);
}
Expand Down
20 changes: 10 additions & 10 deletions src/include/processor/operator/persistent/copy_to_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace kuzu {
namespace processor {

struct CopyToParquetInfo : public CopyToInfo {
struct CopyToParquetInfo final : public CopyToInfo {

Check warning on line 11 in src/include/processor/operator/persistent/copy_to_parquet.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_parquet.h#L11

Added line #L11 was not covered by tests
kuzu_parquet::format::CompressionCodec::type codec =
kuzu_parquet::format::CompressionCodec::SNAPPY;
std::unique_ptr<FactorizedTableSchema> tableSchema;
Expand All @@ -27,32 +27,32 @@
}
};

class CopyToParquetLocalState : public CopyToLocalState {
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final;
class CopyToParquetLocalState final : public CopyToLocalState {

Check warning on line 30 in src/include/processor/operator/persistent/copy_to_parquet.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_parquet.h#L30

Added line #L30 was not covered by tests
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override;

void sink(CopyToSharedState* sharedState) final;
void sink(CopyToSharedState* sharedState, CopyToInfo* info) override;

void finalize(CopyToSharedState* sharedState) final;
void finalize(CopyToSharedState* sharedState) override;

private:
std::unique_ptr<FactorizedTable> ft;
std::vector<common::ValueVector*> vectorsToAppend;
storage::MemoryManager* mm;
};

class CopyToParquetSharedState : public CopyToSharedState {
class CopyToParquetSharedState final : public CopyToSharedState {
public:
void init(CopyToInfo* info, storage::MemoryManager* mm) final;
void init(CopyToInfo* info, storage::MemoryManager* mm) override;

void finalize() final;
void finalize() override;

void flush(FactorizedTable& ft);

private:
std::unique_ptr<ParquetWriter> writer;
};

class CopyToParquet : public CopyTo {
class CopyToParquet final : public CopyTo {
public:
CopyToParquet(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<CopyToInfo> info, std::shared_ptr<CopyToSharedState> sharedState,
Expand All @@ -61,7 +61,7 @@
std::make_unique<CopyToParquetLocalState>(), std::move(sharedState),
PhysicalOperatorType::COPY_TO_PARQUET, std::move(child), id, paramsString} {}

std::unique_ptr<PhysicalOperator> clone() final {
std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyToParquet>(resultSetDescriptor->copy(), info->copy(),
sharedState, children[0]->clone(), id, paramsString);
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void CopyTo::finalize(ExecutionContext* /*context*/) {

void CopyTo::executeInternal(processor::ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
localState->sink(sharedState.get());
localState->sink(sharedState.get(), info.get());
}
localState->finalize(sharedState.get());
}
Expand Down
72 changes: 51 additions & 21 deletions src/processor/operator/persistent/copy_to_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@
unflatCastDataChunk->insert(i - numInsertedFlatVector, std::move(castVector));
}
}
copyToOption = copyToCSVInfo->copyToOption.get();
}

void CopyToCSVLocalState::sink(CopyToSharedState* sharedState) {
writeRows();
void CopyToCSVLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* info) {
auto copyToCSVInfo = reinterpret_cast<CopyToCSVInfo*>(info);
writeHeader(sharedState, copyToCSVInfo);
writeRows(copyToCSVInfo);
if (serializer->getSize() > CopyToCSVConstants::DEFAULT_CSV_FLUSH_SIZE) {
reinterpret_cast<CopyToCSVSharedState*>(sharedState)
->writeRows(serializer->getBlobData(), serializer->getSize());
Expand All @@ -64,16 +65,17 @@
}
}

bool CopyToCSVLocalState::requireQuotes(const uint8_t* str, uint64_t len) {
bool CopyToCSVLocalState::requireQuotes(

Check warning on line 68 in src/processor/operator/persistent/copy_to_csv.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/copy_to_csv.cpp#L68

Added line #L68 was not covered by tests
CopyToCSVInfo* copyToCsvInfo, const uint8_t* str, uint64_t len) {
// Check if the string is equal to the null string.
if (len == strlen(CopyToCSVConstants::DEFAULT_NULL_STR) &&
memcmp(str, CopyToCSVConstants::DEFAULT_NULL_STR, len) == 0) {
return true;
}
for (auto i = 0u; i < len; i++) {
if (str[i] == copyToOption->quoteChar ||
if (str[i] == copyToCsvInfo->copyToOption->quoteChar ||
str[i] == CopyToCSVConstants::DEFAULT_CSV_NEWLINE[0] ||
str[i] == copyToOption->delimiter) {
str[i] == copyToCsvInfo->copyToOption->delimiter) {
return true;
}
}
Expand All @@ -100,42 +102,44 @@
return escapedStr;
}

void CopyToCSVLocalState::writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote) {
void CopyToCSVLocalState::writeString(
CopyToCSVInfo* copyToCsvInfo, const uint8_t* strData, uint64_t strLen, bool forceQuote) {
if (!forceQuote) {
forceQuote = requireQuotes(strData, strLen);
forceQuote = requireQuotes(copyToCsvInfo, strData, strLen);
}
if (forceQuote) {
bool requiresEscape = false;
for (auto i = 0; i < strLen; i++) {
if (strData[i] == copyToOption->quoteChar || strData[i] == copyToOption->escapeChar) {
if (strData[i] == copyToCsvInfo->copyToOption->quoteChar ||
strData[i] == copyToCsvInfo->copyToOption->escapeChar) {
requiresEscape = true;
break;
}
}

if (!requiresEscape) {
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
serializer->write(strData, strLen);
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
return;
}

std::string strValToWrite = std::string(reinterpret_cast<const char*>(strData), strLen);
strValToWrite =
addEscapes(copyToOption->escapeChar, copyToOption->escapeChar, strValToWrite);
if (copyToOption->escapeChar != copyToOption->quoteChar) {
strValToWrite =
addEscapes(copyToOption->quoteChar, copyToOption->escapeChar, strValToWrite);
strValToWrite = addEscapes(copyToCsvInfo->copyToOption->escapeChar,
copyToCsvInfo->copyToOption->escapeChar, strValToWrite);
if (copyToCsvInfo->copyToOption->escapeChar != copyToCsvInfo->copyToOption->quoteChar) {
strValToWrite = addEscapes(copyToCsvInfo->copyToOption->quoteChar,
copyToCsvInfo->copyToOption->escapeChar, strValToWrite);
}
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
serializer->writeBufferData(strValToWrite);
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
} else {
serializer->write(strData, strLen);
}
}

void CopyToCSVLocalState::writeRows() {
void CopyToCSVLocalState::writeRows(CopyToCSVInfo* copyToCsvInfo) {
for (auto i = 0u; i < vectorsToCast.size(); i++) {
std::vector<std::shared_ptr<ValueVector>> vectorToCast = {vectorsToCast[i]};
castFuncs[i](vectorToCast, *castVectors[i], nullptr);
Expand All @@ -150,7 +154,7 @@
for (auto i = 0; i < numRowsToWrite; i++) {
for (auto j = 0u; j < castVectors.size(); j++) {
if (j != 0) {
serializer->writeBufferData(copyToOption->delimiter);
serializer->writeBufferData(copyToCsvInfo->copyToOption->delimiter);
}
auto vector = castVectors[j];
auto pos = vector->state->isFlat() ? vector->state->selVector->selectedPositions[0] :
Expand All @@ -162,16 +166,32 @@
}
auto strValue = vector->getValue<ku_string_t>(pos);
// Note: we need to blindly add quotes to VAR_LIST.
writeString(strValue.getData(), strValue.len,
writeString(copyToCsvInfo, strValue.getData(), strValue.len,
CopyToCSVConstants::DEFAULT_FORCE_QUOTE ||
vectorsToCast[j]->dataType.getLogicalTypeID() == LogicalTypeID::VAR_LIST);
}
serializer->writeBufferData(CopyToCSVConstants::DEFAULT_CSV_NEWLINE);
}
}

// Buffers the CSV header row into this local state's serializer.
//
// The shared state is asked (via CopyToCSVSharedState::writeHeader()) whether this caller
// should emit the header; if it answers false, nothing is written. Otherwise every entry of
// `info->names` is written through writeString() without forced quoting, separated by the
// configured delimiter, and the row is terminated with the default CSV newline.
//
// NOTE(review): the header is only appended to the local serializer here, not to the file.
// If another local state can flush its rows to the shared file before this one does, the
// header may not end up as the first line -- confirm the intended ordering guarantee.
void CopyToCSVLocalState::writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info) {
    if (!reinterpret_cast<CopyToCSVSharedState*>(sharedState)->writeHeader()) {
        // The shared state declined the request, so there is no header to emit from here.
        return;
    }
    bool isFirstColumn = true;
    for (const auto& columnName : info->names) {
        // A delimiter goes between columns, never before the first one.
        if (!isFirstColumn) {
            serializer->writeBufferData(info->copyToOption->delimiter);
        }
        isFirstColumn = false;
        writeString(info, reinterpret_cast<const uint8_t*>(columnName.c_str()),
            columnName.length(), false /* forceQuote */);
    }
    serializer->writeBufferData(CopyToCSVConstants::DEFAULT_CSV_NEWLINE);
}

// Prepares the shared CSV output state.
//
// Opens the file named by `info->fileName` write-only, creating it if needed and truncating
// any existing content, then records the `hasHeader` flag of the CSV options so that
// writeHeader() can later decide whether a header row should be emitted.
// `info` is reinterpreted as a CopyToCSVInfo; the memory manager argument is unused.
void CopyToCSVSharedState::init(CopyToInfo* info, MemoryManager* /*mm*/) {
    const auto openFlags = O_WRONLY | O_CREAT | O_TRUNC;
    fileInfo = FileUtils::openFile(info->fileName, openFlags);
    auto csvInfo = reinterpret_cast<CopyToCSVInfo*>(info);
    outputHeader = csvInfo->copyToOption->hasHeader;
}

void CopyToCSVSharedState::writeRows(const uint8_t* data, uint64_t size) {
Expand All @@ -180,5 +200,15 @@
offset += size;
}

bool CopyToCSVSharedState::writeHeader() {

Check warning on line 203 in src/processor/operator/persistent/copy_to_csv.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/copy_to_csv.cpp#L203

Added line #L203 was not covered by tests
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard lck(mtx);
if (outputHeader) {
outputHeader = false;
return true;
} else {
return false;
}
}

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/copy_to_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void CopyToParquetLocalState::init(
this->mm = mm;
}

void CopyToParquetLocalState::sink(CopyToSharedState* sharedState) {
void CopyToParquetLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* /*info*/) {
ft->append(vectorsToAppend);
if (ft->getTotalNumFlatTuples() > StorageConstants::NODE_GROUP_SIZE) {
reinterpret_cast<CopyToParquetSharedState*>(sharedState)->flush(*ft);
Expand Down
26 changes: 13 additions & 13 deletions test/test_files/copy/copy_to_csv.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

-CASE TinySnbCopyToCSV

-STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv"
-STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv" return *
-STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv"(header=true) return *
---- 8
[0:0]|0|Alice|1|True|False|35|5.000000|1900-01-01|2011-08-20 11:25:30|3 years 2 days 13:02:00|[10,5]|[Aida]|[[10,8],[6,7,8]]|[96,54,86,92]|1.731000
[0:1]|2|Bob|2|True|False|30|5.100000|1900-01-01|2008-11-03 15:25:30.000526|10 years 5 months 13:00:00.000024|[12,8]|[Bobby]|[[8,9],[9,10]]|[98,42,93,88]|0.990000
Expand All @@ -20,24 +20,24 @@

-STATEMENT COPY (MATCH (m:movies) RETURN m.*) TO "${DATABASE_PATH}/movies.csv"
---- ok
-STATEMENT load from "${DATABASE_PATH}/movies.csv" return *
-STATEMENT load from "${DATABASE_PATH}/movies.csv"(header=false) return *
---- 3
Sóló cón tu párejâ|126| this is a very very good movie|{rating: 5.300000, stars: 2, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11, u8: 220, u16: 20, u32: 1, u64: 180, hugedata: 1844674407370955161811111111}|\xAA\xABinteresting\x0B|{audience1=52, audience53=42}|True
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie|2544| the movie is very very good|{rating: 7.000000, stars: 10, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12, u8: 12, u16: 120, u32: 55, u64: 1, hugedata: -1844674407370955161511}|\xAB\xCD|{audience1=33}|8.989000
Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 100, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22, u8: 1, u16: 15, u32: 200, u64: 4, hugedata: -15}|pure ascii characters|{}|254.000000


-STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv"
-STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/studyAt.csv" return *
-STATEMENT load from "${DATABASE_PATH}/studyAt.csv"(header=true) return *
---- 3
0|5|[wwAewsdndweusd,wek]|1
2|120|[anew,jsdnwusklklklwewsd]|1
8|2|[awndsnjwejwen,isuhuwennjnuhuhuwewe]|1

-STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv"
-STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/onehop.csv" return *
-STATEMENT load from "${DATABASE_PATH}/onehop.csv"(header=true) return *
---- 14
0|2|[96,54,86,92]|[98,42,93,88]
0|3|[96,54,86,92]|[91,75,21,95]
Expand All @@ -55,23 +55,23 @@ Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 10
7|9|[96,59,65,88]|[43,83,67,43]

-CASE CopyToWithNullAndEmptyList
-STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv"
-STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false)
---- ok
-STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" return *
-STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false) return *
---- 1
|[]|[1,3,,5]|[[2,3],[2],,[1,5,6]]|[[a],[]]

-CASE StringEscapeCopyTo
-STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv"
-STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv" (header=true)
---- ok
-STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv' RETURN *
-STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv'(header=true) RETURN *
---- 1
100|a string with "quotes"|5.600000|","|,

-CASE StringCopyToWithOption
-STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!')
-STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!', header = false)
---- ok
-STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!') RETURN *
-STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!', header=false) RETURN *
---- 1
100|kuzu is # a |graph database

Expand Down