Skip to content

Commit

Permalink
Remove copyNonNullDataWithSameTypeIntoPos
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jun 17, 2023
1 parent 994daff commit c28273f
Show file tree
Hide file tree
Showing 18 changed files with 172 additions and 167 deletions.
65 changes: 46 additions & 19 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <stdexcept>

#include "common/exception.h"
#include "common/null_buffer.h"
#include "common/ser_deser.h"
#include "common/string_utils.h"
#include "common/types/types_include.h"
Expand Down Expand Up @@ -41,6 +42,29 @@ std::string PhysicalTypeUtils::physicalTypeToString(PhysicalTypeID physicalType)
}
}

uint32_t PhysicalTypeUtils::getFixedTypeSize(PhysicalTypeID physicalType) {
switch (physicalType) {
case PhysicalTypeID::BOOL:
return sizeof(bool);
case PhysicalTypeID::INT64:
return sizeof(int64_t);
case PhysicalTypeID::INT32:
return sizeof(int32_t);
case PhysicalTypeID::INT16:
return sizeof(int16_t);
case PhysicalTypeID::DOUBLE:
return sizeof(double_t);
case PhysicalTypeID::FLOAT:
return sizeof(float_t);
case PhysicalTypeID::INTERVAL:
return sizeof(interval_t);
case PhysicalTypeID::INTERNAL_ID:
return sizeof(internalID_t);
default:
throw NotImplementedException{"PhysicalTypeUtils::getFixedTypeSize."};

Check warning on line 64 in src/common/types/types.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/types/types.cpp#L63-L64

Added lines #L63 - L64 were not covered by tests
}
}

bool VarListTypeInfo::operator==(const kuzu::common::VarListTypeInfo& other) const {
return *childType == *other.childType;
}
Expand Down Expand Up @@ -456,26 +480,29 @@ std::string LogicalTypeUtils::dataTypesToString(const std::vector<LogicalTypeID>
return result;
}

uint32_t LogicalTypeUtils::getFixedTypeSize(kuzu::common::PhysicalTypeID physicalType) {
switch (physicalType) {
case PhysicalTypeID::BOOL:
return sizeof(bool);
case PhysicalTypeID::INT64:
return sizeof(int64_t);
case PhysicalTypeID::INT32:
return sizeof(int32_t);
case PhysicalTypeID::INT16:
return sizeof(int16_t);
case PhysicalTypeID::DOUBLE:
return sizeof(double_t);
case PhysicalTypeID::FLOAT:
return sizeof(float_t);
case PhysicalTypeID::INTERVAL:
return sizeof(interval_t);
case PhysicalTypeID::INTERNAL_ID:
return sizeof(internalID_t);
uint32_t LogicalTypeUtils::getRowLayoutSize(const LogicalType& type) {
switch (type.getPhysicalType()) {
case PhysicalTypeID::STRING: {
return sizeof(ku_string_t);
}
case PhysicalTypeID::FIXED_LIST: {
return getRowLayoutSize(*FixedListType::getChildType(&type)) *
FixedListType::getNumElementsInList(&type);
}
case PhysicalTypeID::VAR_LIST: {
return sizeof(ku_list_t);
}
case PhysicalTypeID::STRUCT: {
uint32_t size = 0;
auto fieldsTypes = StructType::getFieldTypes(&type);
for (auto fieldType : fieldsTypes) {
size += getRowLayoutSize(*fieldType);
}
size += NullBuffer::getNumBytesForNullValues(fieldsTypes.size());
return size;
}
default:
throw NotImplementedException{"Cannot infer the size of a variable dataType."};
return PhysicalTypeUtils::getFixedTypeSize(type.getPhysicalType());
}
}

Expand Down
57 changes: 56 additions & 1 deletion src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common/vector/value_vector.h"

#include "common/null_buffer.h"
#include "common/vector/auxiliary_buffer.h"

namespace kuzu {
Expand Down Expand Up @@ -51,6 +52,24 @@ void ValueVector::setValue(uint32_t pos, T val) {
((T*)valueBuffer.get())[pos] = val;
}

void ValueVector::copyFromRowData(uint32_t pos, const uint8_t* rowData) {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
StructVector::copyFromRowData(this, pos, rowData);
} break;
case PhysicalTypeID::VAR_LIST: {
ListVector::copyFromRowData(this, pos, rowData);
} break;
case PhysicalTypeID::STRING: {
StringVector::addString(this, pos, *(ku_string_t*)rowData);
} break;
default: {
auto dataTypeSize = LogicalTypeUtils::getRowLayoutSize(dataType);
memcpy(getData() + pos * dataTypeSize, rowData, dataTypeSize);
}
}
}

void ValueVector::resetAuxiliaryBuffer() {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRING: {
Expand Down Expand Up @@ -93,7 +112,7 @@ uint32_t ValueVector::getDataTypeSize(const LogicalType& type) {
return 0;
}
default: {
return LogicalTypeUtils::getFixedTypeSize(type.getPhysicalType());
return PhysicalTypeUtils::getFixedTypeSize(type.getPhysicalType());
}
}
}
Expand Down Expand Up @@ -180,5 +199,41 @@ void StringVector::addString(
}
}

void ListVector::copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData) {
assert(vector->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST);
auto srcKuList = *(ku_list_t*)rowData;
auto srcNullBytes = reinterpret_cast<uint8_t*>(srcKuList.overflowPtr);
auto srcListValues = srcNullBytes + NullBuffer::getNumBytesForNullValues(srcKuList.size);
auto dstListEntry = ListVector::addList(vector, srcKuList.size);
vector->setValue<list_entry_t>(pos, dstListEntry);
auto resultDataVector = common::ListVector::getDataVector(vector);
auto rowLayoutSize = LogicalTypeUtils::getRowLayoutSize(resultDataVector->dataType);
for (auto i = 0u; i < srcKuList.size; i++) {
auto dstListValuePos = dstListEntry.offset + i;
if (NullBuffer::isNull(srcNullBytes, i)) {
resultDataVector->setNull(dstListValuePos, true);
} else {
resultDataVector->copyFromRowData(dstListValuePos, srcListValues);
}
srcListValues += rowLayoutSize;
}
}

void StructVector::copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData) {
assert(vector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT);
auto structFields = getFieldVectors(vector);
auto structNullBytes = rowData;
auto structValues = structNullBytes + NullBuffer::getNumBytesForNullValues(structFields.size());
for (auto i = 0u; i < structFields.size(); i++) {
auto structField = structFields[i];
if (NullBuffer::isNull(structNullBytes, i)) {
structField->setNull(pos, true /* isNull */);
} else {
structField->copyFromRowData(pos, structValues);
}
structValues += LogicalTypeUtils::getRowLayoutSize(structField->dataType);
}
}

} // namespace common
} // namespace kuzu
59 changes: 4 additions & 55 deletions src/common/vector/value_vector_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,59 +1,10 @@
#include "common/vector/value_vector_utils.h"

#include "common/null_buffer.h"
#include "processor/result/factorized_table.h"

using namespace kuzu;
using namespace common;

void ValueVectorUtils::copyNonNullDataWithSameTypeIntoPos(
ValueVector& resultVector, uint64_t pos, const uint8_t* srcData) {
switch (resultVector.dataType.getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
auto structFields = StructVector::getFieldVectors(&resultVector);
auto structNullBytes = srcData;
auto structValues =
structNullBytes + NullBuffer::getNumBytesForNullValues(structFields.size());
for (auto i = 0u; i < structFields.size(); i++) {
auto structField = structFields[i];
if (NullBuffer::isNull(structNullBytes, i)) {
structField->setNull(pos, true /* isNull */);
} else {
copyNonNullDataWithSameTypeIntoPos(*structField, pos, structValues);
}
structValues += processor::FactorizedTable::getDataTypeSize(structField->dataType);
}
} break;
case PhysicalTypeID::VAR_LIST: {
auto srcKuList = *(ku_list_t*)srcData;
auto srcNullBytes = reinterpret_cast<uint8_t*>(srcKuList.overflowPtr);
auto srcListValues = srcNullBytes + NullBuffer::getNumBytesForNullValues(srcKuList.size);
auto dstListEntry = ListVector::addList(&resultVector, srcKuList.size);
resultVector.setValue<list_entry_t>(pos, dstListEntry);
auto resultDataVector = common::ListVector::getDataVector(&resultVector);
auto numBytesPerValue =
processor::FactorizedTable::getDataTypeSize(resultDataVector->dataType);
for (auto i = 0u; i < srcKuList.size; i++) {
auto dstListValuePos = dstListEntry.offset + i;
if (NullBuffer::isNull(srcNullBytes, i)) {
resultDataVector->setNull(dstListValuePos, true);
} else {
copyNonNullDataWithSameTypeIntoPos(
*resultDataVector, dstListValuePos, srcListValues);
}
srcListValues += numBytesPerValue;
}
} break;
case PhysicalTypeID::STRING: {
StringVector::addString(&resultVector, pos, *(ku_string_t*)srcData);
} break;
default: {
auto dataTypeSize = processor::FactorizedTable::getDataTypeSize(resultVector.dataType);
memcpy(resultVector.getData() + pos * dataTypeSize, srcData, dataTypeSize);
}
}
}

void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector& srcVector,
uint64_t pos, uint8_t* dstData, InMemOverflowBuffer& dstOverflowBuffer) {
switch (srcVector.dataType.getPhysicalType()) {
Expand All @@ -73,7 +24,7 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector&
copyNonNullDataWithSameTypeOutFromPos(
*structField, pos, structValues, dstOverflowBuffer);
}
structValues += processor::FactorizedTable::getDataTypeSize(structField->dataType);
structValues += LogicalTypeUtils::getRowLayoutSize(structField->dataType);
}
} break;
case PhysicalTypeID::VAR_LIST: {
Expand All @@ -82,8 +33,7 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector&
ku_list_t dstList;
dstList.size = srcListEntry.size;
auto dstListOverflowSize =
processor::FactorizedTable::getDataTypeSize(srcListDataVector->dataType) *
dstList.size +
LogicalTypeUtils::getRowLayoutSize(srcListDataVector->dataType) * dstList.size +
NullBuffer::getNumBytesForNullValues(dstList.size);
dstList.overflowPtr =
reinterpret_cast<uint64_t>(dstOverflowBuffer.allocateSpace(dstListOverflowSize));
Expand All @@ -97,8 +47,7 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector&
copyNonNullDataWithSameTypeOutFromPos(
*srcListDataVector, srcListEntry.offset + i, dstListValues, dstOverflowBuffer);
}
dstListValues +=
processor::FactorizedTable::getDataTypeSize(srcListDataVector->dataType);
dstListValues += LogicalTypeUtils::getRowLayoutSize(srcListDataVector->dataType);
}
memcpy(dstData, &dstList, sizeof(dstList));
} break;
Expand All @@ -114,7 +63,7 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector&
}
} break;
default: {
auto dataTypeSize = processor::FactorizedTable::getDataTypeSize(srcVector.dataType);
auto dataTypeSize = LogicalTypeUtils::getRowLayoutSize(srcVector.dataType);
memcpy(dstData, srcVector.getData() + pos * dataTypeSize, dataTypeSize);
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ enum class PhysicalTypeID : uint8_t {
STRUCT = 23,
};

struct PhysicalTypeUtils {
static std::string physicalTypeToString(PhysicalTypeID physicalType);
};

class LogicalType;

class ExtraTypeInfo {
Expand Down Expand Up @@ -310,14 +306,19 @@ struct UnionType {
}
};

struct PhysicalTypeUtils {
static std::string physicalTypeToString(PhysicalTypeID physicalType);
static uint32_t getFixedTypeSize(PhysicalTypeID physicalType);
};

class LogicalTypeUtils {
public:
KUZU_API static std::string dataTypeToString(const LogicalType& dataType);
KUZU_API static std::string dataTypeToString(LogicalTypeID dataTypeID);
static std::string dataTypesToString(const std::vector<LogicalType>& dataTypes);
static std::string dataTypesToString(const std::vector<LogicalTypeID>& dataTypeIDs);
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getFixedTypeSize(kuzu::common::PhysicalTypeID physicalType);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
Expand Down
5 changes: 5 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ValueVector {
}
template<typename T>
void setValue(uint32_t pos, T val);
void copyFromRowData(uint32_t pos, const uint8_t* rowData);

inline uint8_t* getData() const { return valueBuffer.get(); }

Expand Down Expand Up @@ -122,6 +123,8 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
};

class StructVector {
Expand Down Expand Up @@ -150,6 +153,8 @@ class StructVector {
vector->getData() + vector->getNumBytesPerValue() * DEFAULT_VECTOR_CAPACITY),
0);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
};

class UnionVector {
Expand Down
4 changes: 0 additions & 4 deletions src/include/common/vector/value_vector_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ namespace common {

class ValueVectorUtils {
public:
// These two functions assume that the given uint8_t* srcData/dstData are pointing to a data
// with the same data type as this ValueVector.
static void copyNonNullDataWithSameTypeIntoPos(
ValueVector& resultVector, uint64_t pos, const uint8_t* srcData);
static void copyNonNullDataWithSameTypeOutFromPos(const ValueVector& srcVector, uint64_t pos,
uint8_t* dstData, InMemOverflowBuffer& dstOverflowBuffer);
static void copyValue(uint8_t* dstValue, common::ValueVector& dstVector,
Expand Down
4 changes: 2 additions & 2 deletions src/include/function/aggregate/collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ struct CollectFunction {
outputVector->setValue<common::list_entry_t>(pos, listEntry);
auto outputDataVector = common::ListVector::getDataVector(outputVector);
for (auto i = 0u; i < listEntry.size; i++) {
common::ValueVectorUtils::copyNonNullDataWithSameTypeIntoPos(
*outputDataVector, listEntry.offset + i, factorizedTable->getTuple(i));
outputDataVector->copyFromRowData(
listEntry.offset + i, factorizedTable->getTuple(i));
}
// CollectStates are stored in factorizedTable entries. When the factorizedTable is
// destructed, the destructor of CollectStates won't be called. Therefore, we need to
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ class FactorizedTable {
storage::DiskOverflowFile* overflowFileOfInMemList, const common::LogicalType& type) const;
void clear();
int64_t findValueInFlatColumn(ft_col_idx_t colIdx, int64_t value) const;
static uint32_t getDataTypeSize(const common::LogicalType& type);

private:
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_expressions_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExpressionsScanToPhysica
for (auto& expression : expressions) {
tableSchema->appendColumn(
std::make_unique<ColumnSchema>(false, 0 /* all expressions are in the same datachunk */,
FactorizedTable::getDataTypeSize(expression->dataType)));
LogicalTypeUtils::getRowLayoutSize(expression->dataType)));
auto expressionEvaluator = expressionMapper.mapExpression(expression, *inSchema);
// expression can be evaluated statically and does not require an actual resultset to init
expressionEvaluator->init(ResultSet(0) /* dummy resultset */, memoryManager);
Expand Down
7 changes: 4 additions & 3 deletions src/processor/mapper/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using namespace kuzu::binder;
using namespace kuzu::planner;
using namespace kuzu::common;

namespace kuzu {
namespace processor {
Expand All @@ -20,7 +21,7 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
keyGroupPosSet.insert(pos.dataChunkPos);
// Keys are always stored in flat column.
auto columnSchema = std::make_unique<ColumnSchema>(false /* isUnFlat */, pos.dataChunkPos,
FactorizedTable::getDataTypeSize(key->dataType));
LogicalTypeUtils::getRowLayoutSize(key->dataType));
tableSchema->appendColumn(std::move(columnSchema));
keysPos.push_back(pos);
}
Expand All @@ -34,7 +35,7 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
// payloads must also be stored as flat.
// 2. payload is in flat chunk
columnSchema = std::make_unique<ColumnSchema>(false /* isUnFlat */, pos.dataChunkPos,
FactorizedTable::getDataTypeSize(payload->dataType));
LogicalTypeUtils::getRowLayoutSize(payload->dataType));
} else {
columnSchema = std::make_unique<ColumnSchema>(
true /* isUnFlat */, pos.dataChunkPos, (uint32_t)sizeof(common::overflow_value_t));
Expand All @@ -44,7 +45,7 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
}
auto pointerType = common::LogicalType(common::LogicalTypeID::INT64);
auto pointerColumn = std::make_unique<ColumnSchema>(false /* isUnFlat */,
INVALID_DATA_CHUNK_POS, FactorizedTable::getDataTypeSize(pointerType));
INVALID_DATA_CHUNK_POS, LogicalTypeUtils::getRowLayoutSize(pointerType));
tableSchema->appendColumn(std::move(pointerColumn));
return std::make_unique<HashJoinBuildInfo>(
std::move(keysPos), std::move(payloadsPos), std::move(tableSchema));
Expand Down
Loading

0 comments on commit c28273f

Please sign in to comment.