Move child chunks and columns to struct #2173

Merged: 1 commit, merged on Oct 11, 2023
14 changes: 3 additions & 11 deletions src/include/processor/operator/persistent/copy_node.h
@@ -11,21 +11,14 @@ namespace processor {
class CopyNodeSharedState {
public:
CopyNodeSharedState(uint64_t& numRows, catalog::NodeTableSchema* tableSchema,
storage::NodeTable* table, storage::MemoryManager* memoryManager, bool isCopyRdf,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);
storage::NodeTable* table, storage::MemoryManager* memoryManager, bool isCopyRdf);

inline void initialize(const std::string& directory) { initializePrimaryKey(directory); };

inline common::offset_t getNextNodeGroupIdx() {
std::unique_lock<std::mutex> lck{mtx};
return getNextNodeGroupIdxWithoutLock();
}
inline void setNextNodeGroupIdx(common::node_group_idx_t nextNodeGroupIdx) {
std::unique_lock<std::mutex> lck{mtx};
if (nextNodeGroupIdx > currentNodeGroupIdx) {
currentNodeGroupIdx = nextNodeGroupIdx;
}
}

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

@@ -50,7 +43,6 @@ class CopyNodeSharedState {
// The sharedNodeGroup is to accumulate left data within local node groups in CopyNode ops.
std::unique_ptr<storage::NodeGroup> sharedNodeGroup;
bool isCopyRdf;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig; // TODO: remove this
};

struct CopyNodeInfo {
@@ -78,8 +70,8 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
localNodeGroup = std::make_unique<storage::NodeGroup>(sharedState->tableSchema,
sharedState->csvReaderConfig.get(), sharedState->table->compressionEnabled());
localNodeGroup = std::make_unique<storage::NodeGroup>(
sharedState->tableSchema, sharedState->table->compressionEnabled());
}

inline bool canParallel() const final { return !copyNodeInfo.containsSerial; }
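Call sites get correspondingly simpler. Below is a minimal sketch (not part of the diff) of how the trimmed constructors compose, assuming the copy-pipeline objects `numRows`, `tableSchema`, `table`, and `memoryManager` are already in scope; variable names are illustrative.

```cpp
// Sketch only: CopyNodeSharedState and the local NodeGroup no longer thread a
// CSVReaderConfig through their constructors.
auto sharedState = std::make_shared<CopyNodeSharedState>(
    numRows, tableSchema, table, memoryManager, /*isCopyRdf=*/false);

// Only the schema and the table's compression flag are needed now.
auto localNodeGroup = std::make_unique<storage::NodeGroup>(
    tableSchema, table->compressionEnabled());
```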
47 changes: 12 additions & 35 deletions src/include/storage/store/column_chunk.h
@@ -24,7 +24,7 @@ struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
common::page_idx_t numPages;

BaseColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0} {}
BaseColumnChunkMetadata() : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0} {}
BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
virtual ~BaseColumnChunkMetadata() = default;
@@ -61,9 +61,8 @@ class ColumnChunk {

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression = true,
bool hasNullChunk = true);
explicit ColumnChunk(
common::LogicalType dataType, bool enableCompression = true, bool hasNullChunk = true);

virtual ~ColumnChunk() = default;

@@ -75,12 +74,7 @@
inline NullColumnChunk* getNullChunk() { return nullChunk.get(); }
inline common::LogicalType getDataType() const { return dataType; }

inline common::vector_idx_t getNumChildren() const { return childrenChunks.size(); }
inline ColumnChunk* getChild(common::vector_idx_t idx) {
assert(idx < childrenChunks.size());
return childrenChunks[idx].get();
}
virtual inline uint64_t getBufferSize() const { return numBytesPerValue * capacity; }
uint64_t getBufferSize() const;

virtual void resetToEmpty();

@@ -89,9 +83,6 @@

virtual void append(common::ValueVector* vector, common::offset_t startPosInChunk);

void append(
common::ValueVector* vector, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

@@ -113,7 +104,7 @@

// numValues must be at least the number of values the ColumnChunk was first initialized
// with
virtual void resize(uint64_t numValues);
virtual void resize(uint64_t newCapacity);

template<typename T>
inline void setValue(T val, common::offset_t pos) {
@@ -131,7 +122,8 @@

protected:
// Initializes the data buffer. Is (and should be) only called in constructor.
virtual void initialize(common::offset_t capacity);
void initializeBuffer(common::offset_t capacity);
void initializeFunction(bool enableCompression);

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

@@ -144,8 +136,6 @@
uint64_t capacity;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
std::function<ColumnChunkMetadata(
const uint8_t*, uint64_t, BMFileHandle*, common::page_idx_t, const ColumnChunkMetadata&)>
@@ -169,33 +159,20 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig),
explicit BoolColumnChunk(bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL),
// Booleans are always compressed
false /* enableCompression */, hasNullChunk) {}

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void resize(uint64_t capacity) final;

protected:
inline uint64_t numBytesForValues(common::offset_t numValues) const {
// 8 values per byte, and we need a buffer size which is a multiple of 8 bytes
return ceil(numValues / 8.0 / 8.0) * 8;
}

void initialize(common::offset_t capacity) final;
};

class NullColumnChunk : public BoolColumnChunk {
public:
NullColumnChunk()
: BoolColumnChunk(nullptr /*copyDescription*/, false /*hasNullChunk*/), mayHaveNullValue{
false} {}
NullColumnChunk() : BoolColumnChunk(false /*hasNullChunk*/), mayHaveNullValue{false} {}
// Maybe this should be combined with BoolColumnChunk if the only difference is these functions?
inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
inline void setNull(common::offset_t pos, bool isNull) {
@@ -228,8 +205,8 @@ class NullColumnChunk : public BoolColumnChunk {
};

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, common::CSVReaderConfig* csvReaderConfig = nullptr);
static std::unique_ptr<ColumnChunk> createColumnChunk(
const common::LogicalType& dataType, bool enableCompression);
};

} // namespace storage
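With the CSVReaderConfig parameter gone, chunk creation goes through the two-argument factory. A quick sketch (illustrative, not from this diff) assuming the `column_chunk.h` header above is included:

```cpp
// Sketch: the factory now only needs a logical type and a compression flag.
auto chunk = storage::ColumnChunkFactory::createColumnChunk(
    common::LogicalType(common::LogicalTypeID::INT64), /*enableCompression=*/true);
```

Child access, previously exposed on the base class through getNumChildren()/getChild(), now lives only on StructColumnChunk (see that header further down).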
9 changes: 2 additions & 7 deletions src/include/storage/store/node_column.h
@@ -49,7 +49,7 @@ class NodeColumn {
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0);
common::ValueVector* resultVector, uint64_t offsetInVector);
virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
@@ -63,16 +63,12 @@
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
return metadataDA->getNumElements(transaction->getType());
}
inline NodeColumn* getChildColumn(common::vector_idx_t childIdx) {
assert(childIdx < childrenColumns.size());
return childrenColumns[childIdx].get();
}

virtual void checkpointInMemory();
virtual void rollbackInMemory();

void populateWithDefaultVal(const catalog::Property& property, NodeColumn* nodeColumn,
common::ValueVector* defaultValueVector, uint64_t numNodeGroups);
common::ValueVector* defaultValueVector, uint64_t numNodeGroups) const;

inline CompressionMetadata getCompressionMetadata(
common::node_group_idx_t nodeGroupIdx, transaction::TransactionType transaction) const {
@@ -124,7 +120,6 @@ class NodeColumn {
WAL* wal;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
read_values_to_page_func_t readToPageFunc;
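Note that the `= 0` default on `offsetInVector` is dropped from the virtual `scan` overload, presumably because default arguments bind to the static type and are fragile to keep consistent across overrides such as `StructNodeColumn::scan`. Callers now spell the offset out; a hedged sketch with placeholder variables:

```cpp
// Sketch: offsetInVector is now explicit at every call site.
column->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup,
    resultVector, /*offsetInVector=*/0);
```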
3 changes: 1 addition & 2 deletions src/include/storage/store/node_group.h
@@ -13,8 +13,7 @@ class TableData;

class NodeGroup {
public:
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig,
bool enableCompression);
NodeGroup(catalog::TableSchema* schema, bool enableCompression);
explicit NodeGroup(TableData* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
3 changes: 1 addition & 2 deletions src/include/storage/store/string_column_chunk.h
@@ -9,8 +9,7 @@ namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);
explicit StringColumnChunk(common::LogicalType dataType);

void resetToEmpty() final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
13 changes: 11 additions & 2 deletions src/include/storage/store/struct_column_chunk.h
@@ -7,16 +7,25 @@ namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression);
StructColumnChunk(common::LogicalType dataType, bool enableCompression);

inline ColumnChunk* getChild(common::vector_idx_t childIdx) {
assert(childIdx < childChunks.size());
return childChunks[childIdx].get();
}

protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

void resize(uint64_t newCapacity) final;

private:
void write(const common::Value& val, uint64_t posToWrite) final;

private:
std::vector<std::unique_ptr<ColumnChunk>> childChunks;
};

} // namespace storage
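Since `childChunks` (and the old `getNumChildren()`) no longer exist on the base class, code that walks a struct chunk's children derives the count from the struct type's fields, as `local_table.cpp` does below for columns. A small sketch under that assumption, with `structChunk` standing in for a `storage::StructColumnChunk*` obtained after a `PhysicalTypeID::STRUCT` check:

```cpp
// Sketch: iterate a struct chunk's children via its logical type's fields.
auto dataType = structChunk->getDataType();
auto fields = common::StructType::getFields(&dataType);
for (auto i = 0u; i < fields.size(); i++) {
    auto* childChunk = structChunk->getChild(i); // asserts the index in debug builds
    // ... per-field processing would go here ...
}
```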
15 changes: 14 additions & 1 deletion src/include/storage/store/struct_node_column.h
@@ -15,7 +15,17 @@ class StructNodeColumn : public NodeColumn {

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;
common::ValueVector* resultVector, uint64_t offsetInVector) final;

void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;

void checkpointInMemory() final;
void rollbackInMemory() final;

inline NodeColumn* getChild(common::vector_idx_t childIdx) {
assert(childIdx < childColumns.size());
return childColumns[childIdx].get();
}

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
@@ -24,6 +34,9 @@ class StructNodeColumn : public NodeColumn {
common::ValueVector* resultVector) final;
void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;

private:
std::vector<std::unique_ptr<NodeColumn>> childColumns;
};

} // namespace storage
5 changes: 2 additions & 3 deletions src/include/storage/store/var_list_column_chunk.h
@@ -14,7 +14,7 @@ struct VarListDataColumnChunk {
: dataColumnChunk{std::move(dataChunk)}, capacity{
common::StorageConstants::NODE_GROUP_SIZE} {}

void reset();
void reset() const;

void resizeBuffer(uint64_t numValues);

@@ -31,8 +31,7 @@

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression);
VarListColumnChunk(common::LogicalType dataType, bool enableCompression);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataColumnChunk.get();
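For var-list chunks only the constructor shrinks; the nested data chunk is still reachable through `getDataColumnChunk()`. A sketch, assuming `chunk` is a `std::unique_ptr<ColumnChunk>` produced by the factory for a VAR_LIST-typed column:

```cpp
// Sketch: the flattened list elements live in a nested chunk owned by the
// VarListColumnChunk.
auto* listChunk = static_cast<storage::VarListColumnChunk*>(chunk.get());
storage::ColumnChunk* dataChunk = listChunk->getDataColumnChunk();
```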
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
@@ -44,9 +44,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
// Map copy node.
auto nodeTable = storageManager.getNodesStore().getNodeTable(tableSchema->tableID);
bool isCopyRdf = readerConfig->fileType == FileType::TURTLE;
auto copyNodeSharedState = std::make_shared<CopyNodeSharedState>(
reader->getSharedState()->getNumRowsRef(), tableSchema, nodeTable, memoryManager, isCopyRdf,
readerConfig->csvReaderConfig->copy());
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(reader->getSharedState()->getNumRowsRef(),
tableSchema, nodeTable, memoryManager, isCopyRdf);
CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, nodeTable,
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
copyFromInfo->containsSerial};
5 changes: 2 additions & 3 deletions src/processor/operator/persistent/copy_node.cpp
@@ -14,10 +14,9 @@ namespace kuzu {
namespace processor {

CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, NodeTableSchema* tableSchema,
NodeTable* table, MemoryManager* memoryManager, bool isCopyRdf,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
NodeTable* table, MemoryManager* memoryManager, bool isCopyRdf)
: numRows{numRows}, tableSchema{tableSchema}, table{table}, pkColumnID{0}, hasLoggedWAL{false},
currentNodeGroupIdx{0}, isCopyRdf{isCopyRdf}, csvReaderConfig{std::move(csvReaderConfig)} {
currentNodeGroupIdx{0}, isCopyRdf{isCopyRdf} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
5 changes: 3 additions & 2 deletions src/storage/local_table.cpp
@@ -290,12 +290,13 @@ void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
StructLocalColumn::StructLocalColumn(NodeColumn* column, bool enableCompression)
: LocalColumn{column, enableCompression} {
assert(column->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT);
auto dataType = column->getDataType();
auto structColumn = static_cast<StructNodeColumn*>(column);
auto dataType = structColumn->getDataType();
auto structFields = StructType::getFields(&dataType);
fields.resize(structFields.size());
for (auto i = 0u; i < structFields.size(); i++) {
fields[i] =
LocalColumnFactory::createLocalColumn(column->getChildColumn(i), enableCompression);
LocalColumnFactory::createLocalColumn(structColumn->getChild(i), enableCompression);
}
}
