Skip to content

Commit

Permalink
Merge pull request #1351 from kuzudb/ft-interface-refactor
Browse files Browse the repository at this point in the history
Refactor FT interface
  • Loading branch information
acquamarin committed Mar 7, 2023
2 parents fb9a96d + ca38b0d commit 02c4f33
Show file tree
Hide file tree
Showing 64 changed files with 379 additions and 461 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class AggregateHashTable : public BaseHashTable {
bool hasStrCol = false;
// Temporary arrays to hold intermediate results.
std::shared_ptr<common::DataChunkState> hashState;
std::shared_ptr<common::ValueVector> hashVector;
std::unique_ptr<common::ValueVector> hashVector;
std::unique_ptr<HashSlot*[]> hashSlotsToUpdateAggState;
std::unique_ptr<uint64_t[]> tmpValueIdxes;
std::unique_ptr<uint64_t[]> entryIdxesToInitialize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HashAggregateScan : public BaseAggregateScan {
private:
std::vector<DataPos> groupByKeyVectorsPos;
std::vector<common::DataType> groupByKeyVectorDataTypes;
std::vector<std::shared_ptr<common::ValueVector>> groupByKeyVectors;
std::vector<common::ValueVector*> groupByKeyVectors;
std::shared_ptr<HashAggregateSharedState> sharedState;
std::vector<uint32_t> groupByKeyVectorsColIdxes;
};
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/cross_product.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class CrossProduct : public PhysicalOperator {
std::vector<uint32_t> colIndicesToScan;

uint64_t startIdx = 0u;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToScan;
std::vector<common::ValueVector*> vectorsToScan;
};

} // namespace processor
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/hash_join/hash_join_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class HashJoinBuild : public Sink {
protected:
std::shared_ptr<HashJoinSharedState> sharedState;
BuildDataInfo buildDataInfo;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToAppend;
std::vector<common::ValueVector*> vectorsToAppend;
std::unique_ptr<JoinHashTable> hashTable;
};

Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class HashJoinProbe : public PhysicalOperator, SelVectorOverWriter {
common::JoinType joinType;

ProbeDataInfo probeDataInfo;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToReadInto;
std::vector<common::ValueVector*> vectorsToReadInto;
std::vector<uint32_t> columnIdxsToReadFrom;
std::vector<std::shared_ptr<common::ValueVector>> keyVectors;
std::vector<common::ValueVector*> keyVectors;
std::shared_ptr<common::ValueVector> markVector;
std::unique_ptr<ProbeState> probeState;
};
Expand Down
9 changes: 4 additions & 5 deletions src/include/processor/operator/hash_join/join_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ class JoinHashTable : public BaseHashTable {

virtual ~JoinHashTable() = default;

virtual void append(const std::vector<std::shared_ptr<common::ValueVector>>& vectorsToAppend);
virtual void append(const std::vector<common::ValueVector*>& vectorsToAppend);
void allocateHashSlots(uint64_t numTuples);
void buildHashSlots();
void probe(const std::vector<std::shared_ptr<common::ValueVector>>& keyVectors,
uint8_t** probedTuples);
void probe(const std::vector<common::ValueVector*>& keyVectors, uint8_t** probedTuples);

inline void lookup(std::vector<std::shared_ptr<common::ValueVector>>& vectors,
inline void lookup(std::vector<common::ValueVector*>& vectors,
std::vector<uint32_t>& colIdxesToScan, uint8_t** tuplesToRead, uint64_t startPos,
uint64_t numTuplesToRead) {
factorizedTable->lookup(vectors, colIdxesToScan, tuplesToRead, startPos, numTuplesToRead);
Expand All @@ -48,7 +47,7 @@ class JoinHashTable : public BaseHashTable {

// This function returns a boolean flag indicating whether any non-null keys remain after discarding.
static bool discardNullFromKeys(
const std::vector<std::shared_ptr<common::ValueVector>>& vectors, uint32_t numKeyVectors);
const std::vector<common::ValueVector*>& vectors, uint32_t numKeyVectors);

private:
uint64_t numKeyColumns;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/intersect/intersect.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Intersect : public PhysicalOperator {
std::vector<IntersectDataInfo> intersectDataInfos;
// payloadColumnIdxesToScanFrom and payloadVectorsToScanInto are organized by each build child.
std::vector<std::vector<uint32_t>> payloadColumnIdxesToScanFrom;
std::vector<std::vector<std::shared_ptr<common::ValueVector>>> payloadVectorsToScanInto;
std::vector<std::vector<common::ValueVector*>> payloadVectorsToScanInto;
std::shared_ptr<common::ValueVector> outKeyVector;
std::vector<std::shared_ptr<common::ValueVector>> probeKeyVectors;
std::vector<std::unique_ptr<common::SelectionVector>> intersectSelVectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class IntersectHashTable : public JoinHashTable {
storage::MemoryManager& memoryManager, std::unique_ptr<FactorizedTableSchema> tableSchema)
: JoinHashTable{memoryManager, 1 /* numKeyColumns */, std::move(tableSchema)} {}

void append(const std::vector<std::shared_ptr<common::ValueVector>>& vectorsToAppend) override;
void append(const std::vector<common::ValueVector*>& vectorsToAppend) override;
};

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class OrderBy : public Sink {
OrderByDataInfo orderByDataInfo;
std::unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
std::unique_ptr<RadixSort> radixSorter;
std::vector<std::shared_ptr<common::ValueVector>> keyVectors;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToAppend;
std::vector<common::ValueVector*> keyVectors;
std::vector<common::ValueVector*> vectorsToAppend;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::shared_ptr<FactorizedTable> localFactorizedTable;
};
Expand Down
17 changes: 9 additions & 8 deletions src/include/processor/operator/order_by/order_by_key_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ using encode_function_t = std::function<void(const uint8_t*, uint8_t*, bool)>;
class OrderByKeyEncoder {

public:
OrderByKeyEncoder(std::vector<std::shared_ptr<common::ValueVector>>& orderByVectors,
OrderByKeyEncoder(std::vector<common::ValueVector*>& orderByVectors,
std::vector<bool>& isAscOrder, storage::MemoryManager* memoryManager, uint8_t ftIdx,
uint32_t numTuplesPerBlockInFT, uint32_t numBytesPerTuple);

Expand All @@ -57,6 +57,8 @@ class OrderByKeyEncoder {

// Returns the numTuples field of the last entry in keyBlocks.
inline uint32_t getNumTuplesInCurBlock() const {
    const auto& curBlock = keyBlocks.back();
    return curBlock->numTuples;
}

static uint32_t getNumBytesPerTuple(const std::vector<common::ValueVector*>& keyVectors);

// Reads the uint32_t block index stored at the start of the memory tupleInfoPtr points to.
// A named, const-preserving cast replaces the previous C-style cast, which silently
// dropped the const qualifier of the pointee.
// NOTE(review): assumes tupleInfoPtr is suitably aligned for a uint32_t load — confirm
// against the encoder's tuple layout.
static inline uint32_t getEncodedFTBlockIdx(const uint8_t* tupleInfoPtr) {
    return *reinterpret_cast<const uint32_t*>(tupleInfoPtr);
}
Expand Down Expand Up @@ -104,14 +106,13 @@ class OrderByKeyEncoder {
void flipBytesIfNecessary(
uint32_t keyColIdx, uint8_t* tuplePtr, uint32_t numEntriesToEncode, common::DataType& type);

void encodeFlatVector(
std::shared_ptr<common::ValueVector> vector, uint8_t* tuplePtr, uint32_t keyColIdx);
void encodeFlatVector(common::ValueVector* vector, uint8_t* tuplePtr, uint32_t keyColIdx);

void encodeUnflatVector(std::shared_ptr<common::ValueVector> vector, uint8_t* tuplePtr,
uint32_t encodedTuples, uint32_t numEntriesToEncode, uint32_t keyColIdx);
void encodeUnflatVector(common ::ValueVector* vector, uint8_t* tuplePtr, uint32_t encodedTuples,
uint32_t numEntriesToEncode, uint32_t keyColIdx);

void encodeVector(std::shared_ptr<common::ValueVector> vector, uint8_t* tuplePtr,
uint32_t encodedTuples, uint32_t numEntriesToEncode, uint32_t keyColIdx);
void encodeVector(common::ValueVector* vector, uint8_t* tuplePtr, uint32_t encodedTuples,
uint32_t numEntriesToEncode, uint32_t keyColIdx);

void encodeFTIdx(uint32_t numEntriesToEncode, uint8_t* tupleInfoPtr);

Expand All @@ -122,7 +123,7 @@ class OrderByKeyEncoder {
private:
storage::MemoryManager* memoryManager;
std::vector<std::shared_ptr<DataBlock>> keyBlocks;
std::vector<std::shared_ptr<common::ValueVector>>& orderByVectors;
std::vector<common::ValueVector*>& orderByVectors;
std::vector<bool> isAscOrder;
uint32_t numBytesPerTuple;
uint32_t maxNumTuplesPerBlock;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class OrderByScan : public PhysicalOperator {
private:
std::vector<DataPos> outVectorPos;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToRead;
std::vector<common::ValueVector*> vectorsToRead;
std::unique_ptr<MergedKeyBlockScanState> mergedKeyBlockScanState;
};

Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/result_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ResultCollector : public Sink {
private:
std::vector<std::pair<DataPos, common::DataType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<std::shared_ptr<common::ValueVector>> vectorsToCollect;
std::vector<common::ValueVector*> vectorsToCollect;
std::shared_ptr<FTableSharedState> sharedState;
std::unique_ptr<FactorizedTable> localTable;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class RelTableCollection {
void resetState();
// Returns the number of entries in `tables`, narrowed explicitly to uint32_t.
inline uint32_t getNumTablesInCollection() {
    return static_cast<uint32_t>(tables.size());
}

bool scan(const std::shared_ptr<common::ValueVector>& inVector,
std::vector<std::shared_ptr<common::ValueVector>>& outputVectors,
bool scan(common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors,
transaction::Transaction* transaction);

std::unique_ptr<RelTableCollection> clone() const;
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan/scan_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class ScanColumns : public PhysicalOperator {

protected:
DataPos inputNodeIDVectorPos;
std::shared_ptr<common::ValueVector> inputNodeIDVector;
common::ValueVector* inputNodeIDVector;
std::vector<DataPos> outPropertyVectorsPos;
std::vector<std::shared_ptr<common::ValueVector>> outPropertyVectors;
std::vector<common::ValueVector*> outPropertyVectors;
};

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class ScanRelTable : public PhysicalOperator {
DataPos inNodeIDVectorPos;
std::vector<DataPos> outputVectorsPos;
// vectors
std::shared_ptr<common::ValueVector> inNodeIDVector;
std::vector<std::shared_ptr<common::ValueVector>> outputVectors;
common::ValueVector* inNodeIDVector;
std::vector<common::ValueVector*> outputVectors;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BaseTableScan : public PhysicalOperator {
std::vector<DataPos> outVecPositions;
std::vector<uint32_t> colIndicesToScan;

std::vector<std::shared_ptr<common::ValueVector>> vectorsToScan;
std::vector<common::ValueVector*> vectorsToScan;
};

} // namespace processor
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/update/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ class CreateRel : public PhysicalOperator {

private:
struct CreateRelVectors {
std::shared_ptr<common::ValueVector> srcNodeIDVector;
std::shared_ptr<common::ValueVector> dstNodeIDVector;
std::vector<std::shared_ptr<common::ValueVector>> propertyVectors;
common::ValueVector* srcNodeIDVector;
common::ValueVector* dstNodeIDVector;
std::vector<common::ValueVector*> propertyVectors;
};

private:
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/update/delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ class DeleteRel : public PhysicalOperator {
private:
storage::RelsStatistics& relsStatistics;
std::vector<std::unique_ptr<DeleteRelInfo>> deleteRelInfos;
std::vector<std::shared_ptr<common::ValueVector>> srcNodeVectors;
std::vector<std::shared_ptr<common::ValueVector>> dstNodeVectors;
std::vector<std::shared_ptr<common::ValueVector>> relIDVectors;
std::vector<common::ValueVector*> srcNodeVectors;
std::vector<common::ValueVector*> dstNodeVectors;
std::vector<common::ValueVector*> relIDVectors;
};

} // namespace processor
Expand Down
13 changes: 6 additions & 7 deletions src/include/processor/operator/update/set.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,19 @@ class SetNodeProperty : public PhysicalOperator {
private:
std::vector<std::unique_ptr<SetNodePropertyInfo>> infos;

std::vector<std::shared_ptr<common::ValueVector>> nodeIDVectors;
std::vector<common::ValueVector*> nodeIDVectors;
};

struct SetRelPropertyInfo {
storage::RelTable* table;
DataPos srcNodePos;
DataPos dstNodePos;
DataPos relIDPos;
// TODO(Ziyi): see issue 1122 and do a typedef on our column & list idx.
uint64_t propertyId;
common::property_id_t propertyId;
std::unique_ptr<evaluator::BaseExpressionEvaluator> evaluator;

SetRelPropertyInfo(storage::RelTable* table, const DataPos& srcNodePos,
const DataPos& dstNodePos, const DataPos& relIDPos, uint64_t propertyId,
const DataPos& dstNodePos, const DataPos& relIDPos, common::property_id_t propertyId,
std::unique_ptr<evaluator::BaseExpressionEvaluator> evaluator)
: table{table}, srcNodePos{srcNodePos}, dstNodePos{dstNodePos}, relIDPos{relIDPos},
propertyId{propertyId}, evaluator{std::move(evaluator)} {}
Expand Down Expand Up @@ -84,9 +83,9 @@ class SetRelProperty : public PhysicalOperator {
private:
std::vector<std::unique_ptr<SetRelPropertyInfo>> infos;

std::vector<std::shared_ptr<common::ValueVector>> srcNodeVectors;
std::vector<std::shared_ptr<common::ValueVector>> dstNodeVectors;
std::vector<std::shared_ptr<common::ValueVector>> relIDVectors;
std::vector<common::ValueVector*> srcNodeVectors;
std::vector<common::ValueVector*> dstNodeVectors;
std::vector<common::ValueVector*> relIDVectors;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class VarLengthColumnExtend : public VarLengthExtend {
// This function resets the dfsLevelInfo at level and adds the dfsLevelInfo to the
// dfsStack if the parent has adjacent nodes. The function returns true if the
// parent has adjacent nodes, otherwise returns false.
bool addDFSLevelToStackIfParentExtends(
std::shared_ptr<common::ValueVector>& parentValueVector, uint8_t level);
bool addDFSLevelToStackIfParentExtends(common::ValueVector* parentValueVector, uint8_t level);
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace processor {

struct DFSLevelInfo {
DFSLevelInfo(uint8_t level, ExecutionContext& context)
: level{level}, hasBeenOutput{false}, children{std::make_shared<common::ValueVector>(
: level{level}, hasBeenOutput{false}, children{std::make_unique<common::ValueVector>(
common::INTERNAL_ID, context.memoryManager)} {};
const uint8_t level;
bool hasBeenOutput;
std::shared_ptr<common::ValueVector> children;
std::unique_ptr<common::ValueVector> children;
};

class VarLengthExtend : public PhysicalOperator {
Expand All @@ -36,8 +36,8 @@ class VarLengthExtend : public PhysicalOperator {
storage::BaseColumnOrList* storage;
uint8_t lowerBound;
uint8_t upperBound;
std::shared_ptr<common::ValueVector> boundNodeValueVector;
std::shared_ptr<common::ValueVector> nbrNodeValueVector;
common::ValueVector* boundNodeValueVector;
common::ValueVector* nbrNodeValueVector;
std::stack<std::shared_ptr<DFSLevelInfo>> dfsStack;
// The VarLengthExtend has the invariant that at any point in time, there will be one DFSLevelInfo
// in the dfsStack for each level. dfsLevelInfos is a pool of DFSLevelInfos that holds
Expand Down
25 changes: 12 additions & 13 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,33 +179,32 @@ class FactorizedTable {
FactorizedTable(
storage::MemoryManager* memoryManager, std::unique_ptr<FactorizedTableSchema> tableSchema);

void append(const std::vector<std::shared_ptr<common::ValueVector>>& vectors);
void append(const std::vector<common::ValueVector*>& vectors);

//! This function appends an empty tuple to the factorizedTable and returns a pointer to that
//! tuple.
uint8_t* appendEmptyTuple();

// This function scans numTuplesToScan rows into vectors, starting at tupleIdx. Callers are
// responsible for making sure all the parameters are valid.
inline void scan(std::vector<std::shared_ptr<common::ValueVector>>& vectors,
ft_tuple_idx_t tupleIdx, uint64_t numTuplesToScan) const {
inline void scan(std::vector<common::ValueVector*>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan) const {
std::vector<uint32_t> colIdxes(tableSchema->getNumColumns());
iota(colIdxes.begin(), colIdxes.end(), 0);
scan(vectors, tupleIdx, numTuplesToScan, colIdxes);
}
inline bool isEmpty() const { return getNumTuples() == 0; }
void scan(std::vector<std::shared_ptr<common::ValueVector>>& vectors, ft_tuple_idx_t tupleIdx,
void scan(std::vector<common::ValueVector*>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan, std::vector<uint32_t>& colIdxToScan) const;
// TODO(Guodong): Unify these two interfaces along with `readUnflatCol`.
void lookup(std::vector<std::shared_ptr<common::ValueVector>>& vectors,
std::vector<uint32_t>& colIdxesToScan, uint8_t** tuplesToRead, uint64_t startPos,
uint64_t numTuplesToRead) const;
void lookup(std::vector<std::shared_ptr<common::ValueVector>>& vectors,
void lookup(std::vector<common::ValueVector*>& vectors, std::vector<uint32_t>& colIdxesToScan,
uint8_t** tuplesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(std::vector<common::ValueVector*>& vectors,
const common::SelectionVector* selVector, std::vector<uint32_t>& colIdxesToScan,
uint8_t* tupleToRead) const;
void lookup(std::vector<std::shared_ptr<common::ValueVector>>& vectors,
std::vector<uint32_t>& colIdxesToScan, std::vector<ft_tuple_idx_t>& tupleIdxesToRead,
uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(std::vector<common::ValueVector*>& vectors, std::vector<uint32_t>& colIdxesToScan,
std::vector<ft_tuple_idx_t>& tupleIdxesToRead, uint64_t startPos,
uint64_t numTuplesToRead) const;

// When we merge two factorizedTables, we need to update the hasNoNullGuarantee based on
// the other factorizedTable.
Expand Down Expand Up @@ -253,7 +252,7 @@ class FactorizedTable {
}

void copySingleValueToVector(ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx,
std::shared_ptr<common::ValueVector> valueVector, uint32_t posInVector) const;
common::ValueVector* valueVector, uint32_t posInVector) const;
bool isOverflowColNull(
const uint8_t* nullBuffer, ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, ft_col_idx_t colIdx) const;
Expand All @@ -272,7 +271,7 @@ class FactorizedTable {
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);

uint64_t computeNumTuplesToAppend(
const std::vector<std::shared_ptr<common::ValueVector>>& vectorsToAppend) const;
const std::vector<common::ValueVector*>& vectorsToAppend) const;

inline uint8_t* getCell(
ft_block_idx_t blockIdx, ft_block_offset_t blockOffset, ft_col_offset_t colOffset) const {
Expand Down
Loading

0 comments on commit 02c4f33

Please sign in to comment.