Skip to content

Commit

Permalink
Merge pull request #2219 from kuzudb/offset
Browse files Browse the repository at this point in the history
Change offset vector from int64 to internal_id when reading from files
  • Loading branch information
ray6080 committed Oct 16, 2023
2 parents c604332 + b07286b commit 1a3b814
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 42 deletions.
15 changes: 9 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindExpectedNodeFileColumns(tableSchema, *readerConfig);
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto nodeID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::NODE);
std::move(readerConfig), std::move(columns), std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
Expand All @@ -129,9 +130,10 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
auto columns = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto srcKey = columns[0];
auto dstKey = columns[1];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
std::move(readerConfig), std::move(columns), std::move(relID), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
auto srcTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID());
Expand Down Expand Up @@ -160,10 +162,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
auto subjectKey = columns[0];
auto predicateKey = columns[1];
auto objectKey = columns[2];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto containsSerial = false;
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
std::move(readerConfig), std::move(columns), std::move(relID), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
assert(relTableSchema->getSrcTableID() == relTableSchema->getDstTableID());
auto nodeTableID = relTableSchema->getSrcTableID();
Expand Down
10 changes: 5 additions & 5 deletions src/include/binder/copy/bound_file_scan_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ namespace binder {
struct BoundFileScanInfo {
std::unique_ptr<common::ReaderConfig> readerConfig;
binder::expression_vector columns;
std::shared_ptr<Expression> offset;
std::shared_ptr<Expression> internalID;

// TODO: remove the following field
common::TableType tableType;

BoundFileScanInfo(std::unique_ptr<common::ReaderConfig> readerConfig,
binder::expression_vector columns, std::shared_ptr<Expression> offset,
binder::expression_vector columns, std::shared_ptr<Expression> internalID,
common::TableType tableType)
: readerConfig{std::move(readerConfig)}, columns{std::move(columns)},
offset{std::move(offset)}, tableType{tableType} {}
internalID{std::move(internalID)}, tableType{tableType} {}
BoundFileScanInfo(const BoundFileScanInfo& other)
: readerConfig{other.readerConfig->copy()}, columns{other.columns}, offset{other.offset},
tableType{other.tableType} {}
: readerConfig{other.readerConfig->copy()}, columns{other.columns},
internalID{other.internalID}, tableType{other.tableType} {}

inline std::unique_ptr<BoundFileScanInfo> copy() const {
return std::make_unique<BoundFileScanInfo>(*this);
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Property {
// TODO: these should be guarded as reserved property names.
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
static constexpr std::string_view REL_TO_PROPERTY_NAME = "_TO_";
static constexpr std::string_view OFFSET_NAME = "_OFFSET_";
static constexpr std::string_view INTERNAL_ID_NAME = "_ID_";
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";

Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ class DirectedInMemRelData {
struct CopyRelInfo {
catalog::RelTableSchema* schema;
std::vector<DataPos> dataPoses;
DataPos offsetPos;
DataPos internalIDPos;
DataPos boundOffsetPos;
DataPos nbrOffsetPos;
storage::WAL* wal;
bool containsSerial;

CopyRelInfo(catalog::RelTableSchema* schema, std::vector<DataPos> dataPose,
const DataPos& offsetPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
const DataPos& internalIDPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
storage::WAL* wal, bool containsSerial)
: schema{schema}, dataPoses{std::move(dataPose)}, offsetPos{offsetPos},
: schema{schema}, dataPoses{std::move(dataPose)}, internalIDPos{internalIDPos},
boundOffsetPos{boundOffsetPos}, nbrOffsetPos{nbrOffsetPos}, wal{wal},
containsSerial{containsSerial} {}
};
Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ namespace kuzu {
namespace processor {

struct ReaderInfo {
DataPos nodeOffsetPos;
DataPos internalIDPos;
std::vector<DataPos> dataColumnsPos;

common::TableType tableType;

ReaderInfo(const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos,
ReaderInfo(const DataPos& internalIDPos, std::vector<DataPos> dataColumnsPos,
common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
: internalIDPos{internalIDPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
tableType} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos},
: internalIDPos{other.internalIDPos},
dataColumnsPos{other.dataColumnsPos}, tableType{other.tableType} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }
Expand All @@ -31,7 +31,7 @@ class Reader : public PhysicalOperator {
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, info{std::move(info)},
sharedState{std::move(sharedState)}, dataChunk{nullptr},
offsetVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}
internalIDVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}

inline bool isSource() const final { return true; }
inline bool canParallel() const final {
Expand Down Expand Up @@ -76,7 +76,7 @@ class Reader : public PhysicalOperator {
LeftArrowArrays leftArrowArrays;

std::unique_ptr<common::DataChunk> dataChunk;
common::ValueVector* offsetVector;
common::ValueVector* internalIDVector;

read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
Expand Down
8 changes: 4 additions & 4 deletions src/planner/operator/scan/logical_scan_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ void LogicalScanFile::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(info->columns, groupPos);
if (info->offset != nullptr) {
schema->insertToGroupAndScope(info->offset, groupPos);
if (info->internalID != nullptr) {
schema->insertToGroupAndScope(info->internalID, groupPos);
}
}

void LogicalScanFile::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(info->columns, 0);
if (info->offset != nullptr) {
schema->insertToGroupAndScope(info->offset, 0);
if (info->internalID != nullptr) {
schema->insertToGroupAndScope(info->internalID, 0);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
assert(prevOperator->getChild(0)->getOperatorType() == PhysicalOperatorType::READER);
auto reader = (Reader*)prevOperator->getChild(0);
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto offsetDataPos = DataPos{outFSchema->getExpressionPos(*copyFromInfo->fileScanInfo->offset)};
auto internalIDDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->fileScanInfo->internalID)};
DataPos srcOffsetPos;
DataPos dstOffsetPos;
std::vector<DataPos> dataColumnPositions;
Expand All @@ -85,7 +86,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
dstOffsetPos = DataPos{outFSchema->getExpressionPos(*extraInfo->dstOffset)};
dataColumnPositions = reader->getReaderInfo()->dataColumnsPos;
}
CopyRelInfo copyRelInfo{tableSchema, dataColumnPositions, offsetDataPos, srcOffsetPos,
CopyRelInfo copyRelInfo{tableSchema, dataColumnPositions, internalIDDataPos, srcOffsetPos,
dstOffsetPos, storageManager.getWAL(), copyFromInfo->containsSerial};
if (isColumns) {
return std::make_unique<CopyRelColumns>(copyRelInfo, std::move(sharedState),
Expand Down
8 changes: 4 additions & 4 deletions src/processor/map/map_scan_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapScanFile(LogicalOperator* logic
for (auto& expression : info->columns) {
dataColumnsPos.emplace_back(outSchema->getExpressionPos(*expression));
}
auto offsetPos = DataPos{};
if (info->offset != nullptr) {
offsetPos = DataPos(outSchema->getExpressionPos(*info->offset));
auto internalIDPos = DataPos{};
if (info->internalID != nullptr) {
internalIDPos = DataPos(outSchema->getExpressionPos(*info->internalID));
}
auto readInfo = std::make_unique<ReaderInfo>(offsetPos, dataColumnsPos, info->tableType);
auto readInfo = std::make_unique<ReaderInfo>(internalIDPos, dataColumnsPos, info->tableType);
return std::make_unique<Reader>(std::move(readInfo), readerSharedState, getOperatorID(),
logicalOperator->getExpressionsForPrinting());
}
Expand Down
8 changes: 5 additions & 3 deletions src/processor/operator/persistent/copy_rel_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ row_idx_t CopyRelColumns::copyRelColumns(RelDataDirection relDirection,
arrow::AllocateBuffer((int64_t)(numRowsInChunk * sizeof(offset_t)))
.Value(&relIDBuffer));
auto relOffsets = (offset_t*)relIDBuffer->data();
auto offsetValueVector = resultSet->getValueVector(info.offsetPos);
auto startRowIdx = offsetValueVector->getValue<offset_t>(
offsetValueVector->state->selVector->selectedPositions[0]);
auto internalIDValueVector = resultSet->getValueVector(info.internalIDPos);
auto startRowIdx = internalIDValueVector
->getValue<internalID_t>(
internalIDValueVector->state->selVector->selectedPositions[0])
.offset;
for (auto rowIdx = 0u; rowIdx < numRowsInChunk; rowIdx++) {
relOffsets[rowIdx] = startRowIdx + rowIdx;
}
Expand Down
8 changes: 5 additions & 3 deletions src/processor/operator/persistent/copy_rel_lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ void CopyRelLists::copyRelLists(DirectedInMemRelData* relData, const DataPos& bo
TableCopyUtils::throwCopyExceptionIfNotOK(
arrow::AllocateBuffer((int64_t)(numRows * sizeof(offset_t))).Value(&relIDBuffer));
auto relOffsets = (offset_t*)relIDBuffer->data();
auto offsetValueVector = resultSet->getValueVector(info.offsetPos);
auto startRowIdx = offsetValueVector->getValue<offset_t>(
offsetValueVector->state->selVector->selectedPositions[0]);
auto internalIDValueVector = resultSet->getValueVector(info.internalIDPos);
auto startRowIdx = internalIDValueVector
->getValue<internalID_t>(
internalIDValueVector->state->selVector->selectedPositions[0])
.offset;
for (auto rowIdx = 0u; rowIdx < numRows; rowIdx++) {
relOffsets[rowIdx] = startRowIdx + rowIdx;
}
Expand Down
12 changes: 7 additions & 5 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}
initFunc = ReaderFunctions::getInitDataFunc(*sharedState->readerConfig, info->tableType);
readFunc = ReaderFunctions::getReadRowsFunc(*sharedState->readerConfig, info->tableType);
if (info->nodeOffsetPos.dataChunkPos != INVALID_DATA_CHUNK_POS) {
offsetVector = resultSet->getValueVector(info->nodeOffsetPos).get();
if (info->internalIDPos.dataChunkPos != INVALID_DATA_CHUNK_POS) {
internalIDVector = resultSet->getValueVector(info->internalIDPos).get();
}
assert(!sharedState->readerConfig->filePaths.empty());
if (sharedState->readerConfig->fileType == FileType::CSV &&
Expand Down Expand Up @@ -54,9 +54,11 @@ void Reader::readNextDataChunk() {
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numLeftToAppend);
auto currRowIdx = sharedState->currRowIdx.fetch_add(numLeftToAppend);
if (offsetVector != nullptr) {
offsetVector->setValue(
offsetVector->state->selVector->selectedPositions[0], currRowIdx);
if (internalIDVector != nullptr) {
internalIDVector
->getValue<internalID_t>(
internalIDVector->state->selVector->selectedPositions[0])
.offset = currRowIdx;
}
break;
}
Expand Down

0 comments on commit 1a3b814

Please sign in to comment.