Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update rel property #1149

Merged
merged 1 commit into the base branch on
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 64 additions & 47 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

// Index/offset aliases used throughout the factorized-table code so that the
// intent of each integer parameter is visible at call sites.
// A tuple index is table-wide and may exceed 2^32 tuples, hence 64 bits; the
// remaining aliases address within a single tuple or block and fit in 32 bits.
using ft_tuple_idx_t = uint64_t;     // position of a tuple within the whole table
using ft_col_idx_t = uint32_t;       // column position within the table schema
using ft_col_offset_t = uint32_t;    // byte offset of a column inside a tuple
using ft_block_idx_t = uint32_t;     // position of a data block in the collection
using ft_block_offset_t = uint32_t;  // tuple position within a single data block

struct BlockAppendingInfo {
BlockAppendingInfo(uint8_t* data, uint64_t numTuplesToAppend)
: data{data}, numTuplesToAppend{numTuplesToAppend} {}
Expand Down Expand Up @@ -43,8 +49,8 @@ class DataBlock {
}
inline void resetToZero() { memset(block->data, 0, LARGE_PAGE_SIZE); }

static void copyTuples(DataBlock* blockToCopyFrom, uint32_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, uint32_t tupleIdxToCopyInfo, uint32_t numTuplesToCopy,
static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
uint32_t numBytesPerTuple);

public:
Expand Down Expand Up @@ -73,7 +79,7 @@ class DataBlockCollection {
inline void append(unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
inline bool isEmpty() { return blocks.empty(); }
inline vector<unique_ptr<DataBlock>>& getBlocks() { return blocks; }
inline DataBlock* getBlock(uint32_t blockIdx) { return blocks[blockIdx].get(); }
inline DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
inline uint64_t getNumBlocks() const { return blocks.size(); }

void merge(DataBlockCollection& other);
Expand Down Expand Up @@ -127,17 +133,17 @@ class FactorizedTableSchema {

void appendColumn(unique_ptr<ColumnSchema> column);

inline ColumnSchema* getColumn(uint32_t idx) const { return columns[idx].get(); }
inline ColumnSchema* getColumn(ft_col_idx_t idx) const { return columns[idx].get(); }

inline uint32_t getNumColumns() const { return columns.size(); }

inline uint32_t getNullMapOffset() const { return numBytesForDataPerTuple; }
inline ft_col_offset_t getNullMapOffset() const { return numBytesForDataPerTuple; }

inline uint32_t getNumBytesPerTuple() const { return numBytesPerTuple; }

inline uint32_t getColOffset(uint32_t idx) const { return colOffsets[idx]; }
inline ft_col_offset_t getColOffset(ft_col_idx_t idx) const { return colOffsets[idx]; }

inline void setMayContainsNullsToTrue(uint32_t idx) {
inline void setMayContainsNullsToTrue(ft_col_idx_t idx) {
assert(idx < columns.size());
columns[idx]->setMayContainsNullsToTrue();
}
Expand All @@ -156,7 +162,7 @@ class FactorizedTableSchema {
uint32_t numBytesForDataPerTuple = 0;
uint32_t numBytesForNullMapPerTuple = 0;
uint32_t numBytesPerTuple = 0;
vector<uint32_t> colOffsets;
vector<ft_col_offset_t> colOffsets;
};

class FlatTupleIterator;
Expand All @@ -177,22 +183,23 @@ class FactorizedTable {

// This function scans numTuplesToScan of rows to vectors starting at tupleIdx. Callers are
// responsible for making sure all the parameters are valid.
inline void scan(vector<shared_ptr<ValueVector>>& vectors, uint64_t tupleIdx,
inline void scan(vector<shared_ptr<ValueVector>>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan) const {
vector<uint32_t> colIdxes(tableSchema->getNumColumns());
iota(colIdxes.begin(), colIdxes.end(), 0);
scan(vectors, tupleIdx, numTuplesToScan, colIdxes);
}
inline bool isEmpty() const { return getNumTuples() == 0; }
void scan(vector<shared_ptr<ValueVector>>& vectors, uint64_t tupleIdx, uint64_t numTuplesToScan,
vector<uint32_t>& colIdxToScan) const;
void scan(vector<shared_ptr<ValueVector>>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan, vector<uint32_t>& colIdxToScan) const;
// TODO(Guodong): Unify these two interfaces along with `readUnflatCol`.
void lookup(vector<shared_ptr<ValueVector>>& vectors, vector<uint32_t>& colIdxesToScan,
uint8_t** tuplesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(vector<shared_ptr<ValueVector>>& vectors, const SelectionVector* selVector,
vector<uint32_t>& colIdxesToScan, uint8_t* tupleToRead) const;
void lookup(vector<shared_ptr<ValueVector>>& vectors, vector<uint32_t>& colIdxesToScan,
vector<uint64_t>& tupleIdxesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
vector<ft_tuple_idx_t>& tupleIdxesToRead, uint64_t startPos,
uint64_t numTuplesToRead) const;

// When we merge two factorizedTables, we need to update the hasNoNullGuarantee based on
// other factorizedTable.
Expand All @@ -202,92 +209,102 @@ class FactorizedTable {
inline InMemOverflowBuffer* getInMemOverflowBuffer() const { return inMemOverflowBuffer.get(); }

bool hasUnflatCol() const;
inline bool hasUnflatCol(vector<uint32_t>& colIdxes) const {
inline bool hasUnflatCol(vector<ft_col_idx_t>& colIdxes) const {
return any_of(colIdxes.begin(), colIdxes.end(),
[this](uint64_t colIdx) { return !tableSchema->getColumn(colIdx)->isFlat(); });
[this](ft_col_idx_t colIdx) { return !tableSchema->getColumn(colIdx)->isFlat(); });
}

inline uint64_t getNumTuples() const { return numTuples; }
uint64_t getTotalNumFlatTuples() const;
uint64_t getNumFlatTuples(uint64_t tupleIdx) const;
uint64_t getNumFlatTuples(ft_tuple_idx_t tupleIdx) const;

inline vector<unique_ptr<DataBlock>>& getTupleDataBlocks() {
return flatTupleBlockCollection->getBlocks();
}
inline const FactorizedTableSchema* getTableSchema() const { return tableSchema.get(); }

template<typename TYPE>
inline TYPE getData(uint32_t blockIdx, uint32_t blockOffset, uint32_t colOffset) const {
inline TYPE getData(
ft_block_idx_t blockIdx, ft_block_offset_t blockOffset, ft_col_offset_t colOffset) const {
return *((TYPE*)getCell(blockIdx, blockOffset, colOffset));
}

uint8_t* getTuple(uint64_t tupleIdx) const;
uint8_t* getTuple(ft_tuple_idx_t tupleIdx) const;

void updateFlatCell(uint8_t* tuplePtr, uint32_t colIdx, ValueVector* valueVector, uint32_t pos);
inline void updateFlatCellNoNull(uint8_t* ftTuplePtr, uint32_t colIdx, void* dataBuf) {
void updateFlatCell(
uint8_t* tuplePtr, ft_col_idx_t colIdx, ValueVector* valueVector, uint32_t pos);
inline void updateFlatCellNoNull(uint8_t* ftTuplePtr, ft_col_idx_t colIdx, void* dataBuf) {
memcpy(ftTuplePtr + tableSchema->getColOffset(colIdx), dataBuf,
tableSchema->getColumn(colIdx)->getNumBytes());
}

inline uint64_t getNumTuplesPerBlock() const { return numTuplesPerBlock; }

inline bool hasNoNullGuarantee(uint32_t colIdx) const {
inline bool hasNoNullGuarantee(ft_col_idx_t colIdx) const {
return tableSchema->getColumn(colIdx)->hasNoNullGuarantee();
}

bool isOverflowColNull(const uint8_t* nullBuffer, uint32_t tupleIdx, uint32_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, uint32_t colIdx) const;
void setNonOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx);
void copySingleValueToVector(ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx,
shared_ptr<ValueVector> valueVector, uint32_t posInVector) const;
bool isOverflowColNull(
const uint8_t* nullBuffer, ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, ft_col_idx_t colIdx) const;
void setNonOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx);
// Note: this function also resets the overflow ptr of list and string to point to a buffer
// inside overflowFileOfInMemList.
void copyToInMemList(uint32_t colIdx, vector<uint64_t>& tupleIdxesToRead, uint8_t* data,
NullMask* nullMask, uint64_t startElemPosInList, DiskOverflowFile* overflowFileOfInMemList,
const DataType& type, NodeIDCompressionScheme* nodeIDCompressionScheme) const;
void copyToInMemList(ft_col_idx_t colIdx, vector<ft_tuple_idx_t>& tupleIdxesToRead,
uint8_t* data, NullMask* nullMask, uint64_t startElemPosInList,
DiskOverflowFile* overflowFileOfInMemList, const DataType& type,
NodeIDCompressionScheme* nodeIDCompressionScheme) const;
void clear();
int64_t findValueInFlatColumn(uint64_t colIdx, int64_t value) const;
int64_t findValueInFlatColumn(ft_col_idx_t colIdx, int64_t value) const;

private:
static bool isNull(const uint8_t* nullMapBuffer, uint32_t idx);
void setNull(uint8_t* nullBuffer, uint32_t idx);
void setOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx, uint32_t tupleIdx);
static bool isNull(const uint8_t* nullMapBuffer, ft_col_idx_t idx);
void setNull(uint8_t* nullBuffer, ft_col_idx_t idx);
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);

uint64_t computeNumTuplesToAppend(const vector<shared_ptr<ValueVector>>& vectorsToAppend) const;

inline uint8_t* getCell(uint32_t blockIdx, uint32_t blockOffset, uint32_t colOffset) const {
inline uint8_t* getCell(
ft_block_idx_t blockIdx, ft_block_offset_t blockOffset, ft_col_offset_t colOffset) const {
return flatTupleBlockCollection->getBlock(blockIdx)->getData() +
blockOffset * tableSchema->getNumBytesPerTuple() + colOffset;
}
inline pair<uint64_t, uint64_t> getBlockIdxAndTupleIdxInBlock(uint64_t tupleIdx) const {
inline pair<ft_block_idx_t, ft_block_offset_t> getBlockIdxAndTupleIdxInBlock(
uint64_t tupleIdx) const {
return make_pair(tupleIdx / numTuplesPerBlock, tupleIdx % numTuplesPerBlock);
}

vector<BlockAppendingInfo> allocateFlatTupleBlocks(uint64_t numTuplesToAppend);
uint8_t* allocateUnflatTupleBlock(uint32_t numBytes);
void copyFlatVectorToFlatColumn(
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, uint32_t colIdx);
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyUnflatVectorToFlatColumn(const ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, uint32_t colIdx);
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, ft_col_idx_t colIdx);
inline void copyVectorToFlatColumn(const ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, uint32_t colIdx) {
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples,
ft_col_idx_t colIdx) {
vector.state->isFlat() ?
copyFlatVectorToFlatColumn(vector, blockAppendInfo, colIdx) :
copyUnflatVectorToFlatColumn(vector, blockAppendInfo, numAppendedTuples, colIdx);
}
void copyVectorToUnflatColumn(
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, uint32_t colIdx);
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyVectorToColumn(const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo,
uint64_t numAppendedTuples, uint32_t colIdx);
overflow_value_t appendVectorToUnflatTupleBlocks(const ValueVector& vector, uint32_t colIdx);
uint64_t numAppendedTuples, ft_col_idx_t colIdx);
overflow_value_t appendVectorToUnflatTupleBlocks(
const ValueVector& vector, ft_col_idx_t colIdx);

// TODO(Guodong): Unify these two `readUnflatCol()` with a (possibly templated) copy executor.
void readUnflatCol(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector) const;
void readUnflatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector) const;
void readUnflatCol(const uint8_t* tupleToRead, const SelectionVector* selVector,
uint32_t colIdx, ValueVector& vector) const;
ft_col_idx_t colIdx, ValueVector& vector) const;
void readFlatColToFlatVector(
uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector) const;
void readFlatColToUnflatVector(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector,
uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector) const;
void readFlatColToUnflatVector(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector,
uint64_t numTuplesToRead) const;
inline void readFlatCol(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector,
inline void readFlatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector,
uint64_t numTuplesToRead) const {
vector.state->isFlat() ?
readFlatColToFlatVector(tuplesToRead, colIdx, vector) :
Expand Down Expand Up @@ -329,9 +346,9 @@ class FlatTupleIterator {
->set(valueBuffer, columnDataTypes[flatTupleValIdx]);
}

void readUnflatColToFlatTuple(uint64_t flatTupleValIdx, uint8_t* valueBuffer);
void readUnflatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer);

void readFlatColToFlatTuple(uint32_t colIdx, uint8_t* valueBuffer);
void readFlatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer);

// We put pair(UINT64_MAX, UINT64_MAX) in all invalid entries in
// FlatTuplePositionsInDataChunk.
Expand All @@ -354,8 +371,8 @@ class FlatTupleIterator {
FactorizedTable& factorizedTable;
uint8_t* currentTupleBuffer;
uint64_t numFlatTuples;
uint32_t nextFlatTupleIdx;
uint64_t nextTupleIdx;
ft_tuple_idx_t nextFlatTupleIdx;
ft_tuple_idx_t nextTupleIdx;
// This field stores the (nextIdxToReadInDataChunk, numElementsInDataChunk) of each dataChunk.
vector<pair<uint64_t, uint64_t>> flatTuplePositionsInDataChunk;

Expand Down
Loading