diff --git a/src/include/processor/operator/persistent/copy_to.h b/src/include/processor/operator/persistent/copy_to.h index dd58d2bc4fa..55245824674 100644 --- a/src/include/processor/operator/persistent/copy_to.h +++ b/src/include/processor/operator/persistent/copy_to.h @@ -27,7 +27,7 @@ class CopyToLocalState { virtual void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) = 0; - virtual void sink(CopyToSharedState* sharedState) = 0; + virtual void sink(CopyToSharedState* sharedState, CopyToInfo* info) = 0; virtual void finalize(CopyToSharedState* sharedState) = 0; }; diff --git a/src/include/processor/operator/persistent/copy_to_csv.h b/src/include/processor/operator/persistent/copy_to_csv.h index 626e1caba1f..d26458c80ea 100644 --- a/src/include/processor/operator/persistent/copy_to_csv.h +++ b/src/include/processor/operator/persistent/copy_to_csv.h @@ -8,7 +8,7 @@ namespace kuzu { namespace processor { -struct CopyToCSVInfo : public CopyToInfo { +struct CopyToCSVInfo final : public CopyToInfo { std::vector isFlat; std::unique_ptr copyToOption; @@ -26,22 +26,24 @@ struct CopyToCSVInfo : public CopyToInfo { } }; -class CopyToCSVLocalState : public CopyToLocalState { +class CopyToCSVLocalState final : public CopyToLocalState { public: - void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final; + void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override; - void sink(CopyToSharedState* sharedState) final; + void sink(CopyToSharedState* sharedState, CopyToInfo* info) override; - void finalize(CopyToSharedState* sharedState) final; + void finalize(CopyToSharedState* sharedState) override; private: - bool requireQuotes(const uint8_t* str, uint64_t len); + bool requireQuotes(CopyToCSVInfo* info, const uint8_t* str, uint64_t len); std::string addEscapes(char toEscape, char escape, const std::string& val); - void writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote); + void writeString(CopyToCSVInfo* info, const uint8_t* strData, uint64_t strLen, bool forceQuote); - void writeRows(); + void writeRows(CopyToCSVInfo* info); + + void writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info); private: std::unique_ptr serializer; @@ -50,24 +52,26 @@ class CopyToCSVLocalState : public CopyToLocalState { std::vector castVectors; std::vector castFuncs; std::vector> vectorsToCast; - common::CSVOption* copyToOption; }; -class CopyToCSVSharedState : public CopyToSharedState { +class CopyToCSVSharedState final : public CopyToSharedState { public: - void init(CopyToInfo* info, storage::MemoryManager* mm) final; + void init(CopyToInfo* info, storage::MemoryManager* mm) override; - void finalize() final {} + void finalize() override {} void writeRows(const uint8_t* data, uint64_t size); + bool writeHeader(); + private: std::mutex mtx; std::unique_ptr fileInfo; common::offset_t offset = 0; + bool outputHeader; }; -class CopyToCSV : public CopyTo { +class CopyToCSV final : public CopyTo { public: CopyToCSV(std::unique_ptr resultSetDescriptor, std::unique_ptr info, std::shared_ptr sharedState, @@ -76,7 +80,7 @@ class CopyToCSV : public CopyTo { std::make_unique(), std::move(sharedState), PhysicalOperatorType::COPY_TO_CSV, std::move(child), id, paramsString} {} - inline std::unique_ptr clone() final { + inline std::unique_ptr clone() override { return std::make_unique(resultSetDescriptor->copy(), info->copy(), sharedState, children[0]->clone(), id, paramsString); } diff --git a/src/include/processor/operator/persistent/copy_to_parquet.h b/src/include/processor/operator/persistent/copy_to_parquet.h index 6984f4e1f07..e709ba87f3f 100644 --- a/src/include/processor/operator/persistent/copy_to_parquet.h +++ b/src/include/processor/operator/persistent/copy_to_parquet.h @@ -8,7 +8,7 @@ namespace kuzu { namespace processor { -struct CopyToParquetInfo : public CopyToInfo { +struct CopyToParquetInfo final : public CopyToInfo { kuzu_parquet::format::CompressionCodec::type codec = kuzu_parquet::format::CompressionCodec::SNAPPY; std::unique_ptr tableSchema; @@ -27,12 +27,12 @@ struct CopyToParquetInfo : public CopyToInfo { } }; -class CopyToParquetLocalState : public CopyToLocalState { - void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) final; +class CopyToParquetLocalState final : public CopyToLocalState { + void init(CopyToInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) override; - void sink(CopyToSharedState* sharedState) final; + void sink(CopyToSharedState* sharedState, CopyToInfo* info) override; - void finalize(CopyToSharedState* sharedState) final; + void finalize(CopyToSharedState* sharedState) override; private: std::unique_ptr ft; @@ -40,11 +40,11 @@ class CopyToParquetLocalState : public CopyToLocalState { storage::MemoryManager* mm; }; -class CopyToParquetSharedState : public CopyToSharedState { +class CopyToParquetSharedState final : public CopyToSharedState { public: - void init(CopyToInfo* info, storage::MemoryManager* mm) final; + void init(CopyToInfo* info, storage::MemoryManager* mm) override; - void finalize() final; + void finalize() override; void flush(FactorizedTable& ft); @@ -52,7 +52,7 @@ class CopyToParquetSharedState : public CopyToSharedState { std::unique_ptr writer; }; -class CopyToParquet : public CopyTo { +class CopyToParquet final : public CopyTo { public: CopyToParquet(std::unique_ptr resultSetDescriptor, std::unique_ptr info, std::shared_ptr sharedState, @@ -61,7 +61,7 @@ class CopyToParquet : public CopyTo { std::make_unique(), std::move(sharedState), PhysicalOperatorType::COPY_TO_PARQUET, std::move(child), id, paramsString} {} - std::unique_ptr clone() final { + std::unique_ptr clone() override { return std::make_unique(resultSetDescriptor->copy(), info->copy(), sharedState, children[0]->clone(), id, paramsString); } diff --git a/src/processor/operator/persistent/copy_to.cpp b/src/processor/operator/persistent/copy_to.cpp index 8c8ac96c272..ff7f087c71c 100644 --- a/src/processor/operator/persistent/copy_to.cpp +++ b/src/processor/operator/persistent/copy_to.cpp @@ -17,7 +17,7 @@ void CopyTo::finalize(ExecutionContext* /*context*/) { void CopyTo::executeInternal(processor::ExecutionContext* context) { while (children[0]->getNextTuple(context)) { - localState->sink(sharedState.get()); + localState->sink(sharedState.get(), info.get()); } localState->finalize(sharedState.get()); } diff --git a/src/processor/operator/persistent/copy_to_csv.cpp b/src/processor/operator/persistent/copy_to_csv.cpp index e2886dbc8b6..0a84637c249 100644 --- a/src/processor/operator/persistent/copy_to_csv.cpp +++ b/src/processor/operator/persistent/copy_to_csv.cpp @@ -44,11 +44,12 @@ void CopyToCSVLocalState::init(CopyToInfo* info, MemoryManager* mm, ResultSet* r unflatCastDataChunk->insert(i - numInsertedFlatVector, std::move(castVector)); } } - copyToOption = copyToCSVInfo->copyToOption.get(); } -void CopyToCSVLocalState::sink(CopyToSharedState* sharedState) { - writeRows(); +void CopyToCSVLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* info) { + auto copyToCSVInfo = reinterpret_cast(info); + writeHeader(sharedState, copyToCSVInfo); + writeRows(copyToCSVInfo); if (serializer->getSize() > CopyToCSVConstants::DEFAULT_CSV_FLUSH_SIZE) { reinterpret_cast(sharedState) ->writeRows(serializer->getBlobData(), serializer->getSize()); @@ -64,16 +65,17 @@ void CopyToCSVLocalState::finalize(CopyToSharedState* sharedState) { } } -bool CopyToCSVLocalState::requireQuotes(const uint8_t* str, uint64_t len) { +bool CopyToCSVLocalState::requireQuotes( + CopyToCSVInfo* copyToCsvInfo, const uint8_t* str, uint64_t len) { // Check if the string is equal to the null string. if (len == strlen(CopyToCSVConstants::DEFAULT_NULL_STR) && memcmp(str, CopyToCSVConstants::DEFAULT_NULL_STR, len) == 0) { return true; } for (auto i = 0u; i < len; i++) { - if (str[i] == copyToOption->quoteChar || + if (str[i] == copyToCsvInfo->copyToOption->quoteChar || str[i] == CopyToCSVConstants::DEFAULT_CSV_NEWLINE[0] || - str[i] == copyToOption->delimiter) { + str[i] == copyToCsvInfo->copyToOption->delimiter) { return true; } } @@ -100,42 +102,44 @@ std::string CopyToCSVLocalState::addEscapes(char toEscape, char escape, const st return escapedStr; } -void CopyToCSVLocalState::writeString(const uint8_t* strData, uint64_t strLen, bool forceQuote) { +void CopyToCSVLocalState::writeString( + CopyToCSVInfo* copyToCsvInfo, const uint8_t* strData, uint64_t strLen, bool forceQuote) { if (!forceQuote) { - forceQuote = requireQuotes(strData, strLen); + forceQuote = requireQuotes(copyToCsvInfo, strData, strLen); } if (forceQuote) { bool requiresEscape = false; for (auto i = 0; i < strLen; i++) { - if (strData[i] == copyToOption->quoteChar || strData[i] == copyToOption->escapeChar) { + if (strData[i] == copyToCsvInfo->copyToOption->quoteChar || + strData[i] == copyToCsvInfo->copyToOption->escapeChar) { requiresEscape = true; break; } } if (!requiresEscape) { - serializer->writeBufferData(copyToOption->quoteChar); + serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar); serializer->write(strData, strLen); - serializer->writeBufferData(copyToOption->quoteChar); + serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar); return; } std::string strValToWrite = std::string(reinterpret_cast(strData), strLen); - strValToWrite = - addEscapes(copyToOption->escapeChar, copyToOption->escapeChar, strValToWrite); - if (copyToOption->escapeChar != copyToOption->quoteChar) { - strValToWrite = - addEscapes(copyToOption->quoteChar, copyToOption->escapeChar, strValToWrite); + strValToWrite = addEscapes(copyToCsvInfo->copyToOption->escapeChar, + copyToCsvInfo->copyToOption->escapeChar, strValToWrite); + if (copyToCsvInfo->copyToOption->escapeChar != copyToCsvInfo->copyToOption->quoteChar) { + strValToWrite = addEscapes(copyToCsvInfo->copyToOption->quoteChar, + copyToCsvInfo->copyToOption->escapeChar, strValToWrite); } - serializer->writeBufferData(copyToOption->quoteChar); + serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar); serializer->writeBufferData(strValToWrite); - serializer->writeBufferData(copyToOption->quoteChar); + serializer->writeBufferData(copyToCsvInfo->copyToOption->quoteChar); } else { serializer->write(strData, strLen); } } -void CopyToCSVLocalState::writeRows() { +void CopyToCSVLocalState::writeRows(CopyToCSVInfo* copyToCsvInfo) { for (auto i = 0u; i < vectorsToCast.size(); i++) { std::vector> vectorToCast = {vectorsToCast[i]}; castFuncs[i](vectorToCast, *castVectors[i], nullptr); @@ -150,7 +154,7 @@ void CopyToCSVLocalState::writeRows() { for (auto i = 0; i < numRowsToWrite; i++) { for (auto j = 0u; j < castVectors.size(); j++) { if (j != 0) { - serializer->writeBufferData(copyToOption->delimiter); + serializer->writeBufferData(copyToCsvInfo->copyToOption->delimiter); } auto vector = castVectors[j]; auto pos = vector->state->isFlat() ? vector->state->selVector->selectedPositions[0] : @@ -162,7 +166,7 @@ void CopyToCSVLocalState::writeRows() { } auto strValue = vector->getValue(pos); // Note: we need blindly add quotes to VAR_LIST. - writeString(strValue.getData(), strValue.len, + writeString(copyToCsvInfo, strValue.getData(), strValue.len, CopyToCSVConstants::DEFAULT_FORCE_QUOTE || vectorsToCast[j]->dataType.getLogicalTypeID() == LogicalTypeID::VAR_LIST); } @@ -170,8 +174,25 @@ void CopyToCSVLocalState::writeRows() { } } +void CopyToCSVLocalState::writeHeader(CopyToSharedState* sharedState, CopyToCSVInfo* info) { + auto copyToCSVSharedState = reinterpret_cast(sharedState); + if (!copyToCSVSharedState->writeHeader()) { + return; + } + for (auto i = 0u; i < info->names.size(); i++) { + if (i != 0) { + serializer->writeBufferData(info->copyToOption->delimiter); + } + writeString(info, reinterpret_cast(info->names[i].c_str()), + info->names[i].length(), false /* forceQuote */); + serializer->writeBufferData(info->names[i]); + } + serializer->writeBufferData(CopyToCSVConstants::DEFAULT_CSV_NEWLINE); +} + void CopyToCSVSharedState::init(CopyToInfo* info, MemoryManager* /*mm*/) { fileInfo = FileUtils::openFile(info->fileName, O_WRONLY | O_CREAT | O_TRUNC); + outputHeader = reinterpret_cast(info)->copyToOption->hasHeader; } void CopyToCSVSharedState::writeRows(const uint8_t* data, uint64_t size) { @@ -180,5 +201,15 @@ void CopyToCSVSharedState::writeRows(const uint8_t* data, uint64_t size) { offset += size; } +bool CopyToCSVSharedState::writeHeader() { + std::lock_guard lck(mtx); + if (outputHeader) { + outputHeader = false; + return true; + } else { + return false; + } +} + } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/persistent/copy_to_parquet.cpp b/src/processor/operator/persistent/copy_to_parquet.cpp index e54458980d0..487960880ac 100644 --- a/src/processor/operator/persistent/copy_to_parquet.cpp +++ b/src/processor/operator/persistent/copy_to_parquet.cpp @@ -17,7 +17,7 @@ void CopyToParquetLocalState::init( this->mm = mm; } -void CopyToParquetLocalState::sink(CopyToSharedState* sharedState) { +void CopyToParquetLocalState::sink(CopyToSharedState* sharedState, CopyToInfo* /*info*/) { ft->append(vectorsToAppend); if (ft->getTotalNumFlatTuples() > StorageConstants::NODE_GROUP_SIZE) { reinterpret_cast(sharedState)->flush(*ft); diff --git a/test/test_files/copy/copy_to_csv.test b/test/test_files/copy/copy_to_csv.test index c611410d1d7..f014502842f 100644 --- a/test/test_files/copy/copy_to_csv.test +++ b/test/test_files/copy/copy_to_csv.test @@ -5,9 +5,9 @@ -CASE TinySnbCopyToCSV --STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv" +-STATEMENT COPY (MATCH (p:person) RETURN [id(p)], p.*) TO "${DATABASE_PATH}/tinysnb_person.csv" (header=true) ---- ok --STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv" return * +-STATEMENT load from "${DATABASE_PATH}/tinysnb_person.csv"(header=true) return * ---- 8 [0:0]|0|Alice|1|True|False|35|5.000000|1900-01-01|2011-08-20 11:25:30|3 years 2 days 13:02:00|[10,5]|[Aida]|[[10,8],[6,7,8]]|[96,54,86,92]|1.731000 [0:1]|2|Bob|2|True|False|30|5.100000|1900-01-01|2008-11-03 15:25:30.000526|10 years 5 months 13:00:00.000024|[12,8]|[Bobby]|[[8,9],[9,10]]|[98,42,93,88]|0.990000 @@ -20,24 +20,24 @@ -STATEMENT COPY (MATCH (m:movies) RETURN m.*) TO "${DATABASE_PATH}/movies.csv" ---- ok --STATEMENT load from "${DATABASE_PATH}/movies.csv" return * +-STATEMENT load from "${DATABASE_PATH}/movies.csv"(header=false) return * ---- 3 Sóló cón tu párejâ|126| this is a very very good movie|{rating: 5.300000, stars: 2, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11, u8: 220, u16: 20, u32: 1, u64: 180, hugedata: 1844674407370955161811111111}|\xAA\xABinteresting\x0B|{audience1=52, audience53=42}|True The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie|2544| the movie is very very good|{rating: 7.000000, stars: 10, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12, u8: 12, u16: 120, u32: 55, u64: 1, hugedata: -1844674407370955161511}|\xAB\xCD|{audience1=33}|8.989000 Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 100, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22, u8: 1, u16: 15, u32: 200, u64: 4, hugedata: -15}|pure ascii characters|{}|254.000000 --STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv" +-STATEMENT COPY (MATCH (p:person)-[s:studyAt]->(o:organisation) RETURN p.ID, s.level, s.places, o.ID) TO "${DATABASE_PATH}/studyAt.csv" (header=true) ---- ok --STATEMENT load from "${DATABASE_PATH}/studyAt.csv" return * +-STATEMENT load from "${DATABASE_PATH}/studyAt.csv"(header=true) return * ---- 3 0|5|[wwAewsdndweusd,wek]|1 2|120|[anew,jsdnwusklklklwewsd]|1 8|2|[awndsnjwejwen,isuhuwennjnuhuhuwewe]|1 --STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv" +-STATEMENT COPY (MATCH (p:person)-[e:knows]->(p1:person) RETURN p.ID, p1.ID, p.grades, p1.grades) TO "${DATABASE_PATH}/onehop.csv" (header=true) ---- ok --STATEMENT load from "${DATABASE_PATH}/onehop.csv" return * +-STATEMENT load from "${DATABASE_PATH}/onehop.csv"(header=true) return * ---- 14 0|2|[96,54,86,92]|[98,42,93,88] 0|3|[96,54,86,92]|[91,75,21,95] @@ -55,23 +55,23 @@ Roma|298|the movie is very interesting and funny|{rating: 1223.000000, stars: 10 7|9|[96,59,65,88]|[43,83,67,43] -CASE CopyToWithNullAndEmptyList --STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv" +-STATEMENT COPY (RETURN NULL,[],[1,3,NULL,5],[[2,3],[2],NULL,[1,5,6]], [['a'], []]) TO "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false) ---- ok --STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" return * +-STATEMENT load from "${DATABASE_PATH}/nullAndEmptyList.csv" (header=false) return * ---- 1 |[]|[1,3,,5]|[[2,3],[2],,[1,5,6]]|[[a],[]] -CASE StringEscapeCopyTo --STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv" +-STATEMENT COPY (RETURN 100,'a string with "quotes"',5.6,'","',',') TO "${DATABASE_PATH}/string.csv" (header=false) ---- ok --STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv' RETURN * +-STATEMENT LOAD FROM '${DATABASE_PATH}/string.csv'(header=false) RETURN * ---- 1 100|a string with "quotes"|5.600000|","|, -CASE StringCopyToWithOption --STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!') +-STATEMENT COPY (RETURN 100,'kuzu is # a |graph database') TO "${DATABASE_PATH}/copy_to_with_option.csv" (delim = '|', QUOTE='#', Escape = '!', header = false) ---- ok --STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!') RETURN * +-STATEMENT LOAD FROM '${DATABASE_PATH}/copy_to_with_option.csv' (delim = '|', QUOTE='#', Escape = '!', header=false) RETURN * ---- 1 100|kuzu is # a |graph database