Skip to content

Commit

Permalink
Implement loading for blob-type
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Jun 20, 2023
1 parent 90d7b3f commit 2b75b18
Show file tree
Hide file tree
Showing 31 changed files with 259 additions and 70 deletions.
14 changes: 7 additions & 7 deletions dataset/tinysnb/eMeets.csv
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
0,2,"[7.82,3.54]",5
2,5,"[2.87,4.23]",2
3,7,"[3.65,8.44]",3
7,3,"[2.11,3.1]",7
8,3,"[2.2,9.0]",9
9,3,"[3,5.2]",11
10,2,"[3.5,1.1]",13
0,2,"[7.82,3.54]",5,"\\xAA\\xBB\\xCC\\xDD"
2,5,"[2.87,4.23]",2,"NO hex code"
3,7,"[3.65,8.44]",3,"MIXED \\xAC with ASCII \\x0A"
7,3,"[2.11,3.1]",7,"\\xA1\\x2A"
8,3,"[2.2,9.0]",9,"\\x3A\\xA3"
9,3,"[3,5.2]",11,"NO hex code"
10,2,"[3.5,1.1]",13,"\\x3A\\xA3"
4 changes: 2 additions & 2 deletions dataset/tinysnb/schema.cypher
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));
create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));
create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), PRIMARY KEY (name));
create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));
create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);
create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);
create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);
create rel table meets (FROM person TO person, location FLOAT[2], times INT, MANY_ONE);
create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);
create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);
6 changes: 3 additions & 3 deletions dataset/tinysnb/vMovies.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Sóló cón tu párejâ,126, this is a very very good movie,"{rating: 5.3, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11}"
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie,2544, the movie is very very good,"{rating: 7, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12}"
Roma,298,the movie is very interesting and funny,"{rating: 1223, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22}"
Sóló cón tu párejâ,126, this is a very very good movie,"{rating: 5.3, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11}","\\xAA\\xABinteresting\\x0B"
The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie,2544, the movie is very very good,"{rating: 7, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12}","\\xAB\\xCD"
Roma,298,the movie is very interesting and funny,"{rating: 1223, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22}","pure ascii characters"
13 changes: 13 additions & 0 deletions src/common/string_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,18 @@ std::string StringUtils::extractStringBetween(
return input.substr(posStart, posEnd - posStart);
}

// Strips the first and last character of `input` (the enclosing quote delimiters of a string
// literal) and drops every backslash escape character, keeping the character that follows it
// verbatim (e.g. `"a\"b"` becomes `a"b`). The escaped character is copied as-is; it is not
// translated into a control character.
// Returns an empty string when `input` is too short to contain both delimiters.
std::string StringUtils::removeEscapedCharacters(const std::string& input) {
    // length() is unsigned: without this guard `length() - 1` wraps around for an empty input
    // and the loop below would read far past the end of the string.
    if (input.length() < 2) {
        return std::string();
    }
    // Index of the closing delimiter; characters in [1, lastPos) form the literal's content.
    const std::string::size_type lastPos = input.length() - 1;
    std::string resultStr;
    resultStr.reserve(lastPos - 1);
    for (std::string::size_type i = 1; i < lastPos; i++) {
        // Antlr4 already guarantees that the character following the escape character is
        // valid, so we can safely skip the backslash and copy the next character. Since
        // i < lastPos here, i + 1 is always a valid index into input.
        if (input[i] == '\\') {
            i++;
        }
        resultStr += input[i];
    }
    return resultStr;
}

} // namespace common
} // namespace kuzu
18 changes: 9 additions & 9 deletions src/common/types/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,27 @@ uint64_t Blob::getBlobSize(const ku_string_t& blob) {
return blobSize;
}

void Blob::fromString(ku_string_t& str, uint8_t* resultBuffer) {
uint64_t Blob::fromString(const char* str, uint64_t length, uint8_t* resultBuffer) {
auto resultPos = 0u;
auto blobData = str.getData();
for (auto i = 0u; i < str.len; i++) {
if (blobData[i] == '\\') {
validateHexCode(blobData, str.len, i);
for (auto i = 0u; i < length; i++) {
if (str[i] == '\\') {
validateHexCode(reinterpret_cast<const uint8_t*>(str), length, i);
auto firstByte =
HexFormatConstants::HEX_MAP[blobData[i + HexFormatConstants::FIRST_BYTE_POS]];
HexFormatConstants::HEX_MAP[str[i + HexFormatConstants::FIRST_BYTE_POS]];
auto secondByte =
HexFormatConstants::HEX_MAP[blobData[i + HexFormatConstants::SECOND_BYTES_POS]];
HexFormatConstants::HEX_MAP[str[i + HexFormatConstants::SECOND_BYTES_POS]];
resultBuffer[resultPos++] =
(firstByte << HexFormatConstants::NUM_BYTES_TO_SHIFT_FOR_FIRST_BYTE) + secondByte;
i += HexFormatConstants::LENGTH - 1;
} else if (blobData[i] <= 127) {
resultBuffer[resultPos++] = blobData[i];
} else if (str[i] <= 127) {
resultBuffer[resultPos++] = str[i];
} else {
throw ConversionException(
"Invalid byte encountered in STRING -> BLOB conversion. All non-ascii characters "
"must be escaped with hex codes (e.g. \\xAA)");
}
}
return resultPos;
}

std::string Blob::toString(blob_t& blob) {
Expand Down
20 changes: 13 additions & 7 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ LogicalTypeID LogicalTypeUtils::dataTypeIDFromString(const std::string& dataType
return LogicalTypeID::FLOAT;
} else if ("BOOLEAN" == dataTypeIDString) {
return LogicalTypeID::BOOL;
} else if ("BYTEA" == dataTypeIDString || "BLOB" == dataTypeIDString) {
return LogicalTypeID::BLOB;
} else if ("STRING" == dataTypeIDString) {
return LogicalTypeID::STRING;
} else if ("DATE" == dataTypeIDString) {
Expand Down Expand Up @@ -530,22 +532,26 @@ std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
LogicalType{LogicalTypeID::INT16}, LogicalType{LogicalTypeID::DOUBLE},
LogicalType{LogicalTypeID::FLOAT}, LogicalType{LogicalTypeID::DATE},
LogicalType{LogicalTypeID::TIMESTAMP}, LogicalType{LogicalTypeID::INTERVAL},
LogicalType{LogicalTypeID::STRING}, LogicalType{LogicalTypeID::SERIAL}};
LogicalType{LogicalTypeID::BLOB}, LogicalType{LogicalTypeID::STRING},
LogicalType{LogicalTypeID::SERIAL}};
}

std::vector<LogicalTypeID> LogicalTypeUtils::getNumericalLogicalTypeIDs() {
return std::vector<LogicalTypeID>{LogicalTypeID::INT64, LogicalTypeID::INT32,
LogicalTypeID::INT16, LogicalTypeID::DOUBLE, LogicalTypeID::FLOAT, LogicalTypeID::SERIAL};
}

std::vector<LogicalTypeID> LogicalTypeUtils::getAllValidLogicTypeIDs() {
std::vector<LogicalType> LogicalTypeUtils::getAllValidLogicTypes() {
// TODO(Ziyi): Add the FIXED_LIST, STRUCT and MAP types to the list of valid types once we
// support functions on FIXED_LIST, STRUCT and MAP.
return std::vector<LogicalTypeID>{LogicalTypeID::INTERNAL_ID, LogicalTypeID::BOOL,
LogicalTypeID::INT64, LogicalTypeID::INT32, LogicalTypeID::INT16, LogicalTypeID::DOUBLE,
LogicalTypeID::STRING, LogicalTypeID::DATE, LogicalTypeID::TIMESTAMP,
LogicalTypeID::INTERVAL, LogicalTypeID::VAR_LIST, LogicalTypeID::FLOAT,
LogicalTypeID::SERIAL};
return std::vector<LogicalType>{LogicalType{LogicalTypeID::INTERNAL_ID},
LogicalType{LogicalTypeID::BOOL}, LogicalType{LogicalTypeID::INT64},
LogicalType{LogicalTypeID::INT32}, LogicalType{LogicalTypeID::INT16},
LogicalType{LogicalTypeID::DOUBLE}, LogicalType{LogicalTypeID::STRING},
LogicalType{LogicalTypeID::BLOB}, LogicalType{LogicalTypeID::DATE},
LogicalType{LogicalTypeID::TIMESTAMP}, LogicalType{LogicalTypeID::INTERVAL},
LogicalType{LogicalTypeID::VAR_LIST}, LogicalType{LogicalTypeID::FLOAT},
LogicalType{LogicalTypeID::SERIAL}};
}

std::vector<std::string> LogicalTypeUtils::parseStructFields(const std::string& structTypeStr) {
Expand Down
13 changes: 3 additions & 10 deletions src/function/built_in_aggregate_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,11 @@ void BuiltInAggregateFunctions::registerCountStar() {

void BuiltInAggregateFunctions::registerCount() {
std::vector<std::unique_ptr<AggregateFunctionDefinition>> definitions;
LogicalType inputType;
for (auto& typeID : LogicalTypeUtils::getAllValidLogicTypeIDs()) {
if (typeID == LogicalTypeID::VAR_LIST) {
inputType = LogicalType(
typeID, std::make_unique<VarListTypeInfo>(std::make_unique<LogicalType>()));
} else {
inputType = LogicalType(typeID);
}
for (auto& type : LogicalTypeUtils::getAllValidLogicTypes()) {
for (auto isDistinct : std::vector<bool>{true, false}) {
definitions.push_back(std::make_unique<AggregateFunctionDefinition>(COUNT_FUNC_NAME,
std::vector<LogicalTypeID>{typeID}, LogicalTypeID::INT64,
AggregateFunctionUtil::getCountFunction(inputType, isDistinct), isDistinct));
std::vector<LogicalTypeID>{type.getLogicalTypeID()}, LogicalTypeID::INT64,
AggregateFunctionUtil::getCountFunction(type, isDistinct), isDistinct));
}
}
aggregateFunctions.insert({COUNT_FUNC_NAME, std::move(definitions)});
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/string_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class StringUtils {

static std::string extractStringBetween(const std::string& input, char delimiterStart,
char delimiterEnd, bool includeDelimiter = false);

static std::string removeEscapedCharacters(const std::string& input);
};

} // namespace common
Expand Down
16 changes: 8 additions & 8 deletions src/include/common/types/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ struct blob_t {
};

struct HexFormatConstants {
static constexpr const char PREFIX[] = "\\\\x";
static constexpr const uint64_t PREFIX_LENGTH = 3;
static constexpr const uint64_t FIRST_BYTE_POS = PREFIX_LENGTH;
static constexpr const uint64_t SECOND_BYTES_POS = PREFIX_LENGTH + 1;
static constexpr const uint64_t LENGTH = 5;
static constexpr const uint64_t NUM_BYTES_TO_SHIFT_FOR_FIRST_BYTE = 4;
static constexpr const uint64_t SECOND_BYTE_MASK = 0x0F;
// map of integer -> hex value.
static constexpr const char* HEX_TABLE = "0123456789ABCDEF";
// reverse map of byte -> integer value, or -1 for invalid hex values.
static const int HEX_MAP[256];
static constexpr const uint64_t NUM_BYTES_TO_SHIFT_FOR_FIRST_BYTE = 4;
static constexpr const uint64_t SECOND_BYTE_MASK = 0x0F;
static constexpr const char PREFIX[] = "\\x";
static constexpr const uint64_t PREFIX_LENGTH = 2;
static constexpr const uint64_t FIRST_BYTE_POS = PREFIX_LENGTH;
static constexpr const uint64_t SECOND_BYTES_POS = PREFIX_LENGTH + 1;
static constexpr const uint64_t LENGTH = 4;
};

struct Blob {
static std::string toString(blob_t& blob);

static uint64_t getBlobSize(const ku_string_t& blob);

static void fromString(ku_string_t& str, uint8_t* resultBuffer);
static uint64_t fromString(const char* str, uint64_t length, uint8_t* resultBuffer);

private:
static void validateHexCode(const uint8_t* blobStr, uint64_t length, uint64_t curPos);
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class LogicalTypeUtils {
static bool isNumerical(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalTypeID> getAllValidLogicTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();

private:
static LogicalTypeID dataTypeIDFromString(const std::string& dataTypeIDString);
Expand Down
6 changes: 5 additions & 1 deletion src/include/function/cast/cast_operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ struct CastToBlob {
common::StringVector::getInMemOverflowBuffer(&resultVector)
->allocateSpace(result.value.len));
}
common::Blob::fromString(input, result.value.getDataWritable());
common::Blob::fromString(reinterpret_cast<const char*>(input.getData()), input.len,
result.value.getDataWritable());
if (!common::ku_string_t::isShortString(result.value.len)) {
memcpy(result.value.prefix, result.value.getData(), common::ku_string_t::PREFIX_LENGTH);
}
}
};

Expand Down
10 changes: 5 additions & 5 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ class InMemColumn {

std::unique_ptr<InMemColumnChunk> getInMemColumnChunk(common::offset_t startNodeOffset,
common::offset_t endNodeOffset, const common::CopyDescription* copyDescription) {
switch (dataType.getLogicalTypeID()) {
case common::LogicalTypeID::STRING:
case common::LogicalTypeID::VAR_LIST: {
switch (dataType.getPhysicalType()) {
case common::PhysicalTypeID::STRING:
case common::PhysicalTypeID::VAR_LIST: {
return std::make_unique<InMemColumnChunkWithOverflow>(
dataType, startNodeOffset, endNodeOffset, copyDescription, inMemOverflowFile.get());
}
case common::LogicalTypeID::FIXED_LIST: {
case common::PhysicalTypeID::FIXED_LIST: {
return std::make_unique<InMemFixedListColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
}
case common::LogicalTypeID::STRUCT: {
case common::PhysicalTypeID::STRUCT: {
auto inMemStructColumnChunk = std::make_unique<InMemStructColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
for (auto& fieldColumn : childColumns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk {
common::offset_t endNodeOffset, const common::CopyDescription* copyDescription,
InMemOverflowFile* inMemOverflowFile)
: InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset, copyDescription},
inMemOverflowFile{inMemOverflowFile} {}
inMemOverflowFile{inMemOverflowFile}, blobBuffer{std::make_unique<uint8_t[]>(
common::BufferPoolConstants::PAGE_4KB_SIZE)} {}

void copyArrowArray(arrow::Array& array, arrow::Array* nodeOffsets = nullptr) final;

Expand All @@ -106,6 +107,7 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk {
storage::InMemOverflowFile* inMemOverflowFile;
// TODO(Ziyi/Guodong): Fix this for rel columns.
PageByteCursor overflowCursor;
std::unique_ptr<uint8_t[]> blobBuffer;
};

class InMemStructColumnChunk : public InMemColumnChunk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class InMemListsWithOverflow : public InMemLists {
std::unique_ptr<InMemOverflowFile> overflowInMemFile;
// TODO(Guodong/Ziyi): Fix for concurrent writes.
PageByteCursor overflowCursor;
std::unique_ptr<uint8_t[]> blobBuffer;
};

class InMemAdjLists : public InMemLists {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class ColumnFactory {
case common::LogicalTypeID::INTERVAL:
case common::LogicalTypeID::FIXED_LIST:
return std::make_unique<Column>(structureIDAndFName, logicalType, bufferManager, wal);
case common::LogicalTypeID::BLOB:
case common::LogicalTypeID::STRING:
return std::make_unique<StringPropertyColumn>(
structureIDAndFName, logicalType, bufferManager, wal);
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/storage_structure/lists/lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class ListsFactory {
return std::make_unique<Lists>(structureIDAndFName, dataType,
storage::StorageUtils::getDataTypeSize(dataType), adjListsHeaders, bufferManager,
wal, listsUpdatesStore);
case common::LogicalTypeID::BLOB:
case common::LogicalTypeID::STRING:
return std::make_unique<StringPropertyLists>(
structureIDAndFName, adjListsHeaders, bufferManager, wal, listsUpdatesStore);
Expand Down
2 changes: 1 addition & 1 deletion src/parser/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ Transformer::transformParsingOptions(CypherParser::KU_ParsingOptionsContext& ctx

std::string Transformer::transformStringLiteral(antlr4::tree::TerminalNode& stringLiteral) {
auto str = stringLiteral.getText();
return str.substr(1, str.size() - 2);
return common::StringUtils::removeEscapedCharacters(str);
}

} // namespace parser
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/order_by/order_by.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void OrderBy::initGlobalStateInternal(kuzu::processor::ExecutionContext* context
auto tableSchema = populateTableSchema();
for (auto i = 0u; i < orderByDataInfo.keysPosAndType.size(); ++i) {
auto [dataPos, dataType] = orderByDataInfo.keysPosAndType[i];
if (LogicalTypeID::STRING == dataType.getLogicalTypeID()) {
if (PhysicalTypeID::STRING == dataType.getPhysicalType()) {
// If this is a string column, we need to find the factorizedTable offset for this
// column.
auto factorizedTableColIdx = 0ul;
Expand Down
1 change: 1 addition & 0 deletions src/storage/copier/table_copy_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ std::shared_ptr<arrow::DataType> TableCopyUtils::toArrowDataType(const LogicalTy
case LogicalTypeID::INTERVAL:
case LogicalTypeID::FIXED_LIST:
case LogicalTypeID::VAR_LIST:
case LogicalTypeID::BLOB:
case LogicalTypeID::STRING:
case LogicalTypeID::STRUCT: {
return arrow::utf8();
Expand Down
8 changes: 4 additions & 4 deletions src/storage/in_mem_storage_structure/in_mem_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace storage {
InMemColumn::InMemColumn(std::string filePath, LogicalType dataType, bool requireNullBits)
: filePath{std::move(filePath)}, dataType{std::move(dataType)} {
// TODO(Guodong): Separate this as a function.
switch (this->dataType.getLogicalTypeID()) {
case LogicalTypeID::STRUCT: {
switch (this->dataType.getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
auto fieldTypes = common::StructType::getFieldTypes(&this->dataType);
childColumns.reserve(fieldTypes.size());
for (auto i = 0u; i < fieldTypes.size(); i++) {
Expand All @@ -22,8 +22,8 @@ InMemColumn::InMemColumn(std::string filePath, LogicalType dataType, bool requir
true /* hasNull */));
}
} break;
case LogicalTypeID::STRING:
case LogicalTypeID::VAR_LIST: {
case PhysicalTypeID::STRING:
case PhysicalTypeID::VAR_LIST: {
inMemOverflowFile =
std::make_unique<InMemOverflowFile>(StorageUtils::getOverflowFileName(this->filePath));
fileHandle = std::make_unique<FileHandle>(
Expand Down
14 changes: 14 additions & 0 deletions src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ void InMemColumnChunkWithOverflow::copyArrowArray(arrow::Array& array, arrow::Ar
case common::LogicalTypeID::STRING: {
templateCopyValuesAsStringToPageWithOverflow<ku_string_t>(array, nodeOffsets);
} break;
case common::LogicalTypeID::BLOB: {
templateCopyValuesAsStringToPageWithOverflow<blob_t>(array, nodeOffsets);
} break;
case common::LogicalTypeID::VAR_LIST: {
templateCopyValuesAsStringToPageWithOverflow<ku_list_t>(array, nodeOffsets);
} break;
Expand Down Expand Up @@ -303,6 +306,17 @@ void InMemColumnChunkWithOverflow::setValWithOverflow<ku_string_t>(
setValue(val, pos);
}

// BLOB
// Converts the textual blob in `value` into raw bytes via Blob::fromString, copies those bytes
// into the overflow file and stores the resulting value at position `pos` of this chunk.
template<>
void InMemColumnChunkWithOverflow::setValWithOverflow<blob_t>(
    const char* value, uint64_t length, uint64_t pos) {
    // At most PAGE_4KB_SIZE input characters are handed to the decoder; anything beyond that
    // is not read.
    // NOTE(review): a cut that lands inside a hex escape sequence presumably makes
    // Blob::fromString reject the input -- confirm this is the intended behaviour.
    auto numCharsToDecode = std::min(length, BufferPoolConstants::PAGE_4KB_SIZE);
    auto* decodedBytes = blobBuffer.get();
    auto decodedLen = Blob::fromString(value, numCharsToDecode, decodedBytes);
    auto storedBlob = inMemOverflowFile->copyString(
        reinterpret_cast<const char*>(decodedBytes), decodedLen, overflowCursor);
    setValue(storedBlob, pos);
}

// VAR_LIST
template<>
void InMemColumnChunkWithOverflow::setValWithOverflow<ku_list_t>(
Expand Down
Loading

0 comments on commit 2b75b18

Please sign in to comment.