Skip to content

Commit

Permalink
move child chunks and columns to struct
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Oct 11, 2023
1 parent fe7de00 commit 84568fb
Show file tree
Hide file tree
Showing 19 changed files with 312 additions and 381 deletions.
14 changes: 3 additions & 11 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,14 @@ namespace processor {
class CopyNodeSharedState {
public:
CopyNodeSharedState(uint64_t& numRows, catalog::NodeTableSchema* tableSchema,
storage::NodeTable* table, storage::MemoryManager* memoryManager, bool isCopyRdf,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);
storage::NodeTable* table, storage::MemoryManager* memoryManager, bool isCopyRdf);

inline void initialize(const std::string& directory) { initializePrimaryKey(directory); };

// Thread-safe wrapper: holds mtx for the duration of the call and forwards to
// getNextNodeGroupIdxWithoutLock().
inline common::offset_t getNextNodeGroupIdx() {
    std::lock_guard<std::mutex> guard{mtx};
    return getNextNodeGroupIdxWithoutLock();
}
// Raises currentNodeGroupIdx to nextNodeGroupIdx while holding mtx. The stored index is only
// ever moved forward: a value that is not larger than the current one is ignored.
inline void setNextNodeGroupIdx(common::node_group_idx_t nextNodeGroupIdx) {
    std::lock_guard<std::mutex> guard{mtx};
    if (nextNodeGroupIdx <= currentNodeGroupIdx) {
        return;
    }
    currentNodeGroupIdx = nextNodeGroupIdx;
}

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

Expand All @@ -50,7 +43,6 @@ class CopyNodeSharedState {
// The sharedNodeGroup accumulates the data left over in the local node groups of CopyNode ops.
std::unique_ptr<storage::NodeGroup> sharedNodeGroup;
bool isCopyRdf;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig; // TODO: remove this
};

struct CopyNodeInfo {
Expand Down Expand Up @@ -78,8 +70,8 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
localNodeGroup = std::make_unique<storage::NodeGroup>(sharedState->tableSchema,
sharedState->csvReaderConfig.get(), sharedState->table->compressionEnabled());
localNodeGroup = std::make_unique<storage::NodeGroup>(
sharedState->tableSchema, sharedState->table->compressionEnabled());
}

inline bool canParallel() const final { return !copyNodeInfo.containsSerial; }
Expand Down
47 changes: 12 additions & 35 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
common::page_idx_t numPages;

BaseColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0} {}
BaseColumnChunkMetadata() : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0} {}
BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
virtual ~BaseColumnChunkMetadata() = default;
Expand Down Expand Up @@ -61,9 +61,8 @@ class ColumnChunk {

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression = true,
bool hasNullChunk = true);
explicit ColumnChunk(
common::LogicalType dataType, bool enableCompression = true, bool hasNullChunk = true);

virtual ~ColumnChunk() = default;

Expand All @@ -75,12 +74,7 @@ class ColumnChunk {
inline NullColumnChunk* getNullChunk() { return nullChunk.get(); }
inline common::LogicalType getDataType() const { return dataType; }

inline common::vector_idx_t getNumChildren() const { return childrenChunks.size(); }
// Returns a non-owning pointer to the child chunk stored at position idx.
// idx must be smaller than the number of children (checked by assert).
inline ColumnChunk* getChild(common::vector_idx_t idx) {
    assert(childrenChunks.size() > idx);
    const auto& child = childrenChunks[idx];
    return child.get();
}
virtual inline uint64_t getBufferSize() const { return numBytesPerValue * capacity; }
uint64_t getBufferSize() const;

virtual void resetToEmpty();

Expand All @@ -89,9 +83,6 @@ class ColumnChunk {

virtual void append(common::ValueVector* vector, common::offset_t startPosInChunk);

void append(
common::ValueVector* vector, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

Expand All @@ -113,7 +104,7 @@ class ColumnChunk {

// numValues must be at least the number of values the ColumnChunk was first initialized
// with
virtual void resize(uint64_t numValues);
virtual void resize(uint64_t newCapacity);

template<typename T>
inline void setValue(T val, common::offset_t pos) {
Expand All @@ -131,7 +122,8 @@ class ColumnChunk {

protected:
// Initializes the data buffer. It is (and should be) called only from the constructor.
virtual void initialize(common::offset_t capacity);
void initializeBuffer(common::offset_t capacity);
void initializeFunction(bool enableCompression);

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

Expand All @@ -144,8 +136,6 @@ class ColumnChunk {
uint64_t capacity;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
std::function<ColumnChunkMetadata(
const uint8_t*, uint64_t, BMFileHandle*, common::page_idx_t, const ColumnChunkMetadata&)>
Expand All @@ -169,33 +159,20 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig),
explicit BoolColumnChunk(bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL),
// Booleans are always compressed
false /* enableCompression */, hasNullChunk) {}

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void resize(uint64_t capacity) final;

protected:
// Returns the number of bytes needed to store numValues bit-packed booleans.
// Values are packed 8 per byte, and the buffer size must be a multiple of 8 bytes, so the
// result is one 8-byte word per started group of 64 values.
// Integer ceil-division is used instead of floating-point ceil(): it is exact for every input
// and avoids the implicit double -> uint64_t conversion on return.
inline uint64_t numBytesForValues(common::offset_t numValues) const {
    constexpr uint64_t numValuesPerWord = 64; // 8 bytes * 8 values per byte
    constexpr uint64_t numBytesPerWord = 8;
    // Written as quotient + remainder check (rather than (n + 63) / 64) so it cannot overflow.
    const uint64_t numWords =
        numValues / numValuesPerWord + (numValues % numValuesPerWord != 0 ? 1 : 0);
    return numWords * numBytesPerWord;
}

void initialize(common::offset_t capacity) final;
};

class NullColumnChunk : public BoolColumnChunk {
public:
// Default constructor: builds the underlying BoolColumnChunk without a CSV reader config and
// without a nested null chunk, and starts with mayHaveNullValue set to false.
NullColumnChunk()
: BoolColumnChunk(nullptr /*csvReaderConfig*/, false /*hasNullChunk*/), mayHaveNullValue{
false} {}
NullColumnChunk() : BoolColumnChunk(false /*hasNullChunk*/), mayHaveNullValue{false} {}
// Maybe this should be combined with BoolColumnChunk if the only difference is these functions?
inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
inline void setNull(common::offset_t pos, bool isNull) {
Expand Down Expand Up @@ -228,8 +205,8 @@ class NullColumnChunk : public BoolColumnChunk {
};

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, common::CSVReaderConfig* csvReaderConfig = nullptr);
static std::unique_ptr<ColumnChunk> createColumnChunk(
const common::LogicalType& dataType, bool enableCompression);
};

} // namespace storage
Expand Down
9 changes: 2 additions & 7 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class NodeColumn {
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0);
common::ValueVector* resultVector, uint64_t offsetInVector);
virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
Expand All @@ -63,16 +63,12 @@ class NodeColumn {
// Returns the number of elements held by the metadata disk array, as seen by the type of the
// given transaction.
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
    const auto trxType = transaction->getType();
    return metadataDA->getNumElements(trxType);
}
// Returns a non-owning pointer to the child column stored at position childIdx.
// childIdx must be smaller than the number of child columns (checked by assert).
inline NodeColumn* getChildColumn(common::vector_idx_t childIdx) {
    assert(childrenColumns.size() > childIdx);
    const auto& child = childrenColumns[childIdx];
    return child.get();
}

virtual void checkpointInMemory();
virtual void rollbackInMemory();

void populateWithDefaultVal(const catalog::Property& property, NodeColumn* nodeColumn,
common::ValueVector* defaultValueVector, uint64_t numNodeGroups);
common::ValueVector* defaultValueVector, uint64_t numNodeGroups) const;

inline CompressionMetadata getCompressionMetadata(
common::node_group_idx_t nodeGroupIdx, transaction::TransactionType transaction) const {
Expand Down Expand Up @@ -124,7 +120,6 @@ class NodeColumn {
WAL* wal;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
read_values_to_page_func_t readToPageFunc;
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class TableData;

class NodeGroup {
public:
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig,
bool enableCompression);
NodeGroup(catalog::TableSchema* schema, bool enableCompression);
explicit NodeGroup(TableData* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);
explicit StringColumnChunk(common::LogicalType dataType);

void resetToEmpty() final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
Expand Down
13 changes: 11 additions & 2 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression);
StructColumnChunk(common::LogicalType dataType, bool enableCompression);

// Returns a non-owning pointer to the child chunk stored at position childIdx.
// childIdx must be smaller than the number of child chunks (checked by assert).
inline ColumnChunk* getChild(common::vector_idx_t childIdx) {
    assert(childChunks.size() > childIdx);
    const auto& child = childChunks[childIdx];
    return child.get();
}

protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

void resize(uint64_t newCapacity) final;

private:
void write(const common::Value& val, uint64_t posToWrite) final;

private:
std::vector<std::unique_ptr<ColumnChunk>> childChunks;
};

} // namespace storage
Expand Down
15 changes: 14 additions & 1 deletion src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@ class StructNodeColumn : public NodeColumn {

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;
common::ValueVector* resultVector, uint64_t offsetInVector) final;

void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;

void checkpointInMemory() final;
void rollbackInMemory() final;

// Returns a non-owning pointer to the child column stored at position childIdx.
// childIdx must be smaller than the number of child columns (checked by assert).
inline NodeColumn* getChild(common::vector_idx_t childIdx) {
    assert(childColumns.size() > childIdx);
    const auto& child = childColumns[childIdx];
    return child.get();
}

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand All @@ -24,6 +34,9 @@ class StructNodeColumn : public NodeColumn {
common::ValueVector* resultVector) final;
void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;

private:
std::vector<std::unique_ptr<NodeColumn>> childColumns;
};

} // namespace storage
Expand Down
5 changes: 2 additions & 3 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct VarListDataColumnChunk {
: dataColumnChunk{std::move(dataChunk)}, capacity{
common::StorageConstants::NODE_GROUP_SIZE} {}

void reset();
void reset() const;

void resizeBuffer(uint64_t numValues);

Expand All @@ -31,8 +31,7 @@ struct VarListDataColumnChunk {

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression);
VarListColumnChunk(common::LogicalType dataType, bool enableCompression);

// Returns a non-owning pointer to the column chunk held by varListDataColumnChunk.
inline ColumnChunk* getDataColumnChunk() const {
    const auto& dataChunk = varListDataColumnChunk.dataColumnChunk;
    return dataChunk.get();
}
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
// Map copy node.
auto nodeTable = storageManager.getNodesStore().getNodeTable(tableSchema->tableID);
bool isCopyRdf = readerConfig->fileType == FileType::TURTLE;
auto copyNodeSharedState = std::make_shared<CopyNodeSharedState>(
reader->getSharedState()->getNumRowsRef(), tableSchema, nodeTable, memoryManager, isCopyRdf,
readerConfig->csvReaderConfig->copy());
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(reader->getSharedState()->getNumRowsRef(),
tableSchema, nodeTable, memoryManager, isCopyRdf);
CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, nodeTable,
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
copyFromInfo->containsSerial};
Expand Down
5 changes: 2 additions & 3 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ namespace kuzu {
namespace processor {

CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, NodeTableSchema* tableSchema,
NodeTable* table, MemoryManager* memoryManager, bool isCopyRdf,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
NodeTable* table, MemoryManager* memoryManager, bool isCopyRdf)
: numRows{numRows}, tableSchema{tableSchema}, table{table}, pkColumnID{0}, hasLoggedWAL{false},
currentNodeGroupIdx{0}, isCopyRdf{isCopyRdf}, csvReaderConfig{std::move(csvReaderConfig)} {
currentNodeGroupIdx{0}, isCopyRdf{isCopyRdf} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
Expand Down
5 changes: 3 additions & 2 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,13 @@ void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
StructLocalColumn::StructLocalColumn(NodeColumn* column, bool enableCompression)
: LocalColumn{column, enableCompression} {
assert(column->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT);
auto dataType = column->getDataType();
auto structColumn = static_cast<StructNodeColumn*>(column);
auto dataType = structColumn->getDataType();
auto structFields = StructType::getFields(&dataType);
fields.resize(structFields.size());
for (auto i = 0u; i < structFields.size(); i++) {
fields[i] =
LocalColumnFactory::createLocalColumn(column->getChildColumn(i), enableCompression);
LocalColumnFactory::createLocalColumn(structColumn->getChild(i), enableCompression);
}
}

Expand Down
Loading

0 comments on commit 84568fb

Please sign in to comment.