Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Jun 1, 2023
1 parent 04ac622 commit 011255c
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 49 deletions.
11 changes: 2 additions & 9 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,13 @@ void ValueVector::initializeValueBuffer() {
}
}

void ArrowColumnVector::setStartOffsetAndColumn(kuzu::common::ValueVector* vector,
kuzu::common::offset_t startOffset, std::shared_ptr<arrow::Array> column) {
void ArrowColumnVector::setArrowColumn(
kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
arrowColumnBuffer->startOffset = startOffset;
arrowColumnBuffer->column = std::move(column);
}

offset_t ArrowColumnVector::getEndOffset(kuzu::common::ValueVector* vector) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
return arrowColumnBuffer->startOffset + arrowColumnBuffer->column->length() - 1;
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
1 change: 0 additions & 1 deletion src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ArrowColumnVector;

private:
common::offset_t startOffset;
std::shared_ptr<arrow::Array> column;
};

Expand Down
13 changes: 3 additions & 10 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,12 @@ class StructVector {

class ArrowColumnVector {
public:
static inline offset_t getStartOffset(ValueVector* vector) {
return reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->startOffset;
}

static inline std::shared_ptr<arrow::Array> getColumn(ValueVector* vector) {
static inline std::shared_ptr<arrow::Array> getArrowColumn(ValueVector* vector) {
return reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->column;
}

static void setStartOffsetAndColumn(kuzu::common::ValueVector* vector, offset_t startOffset,
std::shared_ptr<arrow::Array> column);

static offset_t getEndOffset(ValueVector* vector);
static void setArrowColumn(
kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column);
};

class NodeIDVector {
Expand Down
19 changes: 13 additions & 6 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowColumnExpressions{std::move(arrowColumnExpressions)}, outputExpression{std::move(
outputExpression)} {}
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
offsetExpression{std::move(offsetExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -28,6 +30,10 @@ class LogicalCopy : public LogicalOperator {
return arrowColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getOffsetExpression() const {
return offsetExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -36,8 +42,8 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, arrowColumnExpressions, outputExpression);
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, outputExpression);
}

private:
Expand All @@ -46,6 +52,7 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
8 changes: 6 additions & 2 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ class CopyNodeSharedState {
struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::vector<DataPos> arrowColumnPoses)
DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, arrowColumnPoses{std::move(arrowColumnPoses)} {}
wal{wal}, offsetVectorPos{std::move(offsetVectorPos)}, arrowColumnPoses{
std::move(arrowColumnPoses)} {}

common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
std::vector<DataPos> arrowColumnPoses;
std::vector<common::ValueVector*> arrowColumnVectors;
};
Expand All @@ -60,6 +63,7 @@ class CopyNode : public Sink {
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
localState->offsetVector = resultSet->getValueVector(localState->offsetVectorPos).get();
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
Expand Down
10 changes: 6 additions & 4 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class ReadCSVSharedState : public ReadFileSharedState {

class ReadCSV : public ReadFile {
public:
ReadCSV(std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(sharedState),
ReadCSV(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(offsetVectorPos), std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
Expand All @@ -48,7 +49,8 @@ class ReadCSV : public ReadFile {
}

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(arrowColumnPoses, sharedState, id, paramsString);
return std::make_unique<ReadCSV>(
arrowColumnPoses, offsetVectorPos, sharedState, id, paramsString);
}
};

Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ class ReadFileSharedState {

class ReadFile : public PhysicalOperator {
public:
ReadFile(std::vector<DataPos> arrowColumnPoses,
ReadFile(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, PhysicalOperatorType operatorType,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString},
arrowColumnPoses{std::move(arrowColumnPoses)}, sharedState{std::move(sharedState)} {}
: PhysicalOperator{operatorType, id, paramsString}, arrowColumnPoses{std::move(
arrowColumnPoses)},
offsetVectorPos{std::move(offsetVectorPos)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
offsetVector = resultSet->getValueVector(offsetVectorPos).get();
for (auto& arrowColumnPos : arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
Expand All @@ -76,6 +78,8 @@ class ReadFile : public PhysicalOperator {
protected:
std::shared_ptr<ReadFileSharedState> sharedState;
std::vector<DataPos> arrowColumnPoses;
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
std::vector<common::ValueVector*> arrowColumnVectors;
};

Expand Down
7 changes: 4 additions & 3 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ class ReadParquetSharedState : public ReadFileSharedState {

class ReadParquet : public ReadFile {
public:
ReadParquet(std::vector<DataPos> arrowColumnPoses,
ReadParquet(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(sharedState),
: ReadFile{std::move(arrowColumnPoses), std::move(offsetVectorPos), std::move(sharedState),
PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(arrowColumnPoses, sharedState, id, paramsString);
return std::make_unique<ReadParquet>(
arrowColumnPoses, offsetVectorPos, sharedState, id, paramsString);
}

private:
Expand Down
4 changes: 3 additions & 1 deletion src/planner/operator/logical_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ void LogicalCopy::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, groupPos);
schema->insertToGroupAndScope(offsetExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}
Expand All @@ -15,8 +16,9 @@ void LogicalCopy::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, 0);
schema->insertToGroupAndScope(offsetExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}

} // namespace planner
} // namespace kuzu
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/planner/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ std::unique_ptr<LogicalPlan> Planner::planCopy(
}
auto copyCSV = make_shared<LogicalCopy>(copyCSVClause.getCopyDescription(),
copyCSVClause.getTableID(), copyCSVClause.getTableName(), std::move(arrowColumnExpressions),
std::make_shared<VariableExpression>(
common::LogicalType{common::LogicalTypeID::INT64}, "startOffset", "startOffset"),
copyCSVClause.getStatementResult()->getSingleExpressionToCollect());
plan->setLastOperator(std::move(copyCSV));
return plan;
Expand Down
13 changes: 8 additions & 5 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,27 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyToPhysical(
for (auto& arrowColumnPos : arrowColumnExpressions) {
arrowColumnPoses.emplace_back(outSchema->getExpressionPos(*arrowColumnPos));
}
auto offsetExpression = copy->getOffsetExpression();
auto offsetVectorPos = DataPos(outSchema->getExpressionPos(*offsetExpression));
if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::CSV) {
readFileSharedState = std::make_shared<ReadCSVSharedState>(
*copy->getCopyDescription().csvReaderConfig,
catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()),
copy->getCopyDescription().filePaths);
readFile = std::make_unique<ReadCSV>(arrowColumnPoses, readFileSharedState,
getOperatorID(), copy->getExpressionsForPrinting());
readFile = std::make_unique<ReadCSV>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
} else {
readFileSharedState =
std::make_shared<ReadParquetSharedState>(copy->getCopyDescription().filePaths);
readFile = std::make_unique<ReadParquet>(arrowColumnPoses, readFileSharedState,
getOperatorID(), copy->getExpressionsForPrinting());
readFile = std::make_unique<ReadParquet>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
}
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(readFileSharedState->numRows, memoryManager);
auto localState = std::make_unique<CopyNodeLocalState>(copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), arrowColumnPoses);
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), offsetVectorPos,
arrowColumnPoses);
auto copyNode = std::make_unique<CopyNode>(std::move(localState), copyNodeSharedState,
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
Expand Down
10 changes: 7 additions & 3 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
std::vector<std::unique_ptr<storage::InMemColumnChunk>> columnChunks;
columnChunks.reserve(sharedState->columns.size());
auto offsetVector = localState->offsetVector;
auto startOffset =
common::ArrowColumnVector::getStartOffset(localState->arrowColumnVectors[0]);
auto endOffset = common::ArrowColumnVector::getEndOffset(localState->arrowColumnVectors[0]);
offsetVector->getValue<int64_t>(offsetVector->state->selVector->selectedPositions[0]);
auto endOffset =
startOffset +
common::ArrowColumnVector::getArrowColumn(localState->arrowColumnVectors[0])->length() -
1;
for (auto i = 0u; i < sharedState->columns.size(); i++) {
auto columnChunk = sharedState->columns[i]->getInMemColumnChunk(
startOffset, endOffset, &localState->copyDesc);
columnChunk->copyArrowArray(
*common::ArrowColumnVector::getColumn(localState->arrowColumnVectors[i]));
*common::ArrowColumnVector::getArrowColumn(localState->arrowColumnVectors[i]));
columnChunks.push_back(std::move(columnChunk));
}
flushChunksAndPopulatePKIndex(columnChunks, startOffset, endOffset);
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context)
return false;
}
auto startOffset = morsel->nodeOffset;
offsetVector->setValue(offsetVector->state->selVector->selectedPositions[0], startOffset);
auto recordBatch = readTuples(std::move(morsel));
for (auto i = 0u; i < arrowColumnVectors.size(); i++) {
common::ArrowColumnVector::setStartOffsetAndColumn(
arrowColumnVectors[i], startOffset, recordBatch->column((int)i));
common::ArrowColumnVector::setArrowColumn(
arrowColumnVectors[i], recordBatch->column((int)i));
}
return true;
}
Expand Down

0 comments on commit 011255c

Please sign in to comment.