Skip to content

Commit

Permalink
Merge pull request #1697 from kuzudb/npy-reader-updates
Browse files Browse the repository at this point in the history
Read NPY chunk at a time
  • Loading branch information
aziz-mu committed Jun 21, 2023
2 parents 74ddc68 + 94d01a1 commit 3c5f892
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/include/storage/copier/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "common/exception.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include <arrow/array.h>
#include <arrow/buffer.h>
#include <arrow/record_batch.h>
#include <stdexcept>

namespace kuzu {
namespace storage {
Expand All @@ -25,6 +28,9 @@ class NpyReader {

inline size_t getNumRows() const { return shape[0]; }

std::shared_ptr<arrow::DataType> getArrowType() const;
std::shared_ptr<arrow::RecordBatch> readBlock(common::block_idx_t blockIdx) const;

// Used in tests only.
inline common::LogicalTypeID getType() const { return type; }
inline std::vector<size_t> const& getShape() const { return shape; }
Expand All @@ -45,6 +51,7 @@ class NpyReader {
size_t dataOffset;
std::vector<size_t> shape;
common::LogicalTypeID type;
static inline const std::string defaultFieldName = "NPY_FIELD";
};

} // namespace storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <arrow/array/array_base.h>
#include <arrow/array/array_binary.h>
#include <arrow/array/array_primitive.h>
#include <arrow/record_batch.h>
#include <arrow/scalar.h>

namespace kuzu {
Expand Down Expand Up @@ -44,6 +45,7 @@ class InMemColumnChunk {
inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
inline uint64_t getNumBytes() const { return numBytes; }
inline InMemColumnChunk* getNullChunk() { return nullChunk.get(); }
void copyArrowBatch(std::shared_ptr<arrow::RecordBatch> batch);
virtual void copyArrowArray(arrow::Array& arrowArray, arrow::Array* nodeOffsets = nullptr);
virtual void flush(common::FileInfo* walFileInfo);

Expand Down
2 changes: 2 additions & 0 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ void NPYNodeCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
std::vector<std::unique_ptr<InMemColumnChunk>> columnChunks(1);
columnChunks[0] =
columns[columnToCopy]->getInMemColumnChunk(morsel->tupleIdx, endNodeOffset, &copyDesc);
auto batch = reader->readBlock(morsel->blockIdx);
columnChunks[0]->copyArrowBatch(batch);
for (auto i = 0u; i < morsel->numTuples; i++) {
columnChunks[0]->setValueAtPos(reader->getPointerToRow(morsel->tupleIdx + i), i);
}
Expand Down
32 changes: 32 additions & 0 deletions src/storage/copier/npy_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,37 @@ void NpyReader::validate(LogicalType& type_, offset_t numRows, const std::string
filePath, tableName));
}
}

/**
 * Maps the logical type of this NPY file to the matching Arrow data type.
 *
 * Supported logical types: DOUBLE, FLOAT, INT64, INT32 and INT16.
 *
 * @return the Arrow data type corresponding to getType().
 * @throws std::runtime_error if the logical type has no Arrow mapping here.
 *         Previously control fell off the end of the function in that case,
 *         which is undefined behaviour for a non-void function.
 */
std::shared_ptr<arrow::DataType> NpyReader::getArrowType() const {
    switch (getType()) {
    case LogicalTypeID::DOUBLE:
        return arrow::float64();
    case LogicalTypeID::FLOAT:
        return arrow::float32();
    case LogicalTypeID::INT64:
        return arrow::int64();
    case LogicalTypeID::INT32:
        return arrow::int32();
    case LogicalTypeID::INT16:
        return arrow::int16();
    default:
        // NOTE(review): the project may prefer one of its own exception types from
        // common/exception.h here - confirm against the rest of this file.
        throw std::runtime_error("NPY file type does not match any supported Arrow data type.");
    }
}

/**
 * Wraps one block of rows of the NPY file in a single-column Arrow record batch.
 *
 * The batch's column is named defaultFieldName and typed by getArrowType(). The
 * Arrow buffer is built directly over the pointer returned by getPointerToRow(),
 * so no row data is copied here.
 *
 * @param blockIdx index of the block; block i starts at row
 *        i * CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY.
 * @return a record batch holding at most NUM_ROWS_PER_BLOCK_FOR_NPY rows; the
 *         last block may be shorter, and a block starting at or past the end of
 *         the file has zero rows.
 *
 * NOTE(review): each row is treated as exactly one value of the Arrow type. If
 * `shape` can have more than one dimension, the value count per row differs and
 * this needs a list-typed column - confirm callers only pass 1-D files.
 */
std::shared_ptr<arrow::RecordBatch> NpyReader::readBlock(common::block_idx_t blockIdx) const {
    const uint64_t rowNumber = CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY * blockIdx;
    const uint64_t totalRows = getNumRows();
    // Clamp before subtracting: both operands are unsigned, so a block index at or
    // beyond the end would otherwise wrap around to a huge row count.
    const uint64_t rowsInBlock =
        rowNumber < totalRows ?
            std::min<uint64_t>(CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY, totalRows - rowNumber) :
            0;
    const auto length = static_cast<int64_t>(rowsInBlock);

    auto rowPointer = getPointerToRow(rowNumber);
    auto arrowType = getArrowType();
    // arrow::Buffer sizes are expressed in bytes, not in rows. Every type returned by
    // getArrowType() is a fixed-width primitive, so the cast below is valid.
    const int64_t numBytesPerValue =
        static_cast<const arrow::FixedWidthType&>(*arrowType).bit_width() / 8;
    auto buffer = std::make_shared<arrow::Buffer>(rowPointer, length * numBytesPerValue);

    auto arr = std::make_shared<arrow::PrimitiveArray>(arrowType, length, buffer);
    auto field = std::make_shared<arrow::Field>(defaultFieldName, arrowType);
    auto schema =
        std::make_shared<arrow::Schema>(std::vector<std::shared_ptr<arrow::Field>>{field});
    return arrow::RecordBatch::Make(schema, length, {arr});
}

} // namespace storage
} // namespace kuzu
5 changes: 5 additions & 0 deletions src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ uint32_t InMemColumnChunk::getDataTypeSizeInColumn(common::LogicalType& dataType
}
}

// Copies the only column of `batch` into this chunk by delegating to copyArrowArray().
// The batch is expected to hold exactly one column; this is checked by assert only.
void InMemColumnChunk::copyArrowBatch(std::shared_ptr<arrow::RecordBatch> batch) {
    assert(batch->num_columns() == 1);
    auto onlyColumn = batch->column(0);
    arrow::Array* const noNodeOffsets = nullptr;
    copyArrowArray(*onlyColumn, noNodeOffsets);
}

void InMemColumnChunk::copyArrowArray(arrow::Array& arrowArray, arrow::Array* nodeOffsets) {
switch (arrowArray.type_id()) {
case arrow::Type::BOOL: {
Expand Down

0 comments on commit 3c5f892

Please sign in to comment.