Skip to content

Commit

Permalink
add support for rel-update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Jan 5, 2023
1 parent 5111139 commit 29a1531
Show file tree
Hide file tree
Showing 25 changed files with 812 additions and 669 deletions.
111 changes: 64 additions & 47 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

// Index/offset aliases used throughout the FactorizedTable machinery.
// C++11 alias declarations (`using`) are preferred over C-style `typedef`
// for consistency with the modern C++ used elsewhere in this header.
using ft_tuple_idx_t = uint64_t;    // index of a tuple within the whole table
using ft_col_idx_t = uint32_t;      // index of a column in the table schema
using ft_col_offset_t = uint32_t;   // byte offset of a column within a tuple
using ft_block_idx_t = uint32_t;    // index of a data block in a block collection
using ft_block_offset_t = uint32_t; // tuple position within a single data block

struct BlockAppendingInfo {
BlockAppendingInfo(uint8_t* data, uint64_t numTuplesToAppend)
: data{data}, numTuplesToAppend{numTuplesToAppend} {}
Expand Down Expand Up @@ -43,8 +49,8 @@ class DataBlock {
}
inline void resetToZero() { memset(block->data, 0, LARGE_PAGE_SIZE); }

static void copyTuples(DataBlock* blockToCopyFrom, uint32_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, uint32_t tupleIdxToCopyInfo, uint32_t numTuplesToCopy,
static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
uint32_t numBytesPerTuple);

public:
Expand Down Expand Up @@ -73,7 +79,7 @@ class DataBlockCollection {
inline void append(unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
inline bool isEmpty() { return blocks.empty(); }
inline vector<unique_ptr<DataBlock>>& getBlocks() { return blocks; }
inline DataBlock* getBlock(uint32_t blockIdx) { return blocks[blockIdx].get(); }
inline DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
inline uint64_t getNumBlocks() const { return blocks.size(); }

void merge(DataBlockCollection& other);
Expand Down Expand Up @@ -127,17 +133,17 @@ class FactorizedTableSchema {

void appendColumn(unique_ptr<ColumnSchema> column);

inline ColumnSchema* getColumn(uint32_t idx) const { return columns[idx].get(); }
inline ColumnSchema* getColumn(ft_col_idx_t idx) const { return columns[idx].get(); }

inline uint32_t getNumColumns() const { return columns.size(); }

inline uint32_t getNullMapOffset() const { return numBytesForDataPerTuple; }
inline ft_col_offset_t getNullMapOffset() const { return numBytesForDataPerTuple; }

inline uint32_t getNumBytesPerTuple() const { return numBytesPerTuple; }

inline uint32_t getColOffset(uint32_t idx) const { return colOffsets[idx]; }
inline ft_col_offset_t getColOffset(ft_col_idx_t idx) const { return colOffsets[idx]; }

inline void setMayContainsNullsToTrue(uint32_t idx) {
inline void setMayContainsNullsToTrue(ft_col_idx_t idx) {
assert(idx < columns.size());
columns[idx]->setMayContainsNullsToTrue();
}
Expand All @@ -156,7 +162,7 @@ class FactorizedTableSchema {
uint32_t numBytesForDataPerTuple = 0;
uint32_t numBytesForNullMapPerTuple = 0;
uint32_t numBytesPerTuple = 0;
vector<uint32_t> colOffsets;
vector<ft_col_offset_t> colOffsets;
};

class FlatTupleIterator;
Expand All @@ -177,22 +183,23 @@ class FactorizedTable {

// This function scans numTuplesToScan of rows to vectors starting at tupleIdx. Callers are
// responsible for making sure all the parameters are valid.
inline void scan(vector<shared_ptr<ValueVector>>& vectors, uint64_t tupleIdx,
inline void scan(vector<shared_ptr<ValueVector>>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan) const {
vector<uint32_t> colIdxes(tableSchema->getNumColumns());
iota(colIdxes.begin(), colIdxes.end(), 0);
scan(vectors, tupleIdx, numTuplesToScan, colIdxes);
}
inline bool isEmpty() const { return getNumTuples() == 0; }
void scan(vector<shared_ptr<ValueVector>>& vectors, uint64_t tupleIdx, uint64_t numTuplesToScan,
vector<uint32_t>& colIdxToScan) const;
void scan(vector<shared_ptr<ValueVector>>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan, vector<uint32_t>& colIdxToScan) const;
// TODO(Guodong): Unify these two interfaces along with `readUnflatCol`.
void lookup(vector<shared_ptr<ValueVector>>& vectors, vector<uint32_t>& colIdxesToScan,
uint8_t** tuplesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(vector<shared_ptr<ValueVector>>& vectors, const SelectionVector* selVector,
vector<uint32_t>& colIdxesToScan, uint8_t* tupleToRead) const;
void lookup(vector<shared_ptr<ValueVector>>& vectors, vector<uint32_t>& colIdxesToScan,
vector<uint64_t>& tupleIdxesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
vector<ft_tuple_idx_t>& tupleIdxesToRead, uint64_t startPos,
uint64_t numTuplesToRead) const;

// When we merge two factorizedTables, we need to update the hasNoNullGuarantee based on
// other factorizedTable.
Expand All @@ -202,92 +209,102 @@ class FactorizedTable {
inline InMemOverflowBuffer* getInMemOverflowBuffer() const { return inMemOverflowBuffer.get(); }

bool hasUnflatCol() const;
inline bool hasUnflatCol(vector<uint32_t>& colIdxes) const {
inline bool hasUnflatCol(vector<ft_col_idx_t>& colIdxes) const {
return any_of(colIdxes.begin(), colIdxes.end(),
[this](uint64_t colIdx) { return !tableSchema->getColumn(colIdx)->isFlat(); });
[this](ft_col_idx_t colIdx) { return !tableSchema->getColumn(colIdx)->isFlat(); });
}

inline uint64_t getNumTuples() const { return numTuples; }
uint64_t getTotalNumFlatTuples() const;
uint64_t getNumFlatTuples(uint64_t tupleIdx) const;
uint64_t getNumFlatTuples(ft_tuple_idx_t tupleIdx) const;

inline vector<unique_ptr<DataBlock>>& getTupleDataBlocks() {
return flatTupleBlockCollection->getBlocks();
}
inline const FactorizedTableSchema* getTableSchema() const { return tableSchema.get(); }

template<typename TYPE>
inline TYPE getData(uint32_t blockIdx, uint32_t blockOffset, uint32_t colOffset) const {
inline TYPE getData(
ft_block_idx_t blockIdx, ft_block_offset_t blockOffset, ft_col_offset_t colOffset) const {
return *((TYPE*)getCell(blockIdx, blockOffset, colOffset));
}

uint8_t* getTuple(uint64_t tupleIdx) const;
uint8_t* getTuple(ft_tuple_idx_t tupleIdx) const;

void updateFlatCell(uint8_t* tuplePtr, uint32_t colIdx, ValueVector* valueVector, uint32_t pos);
inline void updateFlatCellNoNull(uint8_t* ftTuplePtr, uint32_t colIdx, void* dataBuf) {
void updateFlatCell(
uint8_t* tuplePtr, ft_col_idx_t colIdx, ValueVector* valueVector, uint32_t pos);
inline void updateFlatCellNoNull(uint8_t* ftTuplePtr, ft_col_idx_t colIdx, void* dataBuf) {
memcpy(ftTuplePtr + tableSchema->getColOffset(colIdx), dataBuf,
tableSchema->getColumn(colIdx)->getNumBytes());
}

inline uint64_t getNumTuplesPerBlock() const { return numTuplesPerBlock; }

inline bool hasNoNullGuarantee(uint32_t colIdx) const {
inline bool hasNoNullGuarantee(ft_col_idx_t colIdx) const {
return tableSchema->getColumn(colIdx)->hasNoNullGuarantee();
}

bool isOverflowColNull(const uint8_t* nullBuffer, uint32_t tupleIdx, uint32_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, uint32_t colIdx) const;
void setNonOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx);
void copySingleValueToVector(ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx,
shared_ptr<ValueVector> valueVector, uint32_t posInVector) const;
bool isOverflowColNull(
const uint8_t* nullBuffer, ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, ft_col_idx_t colIdx) const;
void setNonOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx);
// Note: this function also resets the overflow ptr of list and string to point to a buffer
// inside overflowFileOfInMemList.
void copyToInMemList(uint32_t colIdx, vector<uint64_t>& tupleIdxesToRead, uint8_t* data,
NullMask* nullMask, uint64_t startElemPosInList, DiskOverflowFile* overflowFileOfInMemList,
const DataType& type, NodeIDCompressionScheme* nodeIDCompressionScheme) const;
void copyToInMemList(ft_col_idx_t colIdx, vector<ft_tuple_idx_t>& tupleIdxesToRead,
uint8_t* data, NullMask* nullMask, uint64_t startElemPosInList,
DiskOverflowFile* overflowFileOfInMemList, const DataType& type,
NodeIDCompressionScheme* nodeIDCompressionScheme) const;
void clear();
int64_t findValueInFlatColumn(uint64_t colIdx, int64_t value) const;
int64_t findValueInFlatColumn(ft_col_idx_t colIdx, int64_t value) const;

private:
static bool isNull(const uint8_t* nullMapBuffer, uint32_t idx);
void setNull(uint8_t* nullBuffer, uint32_t idx);
void setOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx, uint32_t tupleIdx);
static bool isNull(const uint8_t* nullMapBuffer, ft_col_idx_t idx);
void setNull(uint8_t* nullBuffer, ft_col_idx_t idx);
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);

uint64_t computeNumTuplesToAppend(const vector<shared_ptr<ValueVector>>& vectorsToAppend) const;

inline uint8_t* getCell(uint32_t blockIdx, uint32_t blockOffset, uint32_t colOffset) const {
inline uint8_t* getCell(
ft_block_idx_t blockIdx, ft_block_offset_t blockOffset, ft_col_offset_t colOffset) const {
return flatTupleBlockCollection->getBlock(blockIdx)->getData() +
blockOffset * tableSchema->getNumBytesPerTuple() + colOffset;
}
inline pair<uint64_t, uint64_t> getBlockIdxAndTupleIdxInBlock(uint64_t tupleIdx) const {
inline pair<ft_block_idx_t, ft_block_offset_t> getBlockIdxAndTupleIdxInBlock(
uint64_t tupleIdx) const {
return make_pair(tupleIdx / numTuplesPerBlock, tupleIdx % numTuplesPerBlock);
}

vector<BlockAppendingInfo> allocateFlatTupleBlocks(uint64_t numTuplesToAppend);
uint8_t* allocateUnflatTupleBlock(uint32_t numBytes);
void copyFlatVectorToFlatColumn(
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, uint32_t colIdx);
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyUnflatVectorToFlatColumn(const ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, uint32_t colIdx);
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, ft_col_idx_t colIdx);
inline void copyVectorToFlatColumn(const ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, uint32_t colIdx) {
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples,
ft_col_idx_t colIdx) {
vector.state->isFlat() ?
copyFlatVectorToFlatColumn(vector, blockAppendInfo, colIdx) :
copyUnflatVectorToFlatColumn(vector, blockAppendInfo, numAppendedTuples, colIdx);
}
void copyVectorToUnflatColumn(
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, uint32_t colIdx);
const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyVectorToColumn(const ValueVector& vector, const BlockAppendingInfo& blockAppendInfo,
uint64_t numAppendedTuples, uint32_t colIdx);
overflow_value_t appendVectorToUnflatTupleBlocks(const ValueVector& vector, uint32_t colIdx);
uint64_t numAppendedTuples, ft_col_idx_t colIdx);
overflow_value_t appendVectorToUnflatTupleBlocks(
const ValueVector& vector, ft_col_idx_t colIdx);

// TODO(Guodong): Unify these two `readUnflatCol()` with a (possibly templated) copy executor.
void readUnflatCol(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector) const;
void readUnflatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector) const;
void readUnflatCol(const uint8_t* tupleToRead, const SelectionVector* selVector,
uint32_t colIdx, ValueVector& vector) const;
ft_col_idx_t colIdx, ValueVector& vector) const;
void readFlatColToFlatVector(
uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector) const;
void readFlatColToUnflatVector(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector,
uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector) const;
void readFlatColToUnflatVector(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector,
uint64_t numTuplesToRead) const;
inline void readFlatCol(uint8_t** tuplesToRead, uint32_t colIdx, ValueVector& vector,
inline void readFlatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx, ValueVector& vector,
uint64_t numTuplesToRead) const {
vector.state->isFlat() ?
readFlatColToFlatVector(tuplesToRead, colIdx, vector) :
Expand Down Expand Up @@ -329,9 +346,9 @@ class FlatTupleIterator {
->set(valueBuffer, columnDataTypes[flatTupleValIdx]);
}

void readUnflatColToFlatTuple(uint64_t flatTupleValIdx, uint8_t* valueBuffer);
void readUnflatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer);

void readFlatColToFlatTuple(uint32_t colIdx, uint8_t* valueBuffer);
void readFlatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer);

// We put pair(UINT64_MAX, UINT64_MAX) in all invalid entries in
// FlatTuplePositionsInDataChunk.
Expand All @@ -354,8 +371,8 @@ class FlatTupleIterator {
FactorizedTable& factorizedTable;
uint8_t* currentTupleBuffer;
uint64_t numFlatTuples;
uint32_t nextFlatTupleIdx;
uint64_t nextTupleIdx;
ft_tuple_idx_t nextFlatTupleIdx;
ft_tuple_idx_t nextTupleIdx;
// This field stores the (nextIdxToReadInDataChunk, numElementsInDataChunk) of each dataChunk.
vector<pair<uint64_t, uint64_t>> flatTuplePositionsInDataChunk;

Expand Down
Loading

0 comments on commit 29a1531

Please sign in to comment.