Skip to content

Commit

Permalink
fixing issue 1404
Browse files Browse the repository at this point in the history
  • Loading branch information
semihsalihoglu-uw committed Apr 12, 2023
1 parent 06181ac commit 3b3f2a7
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 168 deletions.
5 changes: 4 additions & 1 deletion src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ void TaskScheduler::scheduleTaskAndWaitOrError(
}
auto scheduledTask = scheduleTask(task);
while (!task->isCompleted()) {
if (context->clientContext->isTimeOutEnabled()) {
if (context != nullptr && context->clientContext->isTimeOutEnabled()) {
interruptTaskIfTimeOutNoLock(context);
} else if (task->hasException()) {
// Interrupt tasks that errored, so other threads can stop working on them early.
context->clientContext->interrupt();
}
std::this_thread::sleep_for(
std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct BufferPoolConstants {
// `EvictionQueue::removeNonEvictableCandidates()` for more details.
static constexpr uint64_t EVICTION_QUEUE_PURGING_INTERVAL = 1024;
// The default max size for a VMRegion.
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 45; // (32TB)
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB)

static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
};
Expand Down
2 changes: 0 additions & 2 deletions src/include/main/client_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class ClientContext {

inline bool isTimeOutEnabled() const { return timeoutInMS != 0; }

inline uint64_t getTimeOutMS() const { return timeoutInMS; }

void startTimingIfEnabled();

private:
Expand Down
19 changes: 12 additions & 7 deletions src/include/storage/copier/copy_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ namespace storage {

class CopyTask : public common::Task {
public:
// CopyCSV tasks are intended to be tasks on which only 1 thread works.
CopyTask() : Task(1){};
CopyTask(uint64_t numThreads) : Task(numThreads){};
};

template<typename F>
class ParameterizedCopyTask : public CopyTask {
public:
ParameterizedCopyTask(F&& func) : f(func){};
ParameterizedCopyTask(F&& func, uint64_t numThreads = 1) : CopyTask(numThreads), f(func){};
void run() override { f(); }

private:
Expand All @@ -25,15 +24,21 @@ class CopyTaskFactory {

private:
template<typename F>
static std::shared_ptr<CopyTask> createCopyTaskInternal(F&& f) {
return std::shared_ptr<CopyTask>(new ParameterizedCopyTask<F>(std::forward<F>(f)));
static std::shared_ptr<CopyTask> createCopyTaskInternal(F&& f, uint64_t numThreads = 1) {
return std::shared_ptr<CopyTask>(
new ParameterizedCopyTask<F>(std::forward<F>(f), numThreads));
};

public:
template<typename F, typename... Args>
static std::shared_ptr<CopyTask> createCopyTask(F function, Args&&... args) {
return std::shared_ptr<CopyTask>(
createCopyTaskInternal(std::move(std::bind(function, args...))));
return createParallelCopyTask(1 /* num threads */, function, args...);
};

template<typename F, typename... Args>
static std::shared_ptr<CopyTask> createParallelCopyTask(
uint64_t numThreads, F function, Args&&... args) {
return createCopyTaskInternal(std::move(std::bind(function, args...)), numThreads);
};
};
} // namespace storage
Expand Down
134 changes: 116 additions & 18 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,117 @@
namespace kuzu {
namespace storage {

// Convenience alias for a std::unique_lock taken on a std::mutex.
using lock_t = std::unique_lock<std::mutex>;

// Signature of a callback that receives a node column, one of its in-memory column chunks, a
// node offset and a string value.
// NOTE(review): presumably the callback parses `data` and stores it in `columnChunk` at
// `nodeOffset` -- confirm against the concrete setters.
using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

/**
 * Abstract base for one unit of node-copy work (a "morsel").
 *
 * A morsel carries a start offset and a block index; subclasses additionally hold the Arrow
 * data and expose its columns through getArrowColumns().
 *
 * @tparam MORSEL_T Arrow column type returned by getArrowColumns()
 *                  (the visible subclasses use arrow::Array and arrow::ChunkedArray).
 */
template<typename MORSEL_T>
class NodeCopyMorsel {

public:
    // Sentinel block index (-1ull); a morsel carrying it reports success() == false.
    static constexpr common::block_idx_t INVALID_BLOCK_IDX = -1ull;

public:
    /**
     * @param startOffset offset stored in the public `startOffset` member.
     * @param blockIdx    block index stored in the public `blockIdx` member; pass
     *                    INVALID_BLOCK_IDX to build a morsel for which success() is false.
     */
    NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx)
        : startOffset{startOffset}, blockIdx{blockIdx} {}

    virtual ~NodeCopyMorsel() = default;

    /** @return the Arrow columns held by the concrete morsel. */
    virtual const std::vector<std::shared_ptr<MORSEL_T>>& getArrowColumns() = 0;

    /**
     * @return true iff this morsel's block index is not INVALID_BLOCK_IDX.
     *
     * Declared const: it only reads state, so it must be callable on const morsels too.
     */
    bool success() const { return blockIdx != INVALID_BLOCK_IDX; }

public:
    common::offset_t startOffset;
    common::block_idx_t blockIdx;
};

/**
 * Morsel backed by an arrow::RecordBatch; its columns are arrow::Array objects.
 */
class CSVNodeCopyMorsel : public NodeCopyMorsel<arrow::Array> {

public:
    /**
     * @param recordBatch record batch whose columns this morsel exposes; shared ownership is
     *                    taken over by the morsel.
     * @param startOffset forwarded to the NodeCopyMorsel base.
     * @param blockIdx    forwarded to the NodeCopyMorsel base.
     */
    CSVNodeCopyMorsel(std::shared_ptr<arrow::RecordBatch> recordBatch, common::offset_t startOffset,
        common::block_idx_t blockIdx)
        // The parameter is a by-value sink: move it into the member rather than copying, which
        // would cost an extra atomic refcount increment/decrement.
        : NodeCopyMorsel{startOffset, blockIdx}, recordBatch{std::move(recordBatch)} {}

    /** @return the columns of the held record batch. */
    const std::vector<std::shared_ptr<arrow::Array>>& getArrowColumns() override {
        return recordBatch->columns();
    }

private:
    std::shared_ptr<arrow::RecordBatch> recordBatch;
};

/**
 * Morsel backed by an arrow::Table; its columns are arrow::ChunkedArray objects.
 */
class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {

public:
    /**
     * @param currTable   table whose columns this morsel exposes; shared ownership is taken
     *                    over by the morsel.
     * @param startOffset forwarded to the NodeCopyMorsel base.
     * @param blockIdx    forwarded to the NodeCopyMorsel base.
     */
    ParquetNodeCopyMorsel(std::shared_ptr<arrow::Table> currTable, common::offset_t startOffset,
        common::block_idx_t blockIdx)
        // The parameter is a by-value sink: move it into the member rather than copying, which
        // would cost an extra atomic refcount increment/decrement.
        : NodeCopyMorsel{startOffset, blockIdx}, currTable{std::move(currTable)} {}

    /** @return the columns of the held table. */
    const std::vector<std::shared_ptr<arrow::ChunkedArray>>& getArrowColumns() override {
        return currTable->columns();
    }

private:
    std::shared_ptr<arrow::Table> currTable;
};

/**
 * Abstract state for one node-copy operation, from which morsels are obtained via getMorsel().
 *
 * @tparam T1 key type of the HashIndexBuilder referenced by `pkIndex`.
 * @tparam T2 Arrow column type of the morsels produced by getMorsel().
 */
template<typename T1, typename T2>
class NodeCopySharedState {

public:
    /**
     * @param filePath    path stored in the public `filePath` member.
     * @param pkIndex     non-owning pointer to the index builder; stored as-is.
     * @param startOffset offset stored in the public `startOffset` member.
     *
     * The internal block index starts at 0.
     */
    NodeCopySharedState(
        std::string filePath, HashIndexBuilder<T1>* pkIndex, common::offset_t startOffset)
        // `filePath` is a by-value sink parameter: move it to avoid a second string copy.
        : filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset},
          blockIdx{0} {}

    virtual ~NodeCopySharedState() = default;

    /** @return the next morsel; how it is produced is defined by the subclass. */
    virtual std::unique_ptr<NodeCopyMorsel<T2>> getMorsel() = 0;

public:
    std::string filePath;
    HashIndexBuilder<T1>* pkIndex;
    common::offset_t startOffset;

protected:
    common::block_idx_t blockIdx;
    // NOTE(review): presumably guards `blockIdx`/`startOffset` inside getMorsel()
    // implementations -- confirm against the definitions.
    std::mutex mtx;
};

/**
 * Shared copy state whose morsels carry arrow::Array columns; it holds an
 * arrow::csv::StreamingReader.
 *
 * @tparam T key type of the HashIndexBuilder passed to the base state.
 */
template<typename T>
class CSVNodeCopySharedState : public NodeCopySharedState<T, arrow::Array> {

public:
    /**
     * @param filePath           forwarded to the base state.
     * @param pkIndex            forwarded to the base state (non-owning).
     * @param startOffset        forwarded to the base state.
     * @param csvStreamingReader streaming reader kept by this state (shared ownership).
     */
    CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
        common::offset_t startOffset,
        std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader)
        // Qualify std::move explicitly: the unqualified call only resolved through ADL and
        // triggers clang's -Wunqualified-std-cast-call. `filePath` is moved as well since it is
        // a by-value sink that is not used again here.
        : NodeCopySharedState<T, arrow::Array>{std::move(filePath), pkIndex, startOffset},
          csvStreamingReader{std::move(csvStreamingReader)} {}

    /** Declared here; the definition is not part of this header excerpt. */
    std::unique_ptr<NodeCopyMorsel<arrow::Array>> getMorsel() override;

private:
    std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
};

/**
 * Shared copy state whose morsels carry arrow::ChunkedArray columns; it owns a
 * parquet::arrow::FileReader.
 *
 * @tparam T key type of the HashIndexBuilder passed to the base state.
 */
template<typename T>
class ParquetNodeCopySharedState : public NodeCopySharedState<T, arrow::ChunkedArray> {

public:
    /**
     * @param filePath      forwarded to the base state.
     * @param pkIndex       forwarded to the base state (non-owning).
     * @param startOffset   forwarded to the base state.
     * @param numBlocks     stored in the public `numBlocks` member.
     * @param parquetReader reader whose ownership is transferred to this state.
     */
    ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
        common::offset_t startOffset, uint64_t numBlocks,
        std::unique_ptr<parquet::arrow::FileReader> parquetReader)
        // `filePath` is a by-value sink that is not used again here: move it into the base
        // instead of copying the string a second time.
        : NodeCopySharedState<T, arrow::ChunkedArray>{std::move(filePath), pkIndex, startOffset},
          numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {}

    /** Declared here; the definition is not part of this header excerpt. */
    std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> getMorsel() override;

public:
    uint64_t numBlocks;

private:
    std::unique_ptr<parquet::arrow::FileReader> parquetReader;
};

class NodeCopier : public TableCopier {

public:
Expand All @@ -23,7 +131,7 @@ class NodeCopier : public TableCopier {
protected:
void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

void saveToFile() override;

Expand All @@ -36,13 +144,15 @@ class NodeCopier : public TableCopier {

private:
template<typename T>
arrow::Status populateColumns();
arrow::Status populateColumns(processor::ExecutionContext* executionContext);

template<typename T>
arrow::Status populateColumnsFromCSV(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
arrow::Status populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
arrow::Status populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
Expand All @@ -51,21 +161,9 @@ class NodeCopier : public TableCopier {
PageByteCursor& overflowCursor);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
template<typename T1, typename T2>
static arrow::Status batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx, uint64_t blockIdx,
uint64_t startOffset, HashIndexBuilder<T1>* pkIndex, NodeCopier* copier,
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath);

template<typename T>
arrow::Status assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
static void batchPopulateColumnsTask(NodeCopySharedState<T1, T2>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext);

template<typename T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/copier/npy_node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class NpyNodeCopier : public NodeCopier {
~NpyNodeCopier() override = default;

private:
void populateColumnsAndLists() override;
void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

void populateInMemoryStructures() override;
void populateInMemoryStructures(processor::ExecutionContext* executionContext) override;

void initializeNpyReaders();

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RelCopier : public TableCopier {

void initializeColumnsAndLists() override;

void populateColumnsAndLists() override;
void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

void saveToFile() override;

Expand Down
13 changes: 4 additions & 9 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ class TableCopier {

virtual ~TableCopier() = default;

uint64_t copy();
uint64_t copy(processor::ExecutionContext* executionContext);

protected:
virtual void initializeColumnsAndLists() = 0;

virtual void populateColumnsAndLists() = 0;
virtual void populateColumnsAndLists(processor::ExecutionContext* executionContext) = 0;

virtual void saveToFile() = 0;

virtual void populateInMemoryStructures();
virtual void populateInMemoryStructures(processor::ExecutionContext* executionContext);

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
Expand All @@ -65,13 +65,8 @@ class TableCopier {
arrow::Status initCSVReaderAndCheckStatus(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
const std::string& filePath);
arrow::Status initCSVReader(std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
const std::string& filePath);

arrow::Status initArrowReaderAndCheckStatus(
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& ipc_reader,
const std::string& filePath);
arrow::Status initArrowReader(std::shared_ptr<arrow::ipc::RecordBatchFileReader>& ipc_reader,
arrow::Status initCSVReader(std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
const std::string& filePath);

arrow::Status initParquetReaderAndCheckStatus(
Expand Down
1 change: 1 addition & 0 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SystemConfig::SystemConfig(uint64_t bufferPoolSize_) {
(double_t)std::min(systemMemSize, (std::uint64_t)UINTPTR_MAX));
}
bufferPoolSize = bufferPoolSize_;
auto maxConcurrency = std::thread::hardware_concurrency();
maxNumThreads = std::thread::hardware_concurrency();
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ uint64_t CopyNode::executeInternal(
if (copyDescription.fileType == common::CopyDescription::FileType::NPY) {
auto nodeCopier = std::make_unique<NpyNodeCopier>(copyDescription, wal->getDirectory(),
*taskScheduler, *catalog, tableID, nodesStatistics);
numNodesCopied = nodeCopier->copy();
numNodesCopied = nodeCopier->copy(executionContext);
} else {
auto nodeCopier = std::make_unique<NodeCopier>(copyDescription, wal->getDirectory(),
*taskScheduler, *catalog, tableID, nodesStatistics);
numNodesCopied = nodeCopier->copy();
numNodesCopied = nodeCopier->copy(executionContext);
}
for (auto& relTableSchema : catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
relsStore.getRelTable(relTableSchema->tableID)
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace processor {

uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCSVCopier = make_unique<RelCopier>(copyDescription, wal->getDirectory(), *taskScheduler,
auto relCopier = make_unique<RelCopier>(copyDescription, wal->getDirectory(), *taskScheduler,
*catalog, nodesStore, executionContext->bufferManager, tableID, relsStatistics);
auto numRelsCopied = relCSVCopier->copy();
auto numRelsCopied = relCopier->copy(executionContext);
wal->logCopyRelRecord(tableID);
return numRelsCopied;
}
Expand Down
Loading

0 comments on commit 3b3f2a7

Please sign in to comment.