-
Notifications
You must be signed in to change notification settings - Fork 272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Binary cache: async push_success #908
base: main
Are you sure you want to change the base?
Changes from 46 commits
95f0438
9d999d8
163d9cd
2a54205
0912655
5d7288c
10189ac
2567607
ecdd000
8e7ae61
850d7c9
548be38
6dbbf06
74b86fd
5171d3e
d69ed8f
2df42d5
5f1786e
93303c3
8a26c8b
aa7e52f
d46a4d6
5e51718
a9ac558
4faf674
b9be8c6
78ca081
579bfa9
103968e
dd32416
b666f94
15bb503
d995bfd
24cd026
92fc76b
3527227
48305b3
27fa076
bcd459a
f958d36
7a24007
50114f9
ca5f2b1
eccd9ee
e7837e0
969e7fc
2d5586f
809d0b6
455e29b
03fdfea
f4bad8c
26bbbd5
814e434
290e586
3cc3378
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#pragma once | ||
|
||
#include <condition_variable> | ||
#include <mutex> | ||
#include <vector> | ||
|
||
template<class T> | ||
class BatchQueue | ||
{ | ||
public: | ||
template<class... Args> | ||
void push(Args&&... args) | ||
{ | ||
forward.emplace_back(std::forward<Args>(args)...); | ||
} | ||
|
||
bool empty() const { return forward.empty(); } | ||
|
||
void pop(std::vector<T>& out) | ||
{ | ||
out.clear(); | ||
swap(out, forward); | ||
} | ||
|
||
private: | ||
std::vector<T> forward; | ||
}; | ||
|
||
template<class WorkItem> | ||
struct BGThreadBatchQueue | ||
{ | ||
template<class... Args> | ||
void push(Args&&... args) | ||
{ | ||
std::lock_guard<std::mutex> lock(m_mtx); | ||
m_tasks.push(std::forward<Args>(args)...); | ||
m_cv.notify_all(); | ||
} | ||
|
||
void wait_for_items(std::vector<WorkItem>& out) | ||
{ | ||
std::unique_lock<std::mutex> lock(m_mtx); | ||
m_cv.wait(lock, [this]() { return !m_tasks.empty() || !m_running; }); | ||
m_tasks.pop(out); | ||
} | ||
|
||
void stop() | ||
{ | ||
std::lock_guard<std::mutex> lock(m_mtx); | ||
m_running = false; | ||
m_cv.notify_all(); | ||
} | ||
|
||
bool stopped() const { return !m_running; } | ||
|
||
private: | ||
std::mutex m_mtx; | ||
std::condition_variable m_cv; | ||
BatchQueue<WorkItem> m_tasks; | ||
bool m_running = true; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,4 +10,5 @@ namespace vcpkg | |
|
||
struct FileSink; | ||
struct CombiningSink; | ||
struct BGMessageSink; | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -4,6 +4,8 @@ | |||
|
||||
#include <vcpkg/base/messages.h> | ||||
|
||||
#include <mutex> | ||||
|
||||
namespace vcpkg | ||||
{ | ||||
|
||||
|
@@ -75,4 +77,30 @@ namespace vcpkg | |||
CombiningSink(MessageSink& first, MessageSink& second) : m_first(first), m_second(second) { } | ||||
void print(Color c, StringView sv) override; | ||||
}; | ||||
|
||||
struct BGMessageSink final : MessageSink | ||||
{ | ||||
BGMessageSink(MessageSink& out_sink) : out_sink(out_sink) { } | ||||
~BGMessageSink() { publish_directly_to_out_sink(); } | ||||
// must be called from producer | ||||
void print(Color c, StringView sv) override; | ||||
using MessageSink::print; | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There's no hiding going on here since it's an override. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah but without it I get
|
||||
|
||||
// must be called from consumer (synchronizer of out) | ||||
void print_published(); | ||||
|
||||
void publish_directly_to_out_sink(); | ||||
|
||||
private: | ||||
MessageSink& out_sink; | ||||
|
||||
std::mutex m_published_lock; | ||||
std::vector<std::pair<Color, std::string>> m_published; | ||||
|
||||
// buffers messages until newline is reached | ||||
// guarded by m_print_directly_lock | ||||
std::vector<std::pair<Color, std::string>> m_unpublished; | ||||
Comment on lines
+100
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to add some form of API that lets whole lines be submitted to the sink at a time rather than needing the complexity of the publish/not published yet going on here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean somethink like a MessageSink that only allows the printing of complete lines by providing only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something like that? Maybe? I think what this really argues is that we really need a semi-standardized 'document' error type @ras0219-msft has been asking for ages... we're almost done getting rid of |
||||
std::mutex m_print_directly_lock; | ||||
bool m_print_directly_to_out_sink = false; | ||||
}; | ||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,12 +3,15 @@ | |||||
#include <vcpkg/base/fwd/message_sinks.h> | ||||||
|
||||||
#include <vcpkg/fwd/binarycaching.h> | ||||||
#include <vcpkg/fwd/build.h> | ||||||
#include <vcpkg/fwd/dependencies.h> | ||||||
#include <vcpkg/fwd/tools.h> | ||||||
#include <vcpkg/fwd/vcpkgpaths.h> | ||||||
|
||||||
#include <vcpkg/base/batch-queue.h> | ||||||
#include <vcpkg/base/downloads.h> | ||||||
#include <vcpkg/base/expected.h> | ||||||
#include <vcpkg/base/message_sinks.h> | ||||||
#include <vcpkg/base/path.h> | ||||||
|
||||||
#include <vcpkg/archives.h> | ||||||
|
@@ -18,6 +21,7 @@ | |||||
#include <iterator> | ||||||
#include <set> | ||||||
#include <string> | ||||||
#include <thread> | ||||||
#include <unordered_map> | ||||||
#include <vector> | ||||||
|
||||||
|
@@ -196,23 +200,41 @@ namespace vcpkg | |||||
|
||||||
struct BinaryCache : ReadOnlyBinaryCache | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When adding threading functionality to the codebase that is not effectively 'do a single function but faster', I think there needs to be a discussion in a comment of 'this is how the threads and communication between them work'. For instance:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does your approach here survive #1076 ? Hard links mean that we have a lot less certainty on the packages directory being 'hermetic'. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #1076 does not have an impact here. The files "copied" to the installed dir are not later changed by another package. Even if they are overwritten, the hard link in the packages folder would still link to the same original file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It has an impact on windows if #802 gets merged before this. Because when the packages folder gets compressed by 7z, the next feature gets tested and stuff gets removed from the installed dir and on windows you cant remove a hard link even if the linked file is only opened via another hard link -.- PS: Not sure, but I noticed this behavior only on the windows dev drive, but maybe the normal filesystem was simply too slow so that this never happened There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was finally able to catch this situation that happens in combination of this PR with #802: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interestingly the catch seems to only happen with this specific rocksdb.lib file and error nearly only happens with lib files. And in general it only happens on the dev drive and not on a normal NTFS drive. Strage 😕 |
||||||
{ | ||||||
static ExpectedL<BinaryCache> make(const VcpkgCmdArguments& args, const VcpkgPaths& paths, MessageSink& sink); | ||||||
static ExpectedL<std::unique_ptr<BinaryCache>> make(const VcpkgCmdArguments& args, | ||||||
const VcpkgPaths& paths, | ||||||
MessageSink& sink); | ||||||
|
||||||
BinaryCache(const Filesystem& fs); | ||||||
BinaryCache(const BinaryCache&) = delete; | ||||||
BinaryCache(BinaryCache&&) = default; | ||||||
BinaryCache(BinaryCache&&) = delete; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree with this design change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How would you implement that? The function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have now put all data in an extra struct that is hold via a std::unique_ptr by the BinaryCache class. |
||||||
~BinaryCache(); | ||||||
|
||||||
/// Called upon a successful build of `action` to store those contents in the binary cache. | ||||||
void push_success(const InstallPlanAction& action); | ||||||
|
||||||
void print_push_success_messages(); | ||||||
void wait_for_async_complete(); | ||||||
|
||||||
private: | ||||||
BinaryCache(BinaryProviders&& providers, const Filesystem& fs); | ||||||
|
||||||
const Filesystem& m_fs; | ||||||
Optional<ZipTool> m_zip_tool; | ||||||
bool m_needs_nuspec_data = false; | ||||||
bool m_needs_zip_file = false; | ||||||
|
||||||
struct ActionToPush | ||||||
{ | ||||||
BinaryPackageWriteInfo request; | ||||||
CleanPackages clean_after_push; | ||||||
}; | ||||||
|
||||||
void push_thread_main(); | ||||||
|
||||||
BGMessageSink m_bg_msg_sink; | ||||||
BGThreadBatchQueue<ActionToPush> m_actions_to_push; | ||||||
std::atomic_int m_remaining_packages_to_push = 0; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This member existing at all is a smell to me. The number of items in the queue should be a part of the queue, not something tracked externally. Moreover, the queue already contains locks and stuff so I'm not sure why we need an atomic here. |
||||||
std::thread m_push_thread; | ||||||
}; | ||||||
|
||||||
ExpectedL<DownloadManagerConfig> parse_download_configuration(const Optional<std::string>& arg); | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||||
#include <vcpkg/base/file_sink.h> | ||||||||
#include <vcpkg/base/message_sinks.h> | ||||||||
#include <vcpkg/base/strings.h> | ||||||||
|
||||||||
namespace | ||||||||
{ | ||||||||
|
@@ -58,4 +59,74 @@ namespace vcpkg | |||||||
m_second.print(c, sv); | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::print(Color c, StringView sv) | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> print_lk(m_print_directly_lock); | ||||||||
if (m_print_directly_to_out_sink) | ||||||||
{ | ||||||||
out_sink.print(c, sv); | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
auto pos = Strings::find_last(sv, '\n'); | ||||||||
if (pos != std::string::npos) | ||||||||
{ | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
m_published.insert(m_published.end(), | ||||||||
std::make_move_iterator(m_unpublished.begin()), | ||||||||
std::make_move_iterator(m_unpublished.end())); | ||||||||
m_published.emplace_back(c, sv.substr(0, pos + 1)); | ||||||||
} | ||||||||
m_unpublished.clear(); | ||||||||
if (sv.size() > pos + 1) | ||||||||
{ | ||||||||
m_unpublished.emplace_back(c, sv.substr(pos + 1)); | ||||||||
} | ||||||||
} | ||||||||
else | ||||||||
{ | ||||||||
m_unpublished.emplace_back(c, sv); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::print_published() | ||||||||
{ | ||||||||
std::vector<std::pair<Color, std::string>> tmp; | ||||||||
for (;;) | ||||||||
{ | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
swap(tmp, m_published); | ||||||||
} | ||||||||
|
||||||||
if (tmp.empty()) | ||||||||
{ | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
for (auto&& m : tmp) | ||||||||
{ | ||||||||
out_sink.print(m.first, m.second); | ||||||||
} | ||||||||
|
||||||||
tmp.clear(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::publish_directly_to_out_sink() | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> print_lk(m_print_directly_lock); | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
Comment on lines
+119
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
if this survives There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you actually mean |
||||||||
|
||||||||
m_print_directly_to_out_sink = true; | ||||||||
for (auto& messages : {&m_published, &m_unpublished}) | ||||||||
{ | ||||||||
for (auto&& m : *messages) | ||||||||
{ | ||||||||
out_sink.print(m.first, m.second); | ||||||||
autoantwort marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
} | ||||||||
messages->clear(); | ||||||||
} | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this thing being called queue given that this is how it works. Given that we expect this to be a multi producer single consumer queue, can we instead put the vector inside and note that only one thread may call pop but any number of threads may call push? That would also resolve the criticism over separate tracking atomics below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BatchQueue
alone is not thread safe.I don't get what you have in mind here 😅
Do you have an idea for a better name? :)