Skip to content

Commit

Permalink
Add copy-to-csv header
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Nov 16, 2023
1 parent 093e627 commit d70f842
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CopyToLocalState {

virtual void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) = 0;

virtual void sink(CopyToSharedState* sharedState) = 0;
virtual void sink(CopyToSharedState* sharedState, CopyToInfo* info) = 0;

virtual void finalize(CopyToSharedState* sharedState) = 0;
};
Expand Down
32 changes: 18 additions & 14 deletions src/include/processor/operator/persistent/copy_to_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace kuzu {
namespace processor {

struct CopyToCSVInfo : public CopyToInfo {
struct CopyToCSVInfo final : public CopyToInfo {

Check warning on line 11 in src/include/processor/operator/persistent/copy_to_csv.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_csv.h#L11

Added line #L11 was not covered by tests
std::vector<bool> isFlat;
std::unique_ptr<common::CSVOption> copyToOption;

Expand All @@ -26,22 +26,24 @@ struct CopyToCSVInfo : public CopyToInfo {
}
};

class CopyToCSVLocalState : public CopyToLocalState {
class CopyToCSVLocalState final : public CopyToLocalState {

Check warning on line 29 in src/include/processor/operator/persistent/copy_to_csv.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_csv.h#L29

Added line #L29 was not covered by tests
public:
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final;
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override;

void sink(CopyToSharedState* sharedState) final;
void sink(CopyToSharedState* sharedState, CopyToInfo* info) override;

void finalize(CopyToSharedState* sharedState) final;
void finalize(CopyToSharedState* sharedState) override;

private:
bool requireQuotes(const uint8_t* str, uint64_t len);
bool requireQuotes(CopyToCSVInfo* info, const uint8_t* str, uint64_t len);

std::string addEscapes(char toEscape, char escape, const std::string& val);

void writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote);
void writeString(CopyToCSVInfo* info, const uint8_t* strData, uint64_t strLen, bool forceQuote);

void writeRows();
void writeRows(CopyToCSVInfo* info);

void writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info);

private:
std::unique_ptr<common::BufferedSerializer> serializer;
Expand All @@ -50,24 +52,26 @@ class CopyToCSVLocalState : public CopyToLocalState {
std::vector<common::ValueVector*> castVectors;
std::vector<function::scalar_exec_func> castFuncs;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToCast;
common::CSVOption* copyToOption;
};

class CopyToCSVSharedState : public CopyToSharedState {
class CopyToCSVSharedState final : public CopyToSharedState {
public:
void init(CopyToInfo* info, storage::MemoryManager* mm) final;
void init(CopyToInfo* info, storage::MemoryManager* mm) override;

void finalize() final {}
void finalize() override {}

void writeRows(const uint8_t* data, uint64_t size);

bool writeHeader();

private:
std::mutex mtx;
std::unique_ptr<common::FileInfo> fileInfo;
common::offset_t offset = 0;
bool outputHeader;
};

class CopyToCSV : public CopyTo {
class CopyToCSV final : public CopyTo {
public:
CopyToCSV(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<CopyToInfo> info, std::shared_ptr<CopyToSharedState> sharedState,
Expand All @@ -76,7 +80,7 @@ class CopyToCSV : public CopyTo {
std::make_unique<CopyToCSVLocalState>(), std::move(sharedState),
PhysicalOperatorType::COPY_TO_CSV, std::move(child), id, paramsString} {}

inline std::unique_ptr<PhysicalOperator> clone() final {
inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyToCSV>(resultSetDescriptor->copy(), info->copy(), sharedState,
children[0]->clone(), id, paramsString);
}
Expand Down
20 changes: 10 additions & 10 deletions src/include/processor/operator/persistent/copy_to_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace kuzu {
namespace processor {

struct CopyToParquetInfo : public CopyToInfo {
struct CopyToParquetInfo final : public CopyToInfo {

Check warning on line 11 in src/include/processor/operator/persistent/copy_to_parquet.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_parquet.h#L11

Added line #L11 was not covered by tests
kuzu_parquet::format::CompressionCodec::type codec =
kuzu_parquet::format::CompressionCodec::SNAPPY;
std::unique_ptr<FactorizedTableSchema> tableSchema;
Expand All @@ -27,32 +27,32 @@ struct CopyToParquetInfo : public CopyToInfo {
}
};

class CopyToParquetLocalState : public CopyToLocalState {
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final;
class CopyToParquetLocalState final : public CopyToLocalState {

Check warning on line 30 in src/include/processor/operator/persistent/copy_to_parquet.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to_parquet.h#L30

Added line #L30 was not covered by tests
void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override;

void sink(CopyToSharedState* sharedState) final;
void sink(CopyToSharedState* sharedState, CopyToInfo* info) override;

void finalize(CopyToSharedState* sharedState) final;
void finalize(CopyToSharedState* sharedState) override;

private:
std::unique_ptr<FactorizedTable> ft;
std::vector<common::ValueVector*> vectorsToAppend;
storage::MemoryManager* mm;
};

class CopyToParquetSharedState : public CopyToSharedState {
class CopyToParquetSharedState final : public CopyToSharedState {
public:
void init(CopyToInfo* info, storage::MemoryManager* mm) final;
void init(CopyToInfo* info, storage::MemoryManager* mm) override;

void finalize() final;
void finalize() override;

void flush(FactorizedTable& ft);

private:
std::unique_ptr<ParquetWriter> writer;
};

class CopyToParquet : public CopyTo {
class CopyToParquet final : public CopyTo {
public:
CopyToParquet(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<CopyToInfo> info, std::shared_ptr<CopyToSharedState> sharedState,
Expand All @@ -61,7 +61,7 @@ class CopyToParquet : public CopyTo {
std::make_unique<CopyToParquetLocalState>(), std::move(sharedState),
PhysicalOperatorType::COPY_TO_PARQUET, std::move(child), id, paramsString} {}

std::unique_ptr<PhysicalOperator> clone() final {
std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyToParquet>(resultSetDescriptor->copy(), info->copy(),
sharedState, children[0]->clone(), id, paramsString);
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void CopyTo::finalize(ExecutionContext* /*context*/) {

void CopyTo::executeInternal(processor::ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
localState->sink(sharedState.get());
localState->sink(sharedState.get(), info.get());
}
localState->finalize(sharedState.get());
}
Expand Down
73 changes: 52 additions & 21 deletions src/processor/operator/persistent/copy_to_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ void CopyToCSVLocalState::init(CopyToInfo* info, MemoryManager* mm, ResultSet* r
unflatCastDataChunk->insert(i - numInsertedFlatVector, std::move(castVector));
}
}
copyToOption = copyToCSVInfo->copyToOption.get();
}

void CopyToCSVLocalState::sink(CopyToSharedState* sharedState) {
writeRows();
void CopyToCSVLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* info) {
auto copyToCSVInfo = reinterpret_cast<CopyToCSVInfo*>(info);
writeHeader(sharedState, copyToCSVInfo);
writeRows(copyToCSVInfo);
if (serializer->getSize() > CopyToCSVConstants::DEFAULT_CSV_FLUSH_SIZE) {
reinterpret_cast<CopyToCSVSharedState*>(sharedState)
->writeRows(serializer->getBlobData(), serializer->getSize());
Expand All @@ -64,16 +65,17 @@ void CopyToCSVLocalState::finalize(CopyToSharedState* sharedState) {
}
}

bool CopyToCSVLocalState::requireQuotes(const uint8_t* str, uint64_t len) {
bool CopyToCSVLocalState::requireQuotes(

Check warning on line 68 in src/processor/operator/persistent/copy_to_csv.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/copy_to_csv.cpp#L68

Added line #L68 was not covered by tests
CopyToCSVInfo* copyToCsvInfo, const uint8_t* str, uint64_t len) {
// Check if the string is equal to the null string.
if (len == strlen(CopyToCSVConstants::DEFAULT_NULL_STR) &&
memcmp(str, CopyToCSVConstants::DEFAULT_NULL_STR, len) == 0) {
return true;
}
for (auto i = 0u; i < len; i++) {
if (str[i] == copyToOption->quoteChar ||
if (str[i] == copyToCsvInfo->copyToOption->quoteChar ||
str[i] == CopyToCSVConstants::DEFAULT_CSV_NEWLINE[0] ||
str[i] == copyToOption->delimiter) {
str[i] == copyToCsvInfo->copyToOption->delimiter) {
return true;
}
}
Expand All @@ -100,42 +102,44 @@ std::string CopyToCSVLocalState::addEscapes(char toEscape, char escape, const st
return escapedStr;
}

void CopyToCSVLocalState::writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote) {
void CopyToCSVLocalState::writeString(
CopyToCSVInfo* copyToCsvInfo, const uint8_t* strData, uint64_t strLen, bool forceQuote) {
if (!forceQuote) {
forceQuote = requireQuotes(strData, strLen);
forceQuote = requireQuotes(copyToCsvInfo, strData, strLen);
}
if (forceQuote) {
bool requiresEscape = false;
for (auto i = 0; i < strLen; i++) {
if (strData[i] == copyToOption->quoteChar || strData[i] == copyToOption->escapeChar) {
if (strData[i] == copyToCsvInfo->copyToOption->quoteChar ||
strData[i] == copyToCsvInfo->copyToOption->escapeChar) {
requiresEscape = true;
break;
}
}

if (!requiresEscape) {
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
serializer->write(strData, strLen);
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
return;
}

std::string strValToWrite = std::string(reinterpret_cast<const char*>(strData), strLen);
strValToWrite =
addEscapes(copyToOption->escapeChar, copyToOption->escapeChar, strValToWrite);
if (copyToOption->escapeChar != copyToOption->quoteChar) {
strValToWrite =
addEscapes(copyToOption->quoteChar, copyToOption->escapeChar, strValToWrite);
strValToWrite = addEscapes(copyToCsvInfo->copyToOption->escapeChar,
copyToCsvInfo->copyToOption->escapeChar, strValToWrite);
if (copyToCsvInfo->copyToOption->escapeChar != copyToCsvInfo->copyToOption->quoteChar) {
strValToWrite = addEscapes(copyToCsvInfo->copyToOption->quoteChar,
copyToCsvInfo->copyToOption->escapeChar, strValToWrite);
}
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
serializer->writeBufferData(strValToWrite);
serializer->writeBufferData(copyToOption->quoteChar);
serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar);
} else {
serializer->write(strData, strLen);
}
}

void CopyToCSVLocalState::writeRows() {
void CopyToCSVLocalState::writeRows(CopyToCSVInfo* copyToCsvInfo) {
for (auto i = 0u; i < vectorsToCast.size(); i++) {
std::vector<std::shared_ptr<ValueVector>> vectorToCast = {vectorsToCast[i]};
castFuncs[i](vectorToCast, *castVectors[i], nullptr);
Expand All @@ -150,7 +154,7 @@ void CopyToCSVLocalState::writeRows() {
for (auto i = 0; i < numRowsToWrite; i++) {
for (auto j = 0u; j < castVectors.size(); j++) {
if (j != 0) {
serializer->writeBufferData(copyToOption->delimiter);
serializer->writeBufferData(copyToCsvInfo->copyToOption->delimiter);
}
auto vector = castVectors[j];
auto pos = vector->state->isFlat() ? vector->state->selVector->selectedPositions[0] :
Expand All @@ -162,16 +166,33 @@ void CopyToCSVLocalState::writeRows() {
}
auto strValue = vector->getValue<ku_string_t>(pos);
// Note: we need blindly add quotes to VAR_LIST.
writeString(strValue.getData(), strValue.len,
writeString(copyToCsvInfo, strValue.getData(), strValue.len,
CopyToCSVConstants::DEFAULT_FORCE_QUOTE ||
vectorsToCast[j]->dataType.getLogicalTypeID() == LogicalTypeID::VAR_LIST);
}
serializer->writeBufferData(CopyToCSVConstants::DEFAULT_CSV_NEWLINE);
}
}

void CopyToCSVLocalState::writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info) {
auto copyToCSVSharedState = reinterpret_cast<CopyToCSVSharedState*>(sharedState);
if (!copyToCSVSharedState->writeHeader()) {
return;
}
for (auto i = 0u; i < info->names.size(); i++) {
if (i != 0) {
serializer->writeBufferData(info->copyToOption->delimiter);
}
writeString(info, reinterpret_cast<const uint8_t*>(info->names[i].c_str()),
info->names[i].length(), false /* forceQuote */);
serializer->writeBufferData(info->names[i]);
}
serializer->writeBufferData(CopyToCSVConstants::DEFAULT_CSV_NEWLINE);
}

void CopyToCSVSharedState::init(CopyToInfo* info, MemoryManager* /*mm*/) {
fileInfo = FileUtils::openFile(info->fileName, O_WRONLY | O_CREAT | O_TRUNC);
outputHeader = reinterpret_cast<CopyToCSVInfo*>(info)->copyToOption->hasHeader;
}

void CopyToCSVSharedState::writeRows(const uint8_t* data, uint64_t size) {
Expand All @@ -180,5 +201,15 @@ void CopyToCSVSharedState::writeRows(const uint8_t* data, uint64_t size) {
offset += size;
}

bool CopyToCSVSharedState::writeHeader() {

Check warning on line 204 in src/processor/operator/persistent/copy_to_csv.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/copy_to_csv.cpp#L204

Added line #L204 was not covered by tests
std::lock_guard lck(mtx);
if (outputHeader) {
outputHeader = false;
return true;
} else {
return false;
}
}

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/copy_to_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void CopyToParquetLocalState::init(
this->mm = mm;
}

void CopyToParquetLocalState::sink(CopyToSharedState* sharedState) {
void CopyToParquetLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* /*info*/) {
ft->append(vectorsToAppend);
if (ft->getTotalNumFlatTuples() > StorageConstants::NODE_GROUP_SIZE) {
reinterpret_cast<CopyToParquetSharedState*>(sharedState)->flush(*ft);
Expand Down
26 changes: 13 additions & 13 deletions test/test_files/copy/copy_to_csv.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

-CASE TinySnbCopyToCSV

-STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv"
-STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv" return *
-STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv"(header=true) return *
---- 8
[0:0]|0|Alice|1|True|False|35|5.000000|1900-01-01|2011-08-20 11:25:30|3 years 2 days 13:02:00|[10,5]|[Aida]|[[10,8],[6,7,8]]|[96,54,86,92]|1.731000
[0:1]|2|Bob|2|True|False|30|5.100000|1900-01-01|2008-11-03 15:25:30.000526|10 years 5 months 13:00:00.000024|[12,8]|[Bobby]|[[8,9],[9,10]]|[98,42,93,88]|0.990000
Expand All @@ -20,24 +20,24 @@

-STATEMENT COPY (MATCH (m:movies) RETURN m.*) TO "${DATABASE_PATH}/movies.csv"
---- ok
-STATEMENT load from "${DATABASE_PATH}/movies.csv" return *
-STATEMENT load from "${DATABASE_PATH}/movies.csv"(header=false) return *
---- 3
Sóló cón tu párejâ|126| this is a very very good movie|{rating: 5.300000, stars: 2, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11, u8: 220, u16: 20, u32: 1, u64: 180, hugedata: 1844674407370955161811111111}|\xAA\xABinteresting\x0B|{audience1=52, audience53=42}|True
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie|2544| the movie is very very good|{rating: 7.000000, stars: 10, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12, u8: 12, u16: 120, u32: 55, u64: 1, hugedata: -1844674407370955161511}|\xAB\xCD|{audience1=33}|8.989000
Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 100, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22, u8: 1, u16: 15, u32: 200, u64: 4, hugedata: -15}|pure ascii characters|{}|254.000000


-STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv"
-STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/studyAt.csv" return *
-STATEMENT load from "${DATABASE_PATH}/studyAt.csv"(header=true) return *
---- 3
0|5|[wwAewsdndweusd,wek]|1
2|120|[anew,jsdnwusklklklwewsd]|1
8|2|[awndsnjwejwen,isuhuwennjnuhuhuwewe]|1

-STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv"
-STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv" (header=true)
---- ok
-STATEMENT load from "${DATABASE_PATH}/onehop.csv" return *
-STATEMENT load from "${DATABASE_PATH}/onehop.csv"(header=true) return *
---- 14
0|2|[96,54,86,92]|[98,42,93,88]
0|3|[96,54,86,92]|[91,75,21,95]
Expand All @@ -55,23 +55,23 @@ Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 10
7|9|[96,59,65,88]|[43,83,67,43]

-CASE CopyToWithNullAndEmptyList
-STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv"
-STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false)
---- ok
-STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" return *
-STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false) return *
---- 1
|[]|[1,3,,5]|[[2,3],[2],,[1,5,6]]|[[a],[]]

-CASE StringEscapeCopyTo
-STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv"
-STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv" (header=false)
---- ok
-STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv' RETURN *
-STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv'(header=false) RETURN *
---- 1
100|a string with "quotes"|5.600000|","|,

-CASE StringCopyToWithOption
-STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!')
-STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!', header = false)
---- ok
-STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!') RETURN *
-STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!', header=false) RETURN *
---- 1
100|kuzu is # a |graph database

Expand Down

0 comments on commit d70f842

Please sign in to comment.