Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel file operations #7228

Merged
merged 18 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions toolsrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ if(GCC OR CLANG)
endif()

file(GLOB_RECURSE VCPKGLIB_SOURCES src/vcpkg/*.cpp)
file(GLOB_RECURSE VCPKGTEST_SOURCES src/vcpkg-tests/*.cpp)
file(GLOB_RECURSE VCPKGTEST_SOURCES src/vcpkg-test/*.cpp)

if (DEFINE_DISABLE_METRICS)
set(DISABLE_METRICS_VALUE "1")
Expand All @@ -52,7 +52,11 @@ add_executable(vcpkg-test EXCLUDE_FROM_ALL ${VCPKGTEST_SOURCES} ${VCPKGLIB_SOURC
target_compile_definitions(vcpkg-test PRIVATE -DDISABLE_METRICS=${DISABLE_METRICS_VALUE})
target_include_directories(vcpkg-test PRIVATE include)

foreach(TEST_NAME arguments chrono dependencies paragraph plan specifier supports)
foreach(TEST_NAME
arguments chrono dependencies files
paragraph plan specifier statusparagraphs
strings supports update
)
add_test(${TEST_NAME} vcpkg-test [${TEST_NAME}])
endforeach()

Expand Down Expand Up @@ -91,3 +95,4 @@ endif()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(vcpkg PRIVATE Threads::Threads)
target_link_libraries(vcpkg-test PRIVATE Threads::Threads)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <vcpkg/base/files.h>
#include <vcpkg/statusparagraph.h>

#include <memory>
Expand Down Expand Up @@ -30,4 +31,12 @@ T&& unwrap(vcpkg::Optional<T>&& opt)
return std::move(*opt.get());
}

extern const bool SYMLINKS_ALLOWED;

extern const fs::path TEMPORARY_DIRECTORY;

void create_symlink(const fs::path& file, const fs::path& target, std::error_code& ec);

void create_directory_symlink(const fs::path& file, const fs::path& target, std::error_code& ec);

}
57 changes: 53 additions & 4 deletions toolsrc/include/vcpkg/base/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,60 @@ namespace fs
using stdfs::file_status;
using stdfs::file_type;
using stdfs::path;
using stdfs::perms;
using stdfs::u8path;

inline bool is_regular_file(file_status s) { return stdfs::is_regular_file(s); }
inline bool is_directory(file_status s) { return stdfs::is_directory(s); }
inline bool is_symlink(file_status s) { return stdfs::is_symlink(s); }
/*
std::experimental::filesystem's file_status and file_type are broken in
the presence of symlinks -- a symlink is treated as the object it points
to for `symlink_status` and `symlink_type`
*/

using stdfs::status;

// we want to poison ADL with these niebloids

namespace detail
{
struct symlink_status_t
{
file_status operator()(const path& p, std::error_code& ec) const noexcept;
file_status operator()(const path& p, vcpkg::LineInfo li) const noexcept;
};
struct is_symlink_t
{
inline bool operator()(file_status s) const { return stdfs::is_symlink(s); }
};
struct is_regular_file_t
{
inline bool operator()(file_status s) const { return stdfs::is_regular_file(s); }
};
struct is_directory_t
{
inline bool operator()(file_status s) const { return stdfs::is_directory(s); }
};
}

constexpr detail::symlink_status_t symlink_status{};
constexpr detail::is_symlink_t is_symlink{};
constexpr detail::is_regular_file_t is_regular_file{};
constexpr detail::is_directory_t is_directory{};
}

/*
if someone attempts to use unqualified `symlink_status` or `is_symlink`,
they might get the ADL version, which is broken.
Therefore, put `symlink_status` in the global namespace, so that they get
our symlink_status.

We also want to poison the ADL on is_regular_file and is_directory, because
we don't want people calling these functions on paths
*/
using fs::is_directory;
using fs::is_regular_file;
using fs::is_symlink;
using fs::symlink_status;

namespace vcpkg::Files
{
struct Filesystem
Expand All @@ -44,7 +91,9 @@ namespace vcpkg::Files
std::error_code& ec) = 0;
bool remove(const fs::path& path, LineInfo linfo);
virtual bool remove(const fs::path& path, std::error_code& ec) = 0;
virtual std::uintmax_t remove_all(const fs::path& path, std::error_code& ec) = 0;

virtual std::uintmax_t remove_all(const fs::path& path, std::error_code& ec, fs::path& failure_point) = 0;
std::uintmax_t remove_all(const fs::path& path, LineInfo li);
virtual bool exists(const fs::path& path) const = 0;
virtual bool is_directory(const fs::path& path) const = 0;
virtual bool is_regular_file(const fs::path& path) const = 0;
Expand Down
5 changes: 5 additions & 0 deletions toolsrc/include/vcpkg/base/strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,9 @@ namespace vcpkg::Strings
const char* search(StringView haystack, StringView needle);

bool contains(StringView haystack, StringView needle);

// base 32 encoding, since base64 encoding requires lowercase letters,
// which are not distinct from uppercase letters on macOS or Windows filesystems.
// follows RFC 4648
std::string b32_encode(std::uint64_t x) noexcept;
}
230 changes: 230 additions & 0 deletions toolsrc/include/vcpkg/base/work_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <queue>

namespace vcpkg
{
template<class Action, class ThreadLocalData>
struct WorkQueue;

namespace detail
{
// for SFINAE purposes, keep out of the class
template<class Action, class ThreadLocalData>
auto call_moved_action(Action& action,
const WorkQueue<Action, ThreadLocalData>& work_queue,
ThreadLocalData& tld) -> decltype(static_cast<void>(std::move(action)(tld, work_queue)))
{
std::move(action)(tld, work_queue);
}

template<class Action, class ThreadLocalData>
auto call_moved_action(Action& action, const WorkQueue<Action, ThreadLocalData>&, ThreadLocalData& tld)
-> decltype(static_cast<void>(std::move(action)(tld)))
{
std::move(action)(tld);
}
}

template<class Action, class ThreadLocalData>
struct WorkQueue
{
template<class F>
WorkQueue(std::uint16_t num_threads, LineInfo li, const F& tld_init) noexcept
{
m_line_info = li;

set_unjoined_workers(num_threads);
m_threads.reserve(num_threads);
for (std::size_t i = 0; i < num_threads; ++i)
{
m_threads.push_back(std::thread(Worker{this, tld_init()}));
}
}

WorkQueue(WorkQueue const&) = delete;
WorkQueue(WorkQueue&&) = delete;

~WorkQueue()
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (!is_joined(m_state))
{
Checks::exit_with_message(m_line_info, "Failed to call join() on a WorkQueue that was destroyed");
}
}

// should only be called once; anything else is an error
void run(LineInfo li)
{
// this should _not_ be locked before `run()` is called; however, we
// want to terminate if someone screws up, rather than cause UB
auto lck = std::unique_lock<std::mutex>(m_mutex);

if (m_state != State::BeforeRun)
{
Checks::exit_with_message(li, "Attempted to run() twice");
}

m_state = State::Running;
}

// runs all remaining tasks, and blocks on their finishing
// if this is called in an existing task, _will block forever_
// DO NOT DO THAT
// thread-unsafe
void join(LineInfo li)
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (is_joined(m_state))
{
Checks::exit_with_message(li, "Attempted to call join() more than once");
}
else if (m_state == State::Terminated)
{
m_state = State::TerminatedJoined;
}
else
{
m_state = State::Joined;
}
}

while (unjoined_workers())
{
if (!running_workers())
{
m_cv.notify_one();
}
}

// wait for all threads to join
for (auto& thrd : m_threads)
{
thrd.join();
}
}

// useful in the case of errors
// doesn't stop any existing running tasks
// returns immediately, so that one can call this in a task
void terminate() const
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (is_joined(m_state))
{
m_state = State::TerminatedJoined;
}
else
{
m_state = State::Terminated;
}
}
m_cv.notify_all();
}

void enqueue_action(Action a) const
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
m_actions.push_back(std::move(a));

if (m_state == State::BeforeRun) return;
}
m_cv.notify_one();
}

private:
struct Worker
{
const WorkQueue* work_queue;
ThreadLocalData tld;

void operator()()
{
// unlocked when waiting, or when in the action
// locked otherwise
auto lck = std::unique_lock<std::mutex>(work_queue->m_mutex);

work_queue->m_cv.wait(lck, [&] { return work_queue->m_state != State::BeforeRun; });

work_queue->increment_running_workers();
for (;;)
{
const auto state = work_queue->m_state;

if (is_terminated(state))
{
break;
}

if (work_queue->m_actions.empty())
{
if (state == State::Running || work_queue->running_workers() > 1)
{
work_queue->decrement_running_workers();
work_queue->m_cv.wait(lck);
strega-nil marked this conversation as resolved.
Show resolved Hide resolved
work_queue->increment_running_workers();
continue;
}

// the queue is joining, and we are the only worker running
// no more work!
break;
}

Action action = std::move(work_queue->m_actions.back());
work_queue->m_actions.pop_back();

lck.unlock();
work_queue->m_cv.notify_one();
detail::call_moved_action(action, *work_queue, tld);
lck.lock();
strega-nil marked this conversation as resolved.
Show resolved Hide resolved
}

work_queue->decrement_running_workers();
work_queue->decrement_unjoined_workers();
}
};

enum class State : std::int16_t
{
// can only exist upon construction
BeforeRun = -1,

Running,
Joined,
Terminated,
TerminatedJoined,
};

static bool is_terminated(State st) { return st == State::Terminated || st == State::TerminatedJoined; }

static bool is_joined(State st) { return st == State::Joined || st == State::TerminatedJoined; }

mutable std::mutex m_mutex{};
// these are all under m_mutex
mutable State m_state = State::BeforeRun;
mutable std::vector<Action> m_actions{};
mutable std::condition_variable m_cv{};

mutable std::atomic<std::uint32_t> m_workers;
// = unjoined_workers << 16 | running_workers

void set_unjoined_workers(std::uint16_t threads) { m_workers = std::uint32_t(threads) << 16; }
void decrement_unjoined_workers() const { m_workers -= 1 << 16; }

std::uint16_t unjoined_workers() const { return std::uint16_t(m_workers >> 16); }

void increment_running_workers() const { ++m_workers; }
void decrement_running_workers() const { --m_workers; }
std::uint16_t running_workers() const { return std::uint16_t(m_workers); }

std::vector<std::thread> m_threads{};
LineInfo m_line_info;
};
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/vcpkgcmdarguments.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#define CATCH_CONFIG_RUNNER
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/base/system.debug.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/base/chrono.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/sourceparagraph.h>

Expand Down
Loading