Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change offset vector from int64 to internal_id when reading from files #2219

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindExpectedNodeFileColumns(tableSchema, *readerConfig);
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto nodeID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::NODE);
std::move(readerConfig), std::move(columns), std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
Expand All @@ -129,9 +130,10 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
auto columns = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto srcKey = columns[0];
auto dstKey = columns[1];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
std::move(readerConfig), std::move(columns), std::move(relID), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
auto srcTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID());
Expand Down Expand Up @@ -160,10 +162,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
auto subjectKey = columns[0];
auto predicateKey = columns[1];
auto objectKey = columns[2];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto containsSerial = false;
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
std::move(readerConfig), std::move(columns), std::move(relID), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
assert(relTableSchema->getSrcTableID() == relTableSchema->getDstTableID());
auto nodeTableID = relTableSchema->getSrcTableID();
Expand Down
10 changes: 5 additions & 5 deletions src/include/binder/copy/bound_file_scan_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ namespace binder {
struct BoundFileScanInfo {
std::unique_ptr<common::ReaderConfig> readerConfig;
binder::expression_vector columns;
std::shared_ptr<Expression> offset;
std::shared_ptr<Expression> internalID;

// TODO: remove the following field
common::TableType tableType;

BoundFileScanInfo(std::unique_ptr<common::ReaderConfig> readerConfig,
binder::expression_vector columns, std::shared_ptr<Expression> offset,
binder::expression_vector columns, std::shared_ptr<Expression> internalID,
common::TableType tableType)
: readerConfig{std::move(readerConfig)}, columns{std::move(columns)},
offset{std::move(offset)}, tableType{tableType} {}
internalID{std::move(internalID)}, tableType{tableType} {}
BoundFileScanInfo(const BoundFileScanInfo& other)
: readerConfig{other.readerConfig->copy()}, columns{other.columns}, offset{other.offset},
tableType{other.tableType} {}
: readerConfig{other.readerConfig->copy()}, columns{other.columns},
internalID{other.internalID}, tableType{other.tableType} {}

inline std::unique_ptr<BoundFileScanInfo> copy() const {
return std::make_unique<BoundFileScanInfo>(*this);
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Property {
// TODO: these should be guarded as reserved property names.
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
static constexpr std::string_view REL_TO_PROPERTY_NAME = "_TO_";
static constexpr std::string_view OFFSET_NAME = "_OFFSET_";
static constexpr std::string_view INTERNAL_ID_NAME = "_ID_";
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";

Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ class DirectedInMemRelData {
struct CopyRelInfo {
catalog::RelTableSchema* schema;
std::vector<DataPos> dataPoses;
DataPos offsetPos;
DataPos internalIDPos;
DataPos boundOffsetPos;
DataPos nbrOffsetPos;
storage::WAL* wal;
bool containsSerial;

CopyRelInfo(catalog::RelTableSchema* schema, std::vector<DataPos> dataPose,
const DataPos& offsetPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
const DataPos& internalIDPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
storage::WAL* wal, bool containsSerial)
: schema{schema}, dataPoses{std::move(dataPose)}, offsetPos{offsetPos},
: schema{schema}, dataPoses{std::move(dataPose)}, internalIDPos{internalIDPos},
boundOffsetPos{boundOffsetPos}, nbrOffsetPos{nbrOffsetPos}, wal{wal},
containsSerial{containsSerial} {}
};
Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ namespace kuzu {
namespace processor {

struct ReaderInfo {
DataPos nodeOffsetPos;
DataPos internalIDPos;
std::vector<DataPos> dataColumnsPos;

common::TableType tableType;

ReaderInfo(const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos,
ReaderInfo(const DataPos& internalIDPos, std::vector<DataPos> dataColumnsPos,
common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
: internalIDPos{internalIDPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
tableType} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos},
: internalIDPos{other.internalIDPos},
dataColumnsPos{other.dataColumnsPos}, tableType{other.tableType} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }
Expand All @@ -31,7 +31,7 @@ class Reader : public PhysicalOperator {
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, info{std::move(info)},
sharedState{std::move(sharedState)}, dataChunk{nullptr},
offsetVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}
internalIDVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}

inline bool isSource() const final { return true; }
inline bool canParallel() const final {
Expand Down Expand Up @@ -76,7 +76,7 @@ class Reader : public PhysicalOperator {
LeftArrowArrays leftArrowArrays;

std::unique_ptr<common::DataChunk> dataChunk;
common::ValueVector* offsetVector;
common::ValueVector* internalIDVector;

read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
Expand Down
8 changes: 4 additions & 4 deletions src/planner/operator/scan/logical_scan_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ void LogicalScanFile::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(info->columns, groupPos);
if (info->offset != nullptr) {
schema->insertToGroupAndScope(info->offset, groupPos);
if (info->internalID != nullptr) {
schema->insertToGroupAndScope(info->internalID, groupPos);
}
}

void LogicalScanFile::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(info->columns, 0);
if (info->offset != nullptr) {
schema->insertToGroupAndScope(info->offset, 0);
if (info->internalID != nullptr) {
schema->insertToGroupAndScope(info->internalID, 0);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
assert(prevOperator->getChild(0)->getOperatorType() == PhysicalOperatorType::READER);
auto reader = (Reader*)prevOperator->getChild(0);
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto offsetDataPos = DataPos{outFSchema->getExpressionPos(*copyFromInfo->fileScanInfo->offset)};
auto internalIDDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->fileScanInfo->internalID)};
DataPos srcOffsetPos;
DataPos dstOffsetPos;
std::vector<DataPos> dataColumnPositions;
Expand All @@ -85,7 +86,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
dstOffsetPos = DataPos{outFSchema->getExpressionPos(*extraInfo->dstOffset)};
dataColumnPositions = reader->getReaderInfo()->dataColumnsPos;
}
CopyRelInfo copyRelInfo{tableSchema, dataColumnPositions, offsetDataPos, srcOffsetPos,
CopyRelInfo copyRelInfo{tableSchema, dataColumnPositions, internalIDDataPos, srcOffsetPos,
dstOffsetPos, storageManager.getWAL(), copyFromInfo->containsSerial};
if (isColumns) {
return std::make_unique<CopyRelColumns>(copyRelInfo, std::move(sharedState),
Expand Down
8 changes: 4 additions & 4 deletions src/processor/map/map_scan_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapScanFile(LogicalOperator* logic
for (auto& expression : info->columns) {
dataColumnsPos.emplace_back(outSchema->getExpressionPos(*expression));
}
auto offsetPos = DataPos{};
if (info->offset != nullptr) {
offsetPos = DataPos(outSchema->getExpressionPos(*info->offset));
auto internalIDPos = DataPos{};
if (info->internalID != nullptr) {
internalIDPos = DataPos(outSchema->getExpressionPos(*info->internalID));
}
auto readInfo = std::make_unique<ReaderInfo>(offsetPos, dataColumnsPos, info->tableType);
auto readInfo = std::make_unique<ReaderInfo>(internalIDPos, dataColumnsPos, info->tableType);
return std::make_unique<Reader>(std::move(readInfo), readerSharedState, getOperatorID(),
logicalOperator->getExpressionsForPrinting());
}
Expand Down
8 changes: 5 additions & 3 deletions src/processor/operator/persistent/copy_rel_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ row_idx_t CopyRelColumns::copyRelColumns(RelDataDirection relDirection,
arrow::AllocateBuffer((int64_t)(numRowsInChunk * sizeof(offset_t)))
.Value(&relIDBuffer));
auto relOffsets = (offset_t*)relIDBuffer->data();
auto offsetValueVector = resultSet->getValueVector(info.offsetPos);
auto startRowIdx = offsetValueVector->getValue<offset_t>(
offsetValueVector->state->selVector->selectedPositions[0]);
auto internalIDValueVector = resultSet->getValueVector(info.internalIDPos);
auto startRowIdx = internalIDValueVector
->getValue<internalID_t>(
internalIDValueVector->state->selVector->selectedPositions[0])
.offset;
for (auto rowIdx = 0u; rowIdx < numRowsInChunk; rowIdx++) {
relOffsets[rowIdx] = startRowIdx + rowIdx;
}
Expand Down
8 changes: 5 additions & 3 deletions src/processor/operator/persistent/copy_rel_lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ void CopyRelLists::copyRelLists(DirectedInMemRelData* relData, const DataPos& bo
TableCopyUtils::throwCopyExceptionIfNotOK(
arrow::AllocateBuffer((int64_t)(numRows * sizeof(offset_t))).Value(&relIDBuffer));
auto relOffsets = (offset_t*)relIDBuffer->data();
auto offsetValueVector = resultSet->getValueVector(info.offsetPos);
auto startRowIdx = offsetValueVector->getValue<offset_t>(
offsetValueVector->state->selVector->selectedPositions[0]);
auto internalIDValueVector = resultSet->getValueVector(info.internalIDPos);
auto startRowIdx = internalIDValueVector
->getValue<internalID_t>(
internalIDValueVector->state->selVector->selectedPositions[0])
.offset;
for (auto rowIdx = 0u; rowIdx < numRows; rowIdx++) {
relOffsets[rowIdx] = startRowIdx + rowIdx;
}
Expand Down
12 changes: 7 additions & 5 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}
initFunc = ReaderFunctions::getInitDataFunc(*sharedState->readerConfig, info->tableType);
readFunc = ReaderFunctions::getReadRowsFunc(*sharedState->readerConfig, info->tableType);
if (info->nodeOffsetPos.dataChunkPos != INVALID_DATA_CHUNK_POS) {
offsetVector = resultSet->getValueVector(info->nodeOffsetPos).get();
if (info->internalIDPos.dataChunkPos != INVALID_DATA_CHUNK_POS) {
internalIDVector = resultSet->getValueVector(info->internalIDPos).get();
}
assert(!sharedState->readerConfig->filePaths.empty());
if (sharedState->readerConfig->fileType == FileType::CSV &&
Expand Down Expand Up @@ -54,9 +54,11 @@ void Reader::readNextDataChunk() {
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numLeftToAppend);
auto currRowIdx = sharedState->currRowIdx.fetch_add(numLeftToAppend);
if (offsetVector != nullptr) {
offsetVector->setValue(
offsetVector->state->selVector->selectedPositions[0], currRowIdx);
if (internalIDVector != nullptr) {
internalIDVector
->getValue<internalID_t>(
internalIDVector->state->selVector->selectedPositions[0])
.offset = currRowIdx;
}
break;
}
Expand Down