Skip to content

Commit

Permalink
Remove arrow from npy reader
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 26, 2023
1 parent 4628776 commit 97d3dd3
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 79 deletions.
12 changes: 1 addition & 11 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(columnName, stringType));
}
} break;
case FileType::NPY:
case FileType::PARQUET:
case FileType::CSV: {
for (auto& property : tableSchema->properties) {
Expand All @@ -208,17 +209,6 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(property->getName(), *property->getDataType()));
}
} break;
case FileType::NPY: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
readerConfig.columnNames.push_back(property->getName());
readerConfig.columnTypes.push_back(property->getDataType()->copy());
columns.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
} break;
default: {
throw NotImplementedException{"Binder::bindCopyNodeColumns"};
}
Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/persistent/reader/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
#include <unordered_map>
#include <vector>

#include "common/data_chunk/data_chunk.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include <arrow/array.h>
#include <arrow/buffer.h>
#include <arrow/record_batch.h>

namespace kuzu {
namespace processor {
Expand All @@ -25,8 +23,7 @@ class NpyReader {

inline size_t getNumRows() const { return shape[0]; }

std::shared_ptr<arrow::DataType> getArrowType() const;
std::shared_ptr<arrow::RecordBatch> readBlock(common::block_idx_t blockIdx) const;
void readBlock(common::block_idx_t blockIdx, common::ValueVector* vectorToRead) const;

// Used in tests only.
inline common::LogicalTypeID getType() const { return type; }
Expand Down Expand Up @@ -54,7 +51,7 @@ class NpyMultiFileReader {
public:
explicit NpyMultiFileReader(const std::vector<std::string>& filePaths);

std::shared_ptr<arrow::RecordBatch> readBlock(common::block_idx_t blockIdx) const;
void readBlock(common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead) const;

private:
std::vector<std::unique_ptr<NpyReader>> fileReaders;
Expand Down
4 changes: 3 additions & 1 deletion src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class ColumnChunk {

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk);
virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk);

protected:
common::LogicalType dataType;
Expand Down Expand Up @@ -261,6 +261,8 @@ class FixedListColumnChunk : public ColumnChunk {
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void write(const common::Value& fixedListVal, uint64_t posToWrite) final;

void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk) override;
};

class SerialColumnChunk : public ColumnChunk {
Expand Down
57 changes: 7 additions & 50 deletions src/processor/operator/persistent/reader/npy/npy_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,47 +207,13 @@ void NpyReader::validate(const LogicalType& type_, offset_t numRows) {
}
}

std::shared_ptr<arrow::DataType> NpyReader::getArrowType() const {
auto thisType = getType();
if (thisType == LogicalTypeID::DOUBLE) {
return arrow::float64();
} else if (thisType == LogicalTypeID::FLOAT) {
return arrow::float32();
} else if (thisType == LogicalTypeID::INT64) {
return arrow::int64();
} else if (thisType == LogicalTypeID::INT32) {
return arrow::int32();
} else if (thisType == LogicalTypeID::INT16) {
return arrow::int16();
} else {
throw CopyException("File type does not match any Arrow data type");
}
}

std::shared_ptr<arrow::RecordBatch> NpyReader::readBlock(block_idx_t blockIdx) const {
void NpyReader::readBlock(block_idx_t blockIdx, common::ValueVector* vectorToRead) const {
uint64_t rowNumber = DEFAULT_VECTOR_CAPACITY * blockIdx;
auto rowPointer = getPointerToRow(rowNumber);
auto arrowType = getArrowType();
auto numRowsToRead = std::min(DEFAULT_VECTOR_CAPACITY, getNumRows() - rowNumber);
auto buffer = std::make_shared<arrow::Buffer>(
rowPointer, numRowsToRead * arrowType->byte_width() * getNumElementsPerRow());
std::shared_ptr<arrow::Field> field;
std::shared_ptr<arrow::Array> arr;
if (getNumDimensions() > 1) {
auto elementField = std::make_shared<arrow::Field>(defaultFieldName, arrowType);
auto fixedListArrowType = arrow::fixed_size_list(elementField, (int32_t)numRowsToRead);
field = std::make_shared<arrow::Field>(defaultFieldName, fixedListArrowType);
auto valuesArr = std::make_shared<arrow::PrimitiveArray>(
arrowType, numRowsToRead * getNumElementsPerRow(), buffer);
arr = arrow::FixedSizeListArray::FromArrays(valuesArr, (int32_t)getNumElementsPerRow())
.ValueOrDie();
} else {
field = std::make_shared<arrow::Field>(defaultFieldName, arrowType);
arr = std::make_shared<arrow::PrimitiveArray>(arrowType, numRowsToRead, buffer);
}
auto schema =
std::make_shared<arrow::Schema>(std::vector<std::shared_ptr<arrow::Field>>{field});
return arrow::RecordBatch::Make(schema, (int64_t)numRowsToRead, {arr});
memcpy(
vectorToRead->getData(), rowPointer, numRowsToRead * vectorToRead->getNumBytesPerValue());
vectorToRead->state->selVector->selectedSize = numRowsToRead;
}

NpyMultiFileReader::NpyMultiFileReader(const std::vector<std::string>& filePaths) {
Expand All @@ -256,20 +222,11 @@ NpyMultiFileReader::NpyMultiFileReader(const std::vector<std::string>& filePaths
}
}

std::shared_ptr<arrow::RecordBatch> NpyMultiFileReader::readBlock(block_idx_t blockIdx) const {
void NpyMultiFileReader::readBlock(block_idx_t blockIdx, common::DataChunk* dataChunkToRead) const {
assert(fileReaders.size() > 1);
auto resultArrowBatch = fileReaders[0]->readBlock(blockIdx);
for (int fileIdx = 1; fileIdx < fileReaders.size(); fileIdx++) {
auto nextArrowBatch = fileReaders[fileIdx]->readBlock(blockIdx);
auto result = resultArrowBatch->AddColumn(
fileIdx, std::to_string(fileIdx), nextArrowBatch->column(0));
if (result.ok()) {
resultArrowBatch = result.ValueOrDie();
} else {
throw CopyException("Failed to read NPY file.");
}
for (auto i = 0u; i < fileReaders.size(); i++) {
fileReaders[i]->readBlock(blockIdx, dataChunkToRead->getValueVector(i).get());
}
return resultArrowBatch;
}

} // namespace processor
Expand Down
7 changes: 1 addition & 6 deletions src/processor/operator/persistent/reader_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,7 @@ void ReaderFunctions::readRowsFromNodeParquetFile(const ReaderFunctionData& func
void ReaderFunctions::readRowsFromNPYFile(const ReaderFunctionData& functionData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead) {
auto& readerData = reinterpret_cast<const NPYReaderFunctionData&>(functionData);
auto recordBatch = readerData.reader->readBlock(blockIdx);
for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) {
ArrowColumnVector::setArrowColumn(dataChunkToRead->getValueVector(i).get(),
std::make_shared<arrow::ChunkedArray>(recordBatch->column((int)i)));
}
dataChunkToRead->state->selVector->selectedSize = recordBatch->num_rows();
readerData.reader->readBlock(blockIdx, dataChunkToRead);
}

void ReaderFunctions::readRowsFromRDFFile(const ReaderFunctionData& functionData,
Expand Down
2 changes: 2 additions & 0 deletions src/processor/operator/persistent/reader_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::storage;

#include "arrow/array.h"

namespace kuzu {
namespace processor {

Expand Down
29 changes: 24 additions & 5 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,17 @@ void FixedListColumnChunk::write(const Value& fixedListVal, uint64_t posToWrite)
}
}

void FixedListColumnChunk::copyVectorToBuffer(
common::ValueVector* vector, common::offset_t startPosInChunk) {
auto vectorDataToWriteFrom = vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
}
}

std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
const LogicalType& dataType, CSVReaderConfig* csvReaderConfig) {
auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr;
Expand Down Expand Up @@ -698,11 +709,19 @@ void ColumnChunk::copyVectorToBuffer(
common::ValueVector* vector, common::offset_t startPosInChunk) {
auto bufferToWrite = buffer.get() + startPosInChunk * numBytesPerValue;
auto vectorDataToWriteFrom = vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(bufferToWrite, vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
bufferToWrite += numBytesPerValue;
if (vector->state->selVector->isUnfiltered()) {
memcpy(bufferToWrite, vectorDataToWriteFrom,
vector->state->selVector->selectedSize * numBytesPerValue);
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
nullChunk->setNull(startPosInChunk + i, vector->isNull(i));
}
} else {
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(bufferToWrite, vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
bufferToWrite += numBytesPerValue;
}
}
}

Expand Down

0 comments on commit 97d3dd3

Please sign in to comment.