Skip to content

Commit

Permalink
Add loader and storage support for struct
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Apr 25, 2023
1 parent 8dc87ca commit cc7ed5e
Show file tree
Hide file tree
Showing 37 changed files with 580 additions and 174 deletions.
2 changes: 1 addition & 1 deletion dataset/tinysnb/schema.cypher
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));
create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, PRIMARY KEY (ID));
create node table movies (name STRING, length INT32, note STRING, PRIMARY KEY (name));
create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64), PRIMARY KEY (name));
create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);
create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);
create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);
Expand Down
6 changes: 3 additions & 3 deletions dataset/tinysnb/vMovies.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Sóló cón tu párejâ,126, this is a very very good movie
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie,2544, the movie is very very good
Roma,298,the movie is very interesting and funny
Sóló cón tu párejâ,126, this is a very very good movie,"{rating: 5.3, views: 152}"
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie,2544, the movie is very very good,"{rating: 7, views: 982}"
Roma,298,the movie is very interesting and funny,"{rating: 1223, views: 10003}"
36 changes: 24 additions & 12 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ uint64_t SerDeser::deserializeValue(FixedListTypeInfo& value, FileInfo* fileInfo
template<>
uint64_t SerDeser::serializeValue(
const StructTypeInfo& value, FileInfo* fileInfo, uint64_t offset) {
auto childrenTypesToReturn = value.getChildrenTypes();
return serializeVector(childrenTypesToReturn, fileInfo, offset);
return serializeVector(value.fields, fileInfo, offset);
}

template<>
Expand All @@ -51,15 +50,18 @@ uint64_t SerDeser::deserializeValue(StructTypeInfo& value, FileInfo* fileInfo, u
}

template<>
uint64_t SerDeser::serializeValue(const StructField& value, FileInfo* fileInfo, uint64_t offset) {
offset = serializeValue(value.getName(), fileInfo, offset);
return serializeValue(*value.getType(), fileInfo, offset);
uint64_t SerDeser::serializeValue(
const std::unique_ptr<StructField>& value, FileInfo* fileInfo, uint64_t offset) {
offset = serializeValue<std::string>(value->name, fileInfo, offset);
return serializeValue(*value->getType(), fileInfo, offset);
}

template<>
uint64_t SerDeser::deserializeValue(StructField& value, FileInfo* fileInfo, uint64_t offset) {
offset = deserializeValue(value.name, fileInfo, offset);
return deserializeValue(*value.type, fileInfo, offset);
uint64_t SerDeser::deserializeValue(
std::unique_ptr<StructField>& value, FileInfo* fileInfo, uint64_t offset) {
value = std::make_unique<StructField>();
offset = deserializeValue<std::string>(value->name, fileInfo, offset);
return deserializeValue(*value->type, fileInfo, offset);
}

template<>
Expand Down Expand Up @@ -149,7 +151,7 @@ std::unique_ptr<StructField> StructField::copy() const {
std::unique_ptr<ExtraTypeInfo> StructTypeInfo::copy() const {
std::vector<std::unique_ptr<StructField>> structFields;
for (auto& field : fields) {
structFields.emplace_back(field->copy());
structFields.push_back(field->copy());
}
return std::make_unique<StructTypeInfo>(std::move(structFields));
}
Expand All @@ -170,6 +172,14 @@ std::vector<std::string> StructTypeInfo::getChildrenNames() const {
return childrenNames;
}

std::vector<StructField*> StructTypeInfo::getStructFields() const {
std::vector<StructField*> structFields;
for (auto& field : fields) {
structFields.push_back(field.get());
}
return structFields;
}

DataType::DataType() : typeID{ANY}, extraTypeInfo{nullptr} {}

DataType::DataType(DataTypeID typeID) : typeID{typeID}, extraTypeInfo{nullptr} {}
Expand Down Expand Up @@ -297,7 +307,7 @@ DataType Types::dataTypeFromString(const std::string& dataTypeString) {
throw Exception("Cannot parse struct type: " + dataTypeString);
}
std::istringstream iss{
dataTypeString.substr(leftBracketPos, rightBracketPos - leftBracketPos)};
dataTypeString.substr(leftBracketPos + 1, rightBracketPos - leftBracketPos - 1)};
std::vector<std::unique_ptr<StructField>> childrenTypes;
std::string field, fieldName, fieldType;
while (getline(iss, field, ',')) {
Expand Down Expand Up @@ -360,10 +370,12 @@ std::string Types::dataTypeToString(const DataType& dataType) {
case STRUCT: {
auto structTypeInfo = reinterpret_cast<StructTypeInfo*>(dataType.extraTypeInfo.get());
std::string dataTypeStr = dataTypeToString(dataType.typeID) + "(";
for (auto& childType : structTypeInfo->getChildrenTypes()) {
dataTypeStr += dataTypeToString(*childType);
auto numFields = structTypeInfo->getChildrenTypes().size();
for (auto i = 0u; i < numFields - 1; i++) {
dataTypeStr += dataTypeToString(*structTypeInfo->getChildrenTypes()[i]);
dataTypeStr += ",";
}
dataTypeStr += dataTypeToString(*structTypeInfo->getChildrenTypes()[numFields - 1]);
return dataTypeStr + ")";
}
case ANY:
Expand Down
1 change: 1 addition & 0 deletions src/function/vector_struct_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ std::unique_ptr<FunctionBindData> StructExtractVectorOperations::bindFunc(
throw common::BinderException("Key name for struct_extract must be STRING literal.");
}
auto key = ((binder::LiteralExpression&)*arguments[1]).getValue()->getValue<std::string>();
common::StringUtils::toUpper(key);
assert(definition->returnTypeID == common::ANY);
auto childrenTypes = typeInfo->getChildrenTypes();
auto childrenNames = typeInfo->getChildrenNames();
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/ser_deser.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ class SerDeser {

template<>
uint64_t SerDeser::serializeValue(const DataType& value, FileInfo* fileInfo, uint64_t offset);
template<>
uint64_t SerDeser::serializeValue(const std::string& value, FileInfo* fileInfo, uint64_t offset);

template<>
uint64_t SerDeser::deserializeValue(DataType& value, FileInfo* fileInfo, uint64_t offset);
template<>
uint64_t SerDeser::deserializeValue(std::string& value, FileInfo* fileInfo, uint64_t offset);

} // namespace common
} // namespace kuzu
14 changes: 9 additions & 5 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/api.h"
#include "common/string_utils.h"

namespace kuzu {
namespace common {
Expand All @@ -29,6 +30,7 @@ constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
using field_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down Expand Up @@ -116,8 +118,12 @@ class StructField {
friend class SerDeser;

public:
StructField() : type{std::make_unique<DataType>()} {}
StructField(std::string name, std::unique_ptr<DataType> type)
: name{std::move(name)}, type{std::move(type)} {}
: name{std::move(name)}, type{std::move(type)} {
// Note: struct field name is case-insensitive.
StringUtils::toUpper(this->name);
}
inline bool operator!=(const StructField& other) const { return !(*this == other); }
inline std::string getName() const { return name; }
inline DataType* getType() const { return type.get(); }
Expand All @@ -133,15 +139,13 @@ class StructTypeInfo : public ExtraTypeInfo {
friend class SerDeser;

public:
StructTypeInfo() = default;
explicit StructTypeInfo(std::vector<std::unique_ptr<StructField>> fields)
: fields{std::move(fields)} {}
StructTypeInfo() = default;

inline void addChildType(const std::string& name, const DataType& type) {
fields.emplace_back(std::make_unique<StructField>(name, std::make_unique<DataType>(type)));
}
std::vector<DataType*> getChildrenTypes() const;
std::vector<std::string> getChildrenNames() const;
std::vector<StructField*> getStructFields() const;

bool operator==(const kuzu::common::StructTypeInfo& other) const;
std::unique_ptr<ExtraTypeInfo> copy() const override;
Expand Down
16 changes: 10 additions & 6 deletions src/include/storage/copier/table_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ class TableCopyExecutor {
uint64_t copy(processor::ExecutionContext* executionContext);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);
int64_t to, const common::DataType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

int64_t to, const common::DataType& dataType,
const common::CopyDescription& copyDescription);
static std::shared_ptr<arrow::csv::StreamingReader> createCSVReader(const std::string& filePath,
common::CSVReaderConfig* csvReaderConfig, catalog::TableSchema* tableSchema);
static std::unique_ptr<parquet::arrow::FileReader> createParquetReader(
Expand All @@ -72,15 +72,19 @@ class TableCopyExecutor {

void countNumLinesNpy(const std::vector<std::string>& filePaths);

static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);
static std::vector<std::pair<int64_t, int64_t>> getListElementPos(const std::string& l,
int64_t from, int64_t to, const common::CopyDescription& copyDescription);

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}

static std::shared_ptr<arrow::DataType> toArrowDataType(const common::DataType& dataType);

private:
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::DataType& type, const common::CopyDescription& copyDescription);

protected:
std::shared_ptr<spdlog::logger> logger;
common::CopyDescription& copyDescription;
Expand Down
13 changes: 13 additions & 0 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ using fill_in_mem_column_function_t = std::function<void(InMemColumn* inMemColum
class InMemColumn {

public:
InMemColumn() = default;

// For property columns.
InMemColumn(std::string fName, common::DataType dataType, uint64_t numBytesForElement,
uint64_t numElements);
Expand Down Expand Up @@ -126,6 +128,17 @@ class InMemListColumn : public InMemColumnWithOverflow {
};
};

class InMemStructColumn : public InMemColumn {

public:
InMemStructColumn(std::string fName, common::DataType dataType, uint64_t numElements);

void saveToFile() override;

private:
std::vector<std::unique_ptr<InMemColumn>> structFieldsColumns;
};

class InMemColumnFactory {

public:
Expand Down
60 changes: 55 additions & 5 deletions src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ namespace storage {

class InMemColumnChunk {
public:
InMemColumnChunk() = default;

InMemColumnChunk(common::DataType dataType, common::offset_t startOffset,
common::offset_t endOffset, uint16_t numBytesForElement, uint64_t numElementsInAPage);

virtual ~InMemColumnChunk() = default;

inline uint8_t* getPage(common::page_idx_t pageIdx) {
assert(pageIdx <= endPageIdx && pageIdx >= startPageIdx);
auto pageIdxInSet = pageIdx - startPageIdx;
Expand All @@ -32,6 +36,8 @@ class InMemColumnChunk {
return getPage(cursor.pageIdx) + elemPosInPageInBytes;
}

inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; }

template<typename T, typename... Args>
void templateCopyValuesToPage(const PageElementCursor& pageCursor, arrow::Array& array,
uint64_t posInArray, uint64_t numValues, Args... args) {
Expand Down Expand Up @@ -78,18 +84,55 @@ class InMemColumnChunk {
template<typename T, typename... Args>
void setValueFromString(const char* value, uint64_t length, common::page_idx_t pageIdx,
uint64_t posInPage, Args... args) {
throw common::CopyException("Unsupported type to set element for " +
std::string(value, length) + " at pos " +
std::to_string(posInPage));
auto num = common::TypeUtils::convertStringToNumber<T>(value);
copyValue(pageIdx, posInPage, (uint8_t*)&num);
}

private:
virtual void copyStructValueToFields(arrow::Array& array, uint64_t posInArray,
const common::CopyDescription& copyDescription, common::offset_t nodeOffset,
uint64_t numValues) {
assert(false);
}

protected:
common::DataType dataType;
uint16_t numBytesForElement;
uint64_t numElementsInAPage;
common::page_idx_t startPageIdx;
common::page_idx_t endPageIdx;
std::unique_ptr<uint8_t[]> pages;
uint64_t numElementsInAPage;
};

class InMemStructColumnChunk : public InMemColumnChunk {
public:
InMemStructColumnChunk(
common::DataType dataType, common::offset_t startOffset, common::offset_t endOffset);

void copyStructValueToFields(arrow::Array& array, uint64_t posInArray,
const common::CopyDescription& copyDescription, common::offset_t nodeOffset,
uint64_t numValues) override;

std::vector<InMemColumnChunk*> getInMemColumnChunksForFields();

uint64_t getMinNumValuesLeftOnPage(common::offset_t nodeOffset);

private:
static common::field_idx_t getStructFieldIdx(
std::vector<std::string> structFieldNames, std::string structFieldName);

void copyValueToStructColumnField(common::offset_t nodeOffset,
common::field_idx_t structFieldIdx, const std::string& structFieldValue,
const common::DataType& dataType);

private:
std::vector<std::unique_ptr<InMemColumnChunk>> inMemColumnChunksForFields;
};

class InMemColumnChunkFactory {
public:
static std::unique_ptr<InMemColumnChunk> getInMemColumnChunk(common::DataType dataType,
common::offset_t startOffset, common::offset_t endOffset, uint16_t numBytesForElement,
uint64_t numElementsInAPage);
};

template<>
Expand All @@ -111,8 +154,15 @@ void InMemColumnChunk::templateCopyValuesToPage<std::string, InMemOverflowFile*,
common::CopyDescription&>(const PageElementCursor& pageCursor, arrow::Array& array,
uint64_t posInArray, uint64_t numValues, InMemOverflowFile* overflowFile,
PageByteCursor& overflowCursor, common::CopyDescription& copyDesc);
template<>
void InMemColumnChunk::templateCopyValuesToPage<std::string, common::CopyDescription&,
common::offset_t>(const PageElementCursor& pageCursor, arrow::Array& array, uint64_t posInArray,
uint64_t numValues, common::CopyDescription& copyDesc, common::offset_t nodeOffset);

template<>
void InMemColumnChunk::setValueFromString<bool>(
const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage);
template<>
void InMemColumnChunk::setValueFromString<common::ku_string_t, InMemOverflowFile*, PageByteCursor&>(
const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage,
InMemOverflowFile* overflowFile, PageByteCursor& overflowCursor);
Expand Down
Loading

0 comments on commit cc7ed5e

Please sign in to comment.