Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader function refactor #1979

Merged
merged 1 commit into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.1 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
48 changes: 48 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,54 @@
}
}

// Writes `value` into slot `pos` of this vector.
//
// The copy strategy is chosen from this vector's own physical type
// (dataType.getPhysicalType()), not from the type carried by `value`.
// NOTE(review): nothing in this function checks that `value`'s type matches the
// vector's type, and nothing here inspects whether `value` is null — presumably
// callers handle both before calling; confirm.
//
// pos:   index of the destination slot in this vector.
// value: source value; nested (list/struct) values are copied recursively.
// Throws NotImplementedException for any physical type not listed below.
void ValueVector::copyFromValue(uint64_t pos, const Value& value) {
// Address of the destination slot: base of the value buffer plus pos slots.
auto dstValue = valueBuffer.get() + pos * numBytesPerValue;
switch (dataType.getPhysicalType()) {
// Fixed-width cases: copy numBytesPerValue bytes straight out of the matching
// member of value.val into the slot.
case PhysicalTypeID::INT64: {
memcpy(dstValue, &value.val.int64Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(dstValue, &value.val.int32Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT16: {
memcpy(dstValue, &value.val.int16Val, numBytesPerValue);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(dstValue, &value.val.doubleVal, numBytesPerValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(dstValue, &value.val.floatVal, numBytesPerValue);
} break;
case PhysicalTypeID::BOOL: {
memcpy(dstValue, &value.val.booleanVal, numBytesPerValue);
} break;
case PhysicalTypeID::INTERVAL: {
memcpy(dstValue, &value.val.intervalVal, numBytesPerValue);
} break;
case PhysicalTypeID::STRING: {
// The slot is treated as a ku_string_t and filled by StringVector::addString
// from value.strVal's bytes and length.
// NOTE(review): presumably addString owns any out-of-line string storage — confirm.
StringVector::addString(
this, *(ku_string_t*)dstValue, value.strVal.data(), value.strVal.length());
} break;
case PhysicalTypeID::VAR_LIST: {
// The slot holds a list_entry_t; ListVector::addList returns the entry for a
// list of numValues children, which is stored into the slot.
auto listEntry = reinterpret_cast<list_entry_t*>(dstValue);
auto numValues = NestedVal::getChildrenSize(&value);
*listEntry = ListVector::addList(this, numValues);
auto dstDataVector = ListVector::getDataVector(this);
// Recursively copy child i into the data vector at listEntry->offset + i.
for (auto i = 0u; i < numValues; ++i) {
dstDataVector->copyFromValue(listEntry->offset + i, *NestedVal::getChildVal(&value, i));
}
} break;
case PhysicalTypeID::STRUCT: {
// Each field vector receives child i of `value` at the same position `pos`.
// NOTE(review): the loop bound is the number of field vectors; it assumes
// `value` has at least that many children — confirm against callers.
auto structFields = StructVector::getFieldVectors(this);
for (auto i = 0u; i < structFields.size(); ++i) {
structFields[i]->copyFromValue(pos, *NestedVal::getChildVal(&value, i));

Check warning on line 166 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L164-L166

Added lines #L164 - L166 were not covered by tests
}
} break;
default:
// Any physical type without a case above is rejected.
throw NotImplementedException("ValueVector::copyFromValue");

Check warning on line 170 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L168-L170

Added lines #L168 - L170 were not covered by tests
}
}

void ValueVector::resetAuxiliaryBuffer() {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRING: {
Expand Down
50 changes: 2 additions & 48 deletions src/expression_evaluator/literal_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,11 @@ bool LiteralExpressionEvaluator::select(SelectionVector& selVector) {
void LiteralExpressionEvaluator::resolveResultVector(
const processor::ResultSet& resultSet, MemoryManager* memoryManager) {
resultVector = std::make_shared<ValueVector>(*value->getDataType(), memoryManager);
resultVector->setState(DataChunkState::getSingleValueDataChunkState());
if (value->isNull()) {
resultVector->setNull(0 /* pos */, true);
} else {
copyValueToVector(resultVector->getData(), resultVector.get(), value.get());
}
resultVector->setState(DataChunkState::getSingleValueDataChunkState());
}

void LiteralExpressionEvaluator::copyValueToVector(
uint8_t* dstValue, ValueVector* dstVector, const Value* srcValue) {
auto numBytesPerValue = dstVector->getNumBytesPerValue();
switch (srcValue->getDataType()->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(dstValue, &srcValue->val.int64Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(dstValue, &srcValue->val.int32Val, numBytesPerValue);
} break;
case PhysicalTypeID::INT16: {
memcpy(dstValue, &srcValue->val.int16Val, numBytesPerValue);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(dstValue, &srcValue->val.doubleVal, numBytesPerValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(dstValue, &srcValue->val.floatVal, numBytesPerValue);
} break;
case PhysicalTypeID::BOOL: {
memcpy(dstValue, &srcValue->val.booleanVal, numBytesPerValue);
} break;
case PhysicalTypeID::INTERVAL: {
memcpy(dstValue, &srcValue->val.intervalVal, numBytesPerValue);
} break;
case PhysicalTypeID::STRING: {
StringVector::addString(
dstVector, *(ku_string_t*)dstValue, srcValue->strVal.data(), srcValue->strVal.length());
} break;
case PhysicalTypeID::VAR_LIST: {
auto listListEntry = reinterpret_cast<list_entry_t*>(dstValue);
auto numValues = NestedVal::getChildrenSize(srcValue);
*listListEntry = ListVector::addList(dstVector, numValues);
auto dstDataVector = ListVector::getDataVector(dstVector);
auto dstElements = ListVector::getListValues(dstVector, *listListEntry);
for (auto i = 0u; i < numValues; ++i) {
copyValueToVector(dstElements + i * dstDataVector->getNumBytesPerValue(), dstDataVector,
NestedVal::getChildVal(srcValue, i));
}
} break;
default:
throw NotImplementedException("Unimplemented setLiteral() for type " +
LogicalTypeUtils::dataTypeToString(dstVector->dataType));
resultVector->copyFromValue(resultVector->state->selVector->selectedPositions[0], *value);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace common {

class FileInfo;

using sel_t = uint16_t;
using sel_t = uint32_t;
using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class ValueVector {
uint8_t* dstData, const ValueVector* srcVector, const uint8_t* srcVectorData);
void copyFromVectorData(uint64_t dstPos, const ValueVector* srcVector, uint64_t srcPos);

void copyFromValue(uint64_t pos, const Value& value);

inline uint8_t* getData() const { return valueBuffer.get(); }

inline offset_t readNodeOffset(uint32_t pos) const {
Expand Down
4 changes: 0 additions & 4 deletions src/include/expression_evaluator/literal_evaluator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ class LiteralExpressionEvaluator : public ExpressionEvaluator {
void resolveResultVector(
const processor::ResultSet& resultSet, storage::MemoryManager* memoryManager) override;

private:
static void copyValueToVector(
uint8_t* dstValue, common::ValueVector* dstVector, const common::Value* srcValue);

private:
std::shared_ptr<common::Value> value;
};
Expand Down
22 changes: 11 additions & 11 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class Reader : public PhysicalOperator {
public:
Reader(ReaderInfo readerInfo, std::shared_ptr<storage::ReaderSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, readerInfo{std::move(
readerInfo)},
sharedState{std::move(sharedState)}, leftNumRows{0}, readFuncData{nullptr} {}
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString},
readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, readFuncData{
nullptr} {}

void initGlobalStateInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline void initGlobalStateInternal(ExecutionContext* context) final {
sharedState->validate();
sharedState->countBlocks();
}
inline bool isSource() const final { return true; }

inline std::unique_ptr<PhysicalOperator> clone() final {
Expand All @@ -36,14 +36,14 @@ class Reader : public PhysicalOperator {
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial(std::shared_ptr<arrow::Table>& table);
void getNextNodeGroupInParallel(std::shared_ptr<arrow::Table>& table);
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();

private:
ReaderInfo readerInfo;
std::shared_ptr<storage::ReaderSharedState> sharedState;
std::vector<std::shared_ptr<arrow::RecordBatch>> leftRecordBatches;
common::row_idx_t leftNumRows;
std::unique_ptr<common::DataChunk> dataChunkToRead;
storage::LeftArrowArrays leftArrowArrays;

// For parallel reading.
std::unique_ptr<storage::ReaderFunctionData> readFuncData;
Expand Down
46 changes: 25 additions & 21 deletions src/include/storage/copier/reader_state.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/data_chunk/data_chunk.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

Expand Down Expand Up @@ -68,12 +69,19 @@
common::row_idx_t rowIdx;
};

struct SerialReaderMorsel : public ReaderMorsel {
SerialReaderMorsel(common::vector_idx_t fileIdx, common::block_idx_t blockIdx,
common::row_idx_t rowIdx, std::shared_ptr<arrow::Table> table)
: ReaderMorsel{fileIdx, blockIdx, rowIdx}, table{std::move(table)} {}
class LeftArrowArrays {

Check warning on line 72 in src/include/storage/copier/reader_state.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/reader_state.h#L72

Added line #L72 was not covered by tests
public:
explicit LeftArrowArrays() : leftNumRows{0} {}

inline uint64_t getLeftNumRows() const { return leftNumRows; }

void appendFromDataChunk(common::DataChunk* dataChunk);

std::shared_ptr<arrow::Table> table;
void appendToDataChunk(common::DataChunk* dataChunk, uint64_t numRowsToAppend);

private:
common::row_idx_t leftNumRows;
std::vector<arrow::ArrayVector> leftArrays;
};

using validate_func_t =
Expand All @@ -84,8 +92,8 @@
using count_blocks_func_t =
std::function<std::vector<FileBlocksInfo>(std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>;
using read_rows_func_t = std::function<arrow::RecordBatchVector(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx)>;
using read_rows_func_t = std::function<void(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx, common::DataChunk*)>;

struct ReaderFunctions {
static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType);
Expand Down Expand Up @@ -121,12 +129,12 @@
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static arrow::RecordBatchVector readRowsFromCSVFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static arrow::RecordBatchVector readRowsFromParquetFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static arrow::RecordBatchVector readRowsFromNPYFile(
const ReaderFunctionData& functionData, common::block_idx_t blockIdx);
static void readRowsFromCSVFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromParquetFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
};

class ReaderSharedState {
Expand All @@ -137,8 +145,7 @@
std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema)
: fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0},
leftRecordBatches{}, leftNumRows{0} {
tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} {
validateFunc = ReaderFunctions::getValidateFunc(fileType);
initFunc = ReaderFunctions::getInitDataFunc(fileType);
countBlocksFunc = ReaderFunctions::getCountBlocksFunc(fileType);
Expand All @@ -148,7 +155,7 @@
void validate();
void countBlocks();

std::unique_ptr<ReaderMorsel> getSerialMorsel();
std::unique_ptr<ReaderMorsel> getSerialMorsel(common::DataChunk* vectorsToRead);
std::unique_ptr<ReaderMorsel> getParallelMorsel();

inline void lock() { mtx.lock(); }
Expand All @@ -158,9 +165,6 @@
private:
std::unique_ptr<ReaderMorsel> getMorselOfNextBlock();

static std::shared_ptr<arrow::Table> constructTableFromBatches(
std::vector<std::shared_ptr<arrow::RecordBatch>>& recordBatches);

public:
std::mutex mtx;

Expand All @@ -182,8 +186,8 @@
common::block_idx_t currBlockIdx;
common::row_idx_t currRowIdx;

std::vector<std::shared_ptr<arrow::RecordBatch>> leftRecordBatches;
common::row_idx_t leftNumRows;
private:
LeftArrowArrays leftArrowArrays;
};

} // namespace storage
Expand Down
11 changes: 7 additions & 4 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ class RelCopier {
static void indexLookup(arrow::Array* pkArray, const common::LogicalType& pkColumnType,
PrimaryKeyIndex* pkIndex, common::offset_t* offsets);

void copyRelColumnsOrCountRelListsSize(common::row_idx_t rowIdx,
arrow::RecordBatch* recordBatch, common::RelDataDirection direction,
void copyRelColumnsOrCountRelListsSize(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);

void copyRelColumns(common::row_idx_t rowIdx, arrow::RecordBatch* recordBatch,
void copyRelColumns(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void countRelListsSize(common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void copyRelLists(common::row_idx_t rowIdx, arrow::RecordBatch* recordBatch,
void copyRelLists(common::row_idx_t rowIdx, arrow::ArrayVector& arrays,
common::RelDataDirection direction,
const std::vector<std::unique_ptr<arrow::Array>>& pkOffsets);
void checkViolationOfRelColumn(
Expand All @@ -84,6 +84,9 @@ class RelCopier {
std::unique_ptr<storage::ReaderFunctionData> readFuncData;
storage::read_rows_func_t readFunc;
storage::init_reader_data_func_t initFunc;

protected:
std::unique_ptr<common::DataChunk> dataChunkToRead;
};

class RelListsCounterAndColumnCopier : public RelCopier {
Expand Down
8 changes: 4 additions & 4 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13},
{"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8},
{"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3},
{"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.1", 17}, {"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14},
{"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9},
{"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4},
{"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
Loading