
CopyCSV taskscheduler improvement #1404

Closed
acquamarin opened this issue Mar 22, 2023 · 1 comment · Fixed by #1460
acquamarin commented Mar 22, 2023

There are two issues related to the current taskScheduler:

1. CopyCSV taskScheduler exception handling.

When executing CopyCSV, the main thread may put multiple tasks into the task queue.
If an exception occurs in one task, we remove that task from the queue and throw the exception. However, the other tasks planned by CopyCSV are not removed from the queue. Worker threads keep grabbing tasks from the task queue and executing them, which can cause a segfault.
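To make the failure mode concrete, here is a hedged, single-threaded sketch of one possible fix (`MiniScheduler` and its members are illustrative names, not kuzu's actual `TaskScheduler` API): once any task throws, a shared flag makes the scheduler drain the remaining queued tasks without executing them, so workers never touch state the failed copy may have torn down.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <stdexcept>

// Illustrative sketch, not kuzu's actual scheduler. Once a task throws,
// errorOccurred makes runAll() drain remaining tasks without executing them.
class MiniScheduler {
public:
    void schedule(std::function<void()> task) {
        std::lock_guard<std::mutex> lck{mtx};
        tasks.push(std::move(task));
    }

    // Pops every queued task; executes them only while no error has
    // occurred. Returns the number of tasks actually executed.
    int runAll() {
        int executed = 0;
        while (true) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lck{mtx};
                if (tasks.empty()) {
                    break;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            if (errorOccurred.load()) {
                continue; // drain the queue without executing
            }
            try {
                task();
                ++executed;
            } catch (...) {
                errorOccurred.store(true);
            }
        }
        return executed;
    }

    std::atomic<bool> errorOccurred{false};

private:
    std::mutex mtx;
    std::queue<std::function<void()>> tasks;
};
```

With this sketch, if five tasks are queued and the second one throws, only the first is executed; the remaining three are drained instead of running against broken state.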

2. CopyCSV taskScheduler waitAllTasksToCompleteOrError problem:

CopyCSV calls waitAllTasksToCompleteOrError to wait for all CopyCSV tasks to finish. However, the processor may put some of its own tasks into the task queue at the same time, so CopyCSV ends up waiting for those processor tasks to finish as well.
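One way to avoid waiting on unrelated tasks is to tag each scheduled task with an owner id and block only until tasks with that id have finished. A hedged sketch of the bookkeeping (illustrative names, not the actual `TaskScheduler` interface):

```cpp
#include <cassert>
#include <condition_variable>
#include <map>
#include <mutex>

// Illustrative sketch: per-owner outstanding-task counters, so a waiter
// (e.g., CopyCSV) blocks only on its own tasks, not on processor tasks
// that happen to share the same queue.
class OwnerWaiter {
public:
    void taskScheduled(int ownerID) {
        std::lock_guard<std::mutex> lck{mtx};
        ++outstanding[ownerID];
    }

    void taskFinished(int ownerID) {
        std::lock_guard<std::mutex> lck{mtx};
        if (--outstanding[ownerID] == 0) {
            cv.notify_all();
        }
    }

    // Blocks until every task scheduled under ownerID has finished;
    // tasks of other owners are ignored.
    void waitAllTasksOf(int ownerID) {
        std::unique_lock<std::mutex> lck{mtx};
        cv.wait(lck, [&] { return outstanding[ownerID] == 0; });
    }

    int outstandingCount(int ownerID) {
        std::lock_guard<std::mutex> lck{mtx};
        return outstanding[ownerID];
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::map<int, int> outstanding;
};
```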

@acquamarin acquamarin self-assigned this Mar 22, 2023
@acquamarin acquamarin changed the title CopyCSV taskscheduler exception handling CopyCSV taskscheduler improvement Mar 23, 2023
semihsalihoglu-uw commented Apr 4, 2023

Here's my suggestion for how to do this. Unless someone has a comment before I start, this is my current plan:

Overview

All tasks that are meant to be executed by multiple threads are based on reading a chunk of a disk file (e.g., Arrow or Parquet). Currently this works as follows: the main thread Tm (the user's thread) prepares and assigns tasks by scanning one block (~8MB) of the disk file, putting it into a task, and scheduling it. Tm does this for NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH tasks (200 by default) at a time and waits for these tasks to finish, then schedules another 200 tasks, and so on. Reading 8MB at a time is done by "streaming readers", e.g., Arrow's CSVStreamingReader.

Instead, I'll make these parts of the code morsel-driven in a simple way: I'll add a lock that registered threads need to grab to scan a batch of 8MB; after that, they release the lock and run the rest of the copying function. For example, NodeCopier::assignCopyCSVTasks is one of those functions. It currently looks as follows:

arrow::Status NodeCopier::assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
    offset_t startOffset, std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
    auto it = csvStreamingReader->begin();
    auto endIt = csvStreamingReader->end();
    std::shared_ptr<arrow::RecordBatch> currBatch;
    block_idx_t blockIdx = 0;
    while (it != endIt) {
        for (int i = 0; i < common::CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
            if (it == endIt) {
                break;
            }
            ARROW_ASSIGN_OR_RAISE(currBatch, *it);
            taskScheduler.scheduleTask(
                CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::Array>,
                    reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
                    startOffset, pkIndex.get(), this, currBatch->columns(), filePath));
            startOffset += currBatch->num_rows();
            ++blockIdx;
            ++it;
        }
        taskScheduler.waitUntilEnoughTasksFinish(
            CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
    }
    taskScheduler.waitAllTasksToCompleteOrError();
    return arrow::Status::OK();
}

It would change as follows:


void NodeCopier::assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
    offset_t startOffset, std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
    auto it = csvStreamingReader->begin();
    auto endIt = csvStreamingReader->end();
    block_idx_t blockIdx = 0;
    // Shared state that all registered threads use to grab morsels.
    SharedState sharedState{startOffset, pkIndex.get(), this, filePath, it, endIt, blockIdx};
    taskScheduler.scheduleTaskAndWaitOrError(
        CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::Array>,
            reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID,
            &sharedState, clientContext));
}

void NodeCopier::batchPopulateColumnsTask(SharedState* sharedState) {
    std::shared_ptr<arrow::RecordBatch> currBatch;
    while (true) {
        // lock-unlock block to grab a morsel
        lock_t lck{sharedState->mtx};
        // some special case code for the very first batch
        if (sharedState->it == sharedState->endIt) {
            break; // lck unlocks automatically as the function exits
        }
        ARROW_ASSIGN_OR_RAISE(currBatch, *sharedState->it);
        auto startOffsetToUse = sharedState->startOffset;
        sharedState->startOffset += currBatch->num_rows();
        ++sharedState->it;
        lck.unlock();
        // grabbed morsel at this point
        // copy the contents of the current batchPopulateColumnsTask here.
    }
}

Something like this. Note that to solve the current issue, I'll have to add a check for whether an error was thrown before grabbing a new batch, and throw an exception if there was one so that threads stop working.
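A hedged sketch of that error check (`CopySharedState` and its field names are assumptions, not the actual code): a worker records the first exception in the shared state, and every worker looks at that slot before grabbing the next morsel, so all threads stop early once one fails.

```cpp
#include <cassert>
#include <exception>
#include <mutex>
#include <stdexcept>

// Illustrative sketch: shared state with an error slot that workers
// check before grabbing the next morsel.
struct CopySharedState {
    std::mutex mtx;
    std::exception_ptr error; // first exception thrown by any worker
    int nextBlockIdx = 0;
    int numBlocks = 0;

    // Returns the next block index, or -1 when the input is exhausted
    // or another worker has already recorded an error.
    int getMorsel() {
        std::lock_guard<std::mutex> lck{mtx};
        if (error || nextBlockIdx >= numBlocks) {
            return -1;
        }
        return nextBlockIdx++;
    }

    void setError(std::exception_ptr e) {
        std::lock_guard<std::mutex> lck{mtx};
        if (!error) {
            error = e; // keep only the first error
        }
    }

    // Called by the coordinating thread after all workers return.
    void rethrowIfError() {
        std::lock_guard<std::mutex> lck{mtx};
        if (error) {
            std::rethrow_exception(error);
        }
    }
};
```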

I would make similar changes in the other places we can "morselize", in both node_copier.cpp and rel_copier.cpp. As part of this, I expect to remove several functions from TaskScheduler, such as waitUntilEnoughTasksFinish and waitAllTasksToCompleteOrError.

In addition to these changes I want to also do the following (maybe in a separate PR):

  1. Add a timeout mechanism for the copier as well. This means I would pass the ClientContext to node/rel_copier and check it in appropriate places (though possibly not all) so that long-running copies respect the timeout.
  2. Add logic to the physical operators so that when an exception happens (similar to an interrupt, and to what I'm proposing above), threads do not have to keep running the pipelines and can stop working earlier.
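A hedged sketch of the timeout check in (1); ClientContext's real interface may differ. The idea is to capture a deadline when the copy starts and poll it at convenient points (e.g., once per morsel); a worker that sees the deadline expired would throw, and the error check described above then stops the other workers.

```cpp
#include <cassert>
#include <chrono>

// Illustrative sketch: a deadline captured at copy start, polled by
// workers (for instance, once per morsel) to detect a timeout.
struct CopyDeadline {
    std::chrono::steady_clock::time_point deadline;

    explicit CopyDeadline(std::chrono::milliseconds timeout)
        : deadline{std::chrono::steady_clock::now() + timeout} {}

    bool timedOut() const {
        return std::chrono::steady_clock::now() >= deadline;
    }
};
```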

I need to think of a way to test these changes, though, i.e., to verify that the mechanisms to stop early on timeouts and exceptions work as expected.
