Skip to content

Commit

Permalink
Merge pull request #1979 from kuzudb/csv-reader
Browse files Browse the repository at this point in the history
Reader function refactor
  • Loading branch information
acquamarin committed Aug 31, 2023
2 parents 957820b + a1997be commit a260a35
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 227 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.1 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
48 changes: 48 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,54 @@ void ValueVector::copyFromVectorData(
}
}

void ValueVector::copyFromValue(uint64_t pos, const Value& value) {
auto dstValue = valueBuffer.get() + pos * numBytesPerValue;
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(dstValue, &value.val.int64Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(dstValue, &value.val.int32Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT16: {
memcpy(dstValue, &value.val.int16Val, numBytesPerValue);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(dstValue, &value.val.doubleVal, numBytesPerValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(dstValue, &value.val.floatVal, numBytesPerValue);
} break;
case PhysicalTypeID::BOOL: {
memcpy(dstValue, &value.val.booleanVal, numBytesPerValue);
} break;
case PhysicalTypeID::INTERVAL: {
memcpy(dstValue, &value.val.intervalVal, numBytesPerValue);
} break;
case PhysicalTypeID::STRING: {
StringVector::addString(
this, *(ku_string_t*)dstValue, value.strVal.data(), value.strVal.length());
} break;
case PhysicalTypeID::VAR_LIST: {
auto listEntry = reinterpret_cast<list_entry_t*>(dstValue);
auto numValues = NestedVal::getChildrenSize(&value);
*listEntry = ListVector::addList(this, numValues);
auto dstDataVector = ListVector::getDataVector(this);
for (auto i = 0u; i < numValues; ++i) {
dstDataVector->copyFromValue(listEntry->offset + i, *NestedVal::getChildVal(&value, i));
}
} break;
case PhysicalTypeID::STRUCT: {
auto structFields = StructVector::getFieldVectors(this);
for (auto i = 0u; i < structFields.size(); ++i) {
structFields[i]->copyFromValue(pos, *NestedVal::getChildVal(&value, i));
}
} break;
default:
throw NotImplementedException("ValueVector::copyFromValue");
}
}

void ValueVector::resetAuxiliaryBuffer() {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRING: {
Expand Down
50 changes: 2 additions & 48 deletions src/expression_evaluator/literal_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,11 @@ bool LiteralExpressionEvaluator::select(SelectionVector& selVector) {
void LiteralExpressionEvaluator::resolveResultVector(
const processor::ResultSet& resultSet, MemoryManager* memoryManager) {
resultVector = std::make_shared<ValueVector>(*value->getDataType(), memoryManager);
resultVector->setState(DataChunkState::getSingleValueDataChunkState());
if (value->isNull()) {
resultVector->setNull(0 /* pos */, true);
} else {
copyValueToVector(resultVector->getData(), resultVector.get(), value.get());
}
resultVector->setState(DataChunkState::getSingleValueDataChunkState());
}

void LiteralExpressionEvaluator::copyValueToVector(
uint8_t* dstValue, ValueVector* dstVector, const Value* srcValue) {
auto numBytesPerValue = dstVector->getNumBytesPerValue();
switch (srcValue->getDataType()->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(dstValue, &srcValue->val.int64Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(dstValue, &srcValue->val.int32Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT16: {
memcpy(dstValue, &srcValue->val.int16Val, numBytesPerValue);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(dstValue, &srcValue->val.doubleVal, numBytesPerValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(dstValue, &srcValue->val.floatVal, numBytesPerValue);
} break;
case PhysicalTypeID::BOOL: {
memcpy(dstValue, &srcValue->val.booleanVal, numBytesPerValue);
} break;
case PhysicalTypeID::INTERVAL: {
memcpy(dstValue, &srcValue->val.intervalVal, numBytesPerValue);
} break;
case PhysicalTypeID::STRING: {
StringVector::addString(
dstVector, *(ku_string_t*)dstValue, srcValue->strVal.data(), srcValue->strVal.length());
} break;
case PhysicalTypeID::VAR_LIST: {
auto listListEntry = reinterpret_cast<list_entry_t*>(dstValue);
auto numValues = NestedVal::getChildrenSize(srcValue);
*listListEntry = ListVector::addList(dstVector, numValues);
auto dstDataVector = ListVector::getDataVector(dstVector);
auto dstElements = ListVector::getListValues(dstVector, *listListEntry);
for (auto i = 0u; i < numValues; ++i) {
copyValueToVector(dstElements + i * dstDataVector->getNumBytesPerValue(), dstDataVector,
NestedVal::getChildVal(srcValue, i));
}
} break;
default:
throw NotImplementedException("Unimplemented setLiteral() for type " +
LogicalTypeUtils::dataTypeToString(dstVector->dataType));
resultVector->copyFromValue(resultVector->state->selVector->selectedPositions[0], *value);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace common {

class FileInfo;

using sel_t = uint16_t;
using sel_t = uint32_t;
using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class ValueVector {
uint8_t* dstData, const ValueVector* srcVector, const uint8_t* srcVectorData);
void copyFromVectorData(uint64_t dstPos, const ValueVector* srcVector, uint64_t srcPos);

void copyFromValue(uint64_t pos, const Value& value);

inline uint8_t* getData() const { return valueBuffer.get(); }

inline offset_t readNodeOffset(uint32_t pos) const {
Expand Down
4 changes: 0 additions & 4 deletions src/include/expression_evaluator/literal_evaluator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ class LiteralExpressionEvaluator : public ExpressionEvaluator {
void resolveResultVector(
const processor::ResultSet& resultSet, storage::MemoryManager* memoryManager) override;

private:
static void copyValueToVector(
uint8_t* dstValue, common::ValueVector* dstVector, const common::Value* srcValue);

private:
std::shared_ptr<common::Value> value;
};
Expand Down
22 changes: 11 additions & 11 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class Reader : public PhysicalOperator {
public:
Reader(ReaderInfo readerInfo, std::shared_ptr<storage::ReaderSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, readerInfo{std::move(
readerInfo)},
sharedState{std::move(sharedState)}, leftNumRows{0}, readFuncData{nullptr} {}
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString},
readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, readFuncData{
nullptr} {}

void initGlobalStateInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline void initGlobalStateInternal(ExecutionContext* context) final {
sharedState->validate();
sharedState->countBlocks();
}
inline bool isSource() const final { return true; }

inline std::unique_ptr<PhysicalOperator> clone() final {
Expand All @@ -36,14 +36,14 @@ class Reader : public PhysicalOperator {
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial(std::shared_ptr<arrow::Table>& table);
void getNextNodeGroupInParallel(std::shared_ptr<arrow::Table>& table);
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();

private:
ReaderInfo readerInfo;
std::shared_ptr<storage::ReaderSharedState> sharedState;
std::vector<std::shared_ptr<arrow::RecordBatch>> leftRecordBatches;
common::row_idx_t leftNumRows;
std::unique_ptr<common::DataChunk> dataChunkToRead;
storage::LeftArrowArrays leftArrowArrays;

// For parallel reading.
std::unique_ptr<storage::ReaderFunctionData> readFuncData;
Expand Down
46 changes: 25 additions & 21 deletions src/include/storage/copier/reader_state.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/data_chunk/data_chunk.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

Expand Down Expand Up @@ -68,12 +69,19 @@ struct ReaderMorsel {
common::row_idx_t rowIdx;
};

struct SerialReaderMorsel : public ReaderMorsel {
SerialReaderMorsel(common::vector_idx_t fileIdx, common::block_idx_t blockIdx,
common::row_idx_t rowIdx, std::shared_ptr<arrow::Table> table)
: ReaderMorsel{fileIdx, blockIdx, rowIdx}, table{std::move(table)} {}
class LeftArrowArrays {
public:
explicit LeftArrowArrays() : leftNumRows{0} {}

inline uint64_t getLeftNumRows() const { return leftNumRows; }

void appendFromDataChunk(common::DataChunk* dataChunk);

std::shared_ptr<arrow::Table> table;
void appendToDataChunk(common::DataChunk* dataChunk, uint64_t numRowsToAppend);

private:
common::row_idx_t leftNumRows;
std::vector<arrow::ArrayVector> leftArrays;
};

using validate_func_t =
Expand All @@ -84,8 +92,8 @@ using init_reader_data_func_t = std::function<std::unique_ptr<ReaderFunctionData
using count_blocks_func_t =
std::function<std::vector<FileBlocksInfo>(std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>;
using read_rows_func_t = std::function<arrow::RecordBatchVector(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx)>;
using read_rows_func_t = std::function<void(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx, common::DataChunk*)>;

struct ReaderFunctions {
static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType);
Expand Down Expand Up @@ -121,12 +129,12 @@ struct ReaderFunctions {
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static arrow::RecordBatchVector readRowsFromCSVFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static arrow::RecordBatchVector readRowsFromParquetFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static arrow::RecordBatchVector readRowsFromNPYFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static void readRowsFromCSVFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromParquetFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
};

class ReaderSharedState {
Expand All @@ -137,8 +145,7 @@ class ReaderSharedState {
std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema)
: fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0},
leftRecordBatches{}, leftNumRows{0} {
tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} {
validateFunc = ReaderFunctions::getValidateFunc(fileType);
initFunc = ReaderFunctions::getInitDataFunc(fileType);
countBlocksFunc = ReaderFunctions::getCountBlocksFunc(fileType);
Expand All @@ -148,7 +155,7 @@ class ReaderSharedState {
void validate();
void countBlocks();

std::unique_ptr<ReaderMorsel> getSerialMorsel();
std::unique_ptr<ReaderMorsel> getSerialMorsel(common::DataChunk* vectorsToRead);
std::unique_ptr<ReaderMorsel> getParallelMorsel();

inline void lock() { mtx.lock(); }
Expand All @@ -158,9 +165,6 @@ class ReaderSharedState {
private:
std::unique_ptr<ReaderMorsel> getMorselOfNextBlock();

static std::shared_ptr<arrow::Table> constructTableFromBatches(
std::vector<std::shared_ptr<arrow::RecordBatch>>& recordBatches);

public:
std::mutex mtx;

Expand All @@ -182,8 +186,8 @@ class ReaderSharedState {
common::block_idx_t currBlockIdx;
common::row_idx_t currRowIdx;

std::vector<std::shared_ptr<arrow::RecordBatch>> leftRecordBatches;
common::row_idx_t leftNumRows;
private:
LeftArrowArrays leftArrowArrays;
};

} // namespace storage
Expand Down
11 changes: 7 additions & 4 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ class RelCopier {
static void indexLookup(arrow::Array* pkArray, const common::LogicalType& pkColumnType,
PrimaryKeyIndex* pkIndex, common::offset_t* offsets);

void copyRelColumnsOrCountRelListsSize(common::row_idx_t rowIdx,
arrow::RecordBatch* recordBatch, common::RelDataDirection direction,
void copyRelColumnsOrCountRelListsSize(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);

void copyRelColumns(common::row_idx_t rowIdx, arrow::RecordBatch* recordBatch,
void copyRelColumns(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void countRelListsSize(common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void copyRelLists(common::row_idx_t rowIdx, arrow::RecordBatch* recordBatch,
void copyRelLists(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void checkViolationOfRelColumn(
Expand All @@ -84,6 +84,9 @@ class RelCopier {
std::unique_ptr<storage::ReaderFunctionData> readFuncData;
storage::read_rows_func_t readFunc;
storage::init_reader_data_func_t initFunc;

protected:
std::unique_ptr<common::DataChunk> dataChunkToRead;
};

class RelListsCounterAndColumnCopier : public RelCopier {
Expand Down
8 changes: 4 additions & 4 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13},
{"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8},
{"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3},
{"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.1", 17}, {"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14},
{"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9},
{"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4},
{"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
Loading

0 comments on commit a260a35

Please sign in to comment.