From ff96f7d8769de50ca0ecc49b7e64cc47eac6f897 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 17 Jul 2024 15:27:15 +0200 Subject: [PATCH] Factor out commonality between WorkerProto::Basic{Client,Server}Connection This also renames clientVersion and daemonVersion to the more correct protoVersion (since it's the version agreed to by both sides). --- src/libstore/daemon.cc | 29 +++-- src/libstore/daemon.hh | 4 +- src/libstore/remote-store.cc | 38 +++--- .../unix/build/local-derivation-goal.cc | 7 +- src/libstore/worker-protocol-connection.cc | 20 ++-- src/libstore/worker-protocol-connection.hh | 113 ++++++------------ src/libstore/worker-protocol.hh | 1 + src/nix/unix/daemon.cc | 17 ++- 8 files changed, 100 insertions(+), 129 deletions(-) diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 5c5080f8aa93..fc14eda3c962 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -1020,8 +1020,8 @@ static void performOp(TunnelLogger * logger, ref store, void processConnection( ref store, - FdSource & from, - FdSink & to, + FdSource && from, + FdSink && to, TrustedFlag trusted, RecursiveFlag recursive) { @@ -1037,7 +1037,12 @@ void processConnection( if (clientVersion < 0x10a) throw Error("the Nix client version is too old"); - auto tunnelLogger = new TunnelLogger(to, clientVersion); + WorkerProto::BasicServerConnection conn; + conn.to = std::move(to); + conn.from = std::move(from); + conn.protoVersion = clientVersion; + + auto tunnelLogger = new TunnelLogger(conn.to, clientVersion); auto prevLogger = nix::logger; // FIXME if (!recursive) @@ -1050,12 +1055,6 @@ void processConnection( printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount); }); - WorkerProto::BasicServerConnection conn { - .to = to, - .from = from, - .clientVersion = clientVersion, - }; - conn.postHandshake(*store, { .daemonNixVersion = nixVersion, // We and the underlying store both need to trust the client for @@ -1071,13 +1070,13 @@ void processConnection( try { tunnelLogger->stopWork(); - to.flush(); + conn.to.flush(); /* Process client requests. */ while (true) { WorkerProto::Op op; try { - op = (enum WorkerProto::Op) readInt(from); + op = (enum WorkerProto::Op) readInt(conn.from); } catch (Interrupted & e) { break; } catch (EndOfFile & e) { @@ -1091,7 +1090,7 @@ void processConnection( debug("performing daemon worker op: %d", op); try { - performOp(tunnelLogger, store, trusted, recursive, clientVersion, from, to, op); + performOp(tunnelLogger, store, trusted, recursive, clientVersion, conn.from, conn.to, op); } catch (Error & e) { /* If we're not in a state where we can send replies, then something went wrong processing the input of the @@ -1107,19 +1106,19 @@ void processConnection( throw; } - to.flush(); + conn.to.flush(); assert(!tunnelLogger->state_.lock()->canSendStderr); }; } catch (Error & e) { tunnelLogger->stopWork(&e); - to.flush(); + conn.to.flush(); return; } catch (std::exception & e) { auto ex = Error(e.what()); tunnelLogger->stopWork(&ex); - to.flush(); + conn.to.flush(); return; } } diff --git a/src/libstore/daemon.hh b/src/libstore/daemon.hh index 1964c0d997c4..a8ce32d8deb1 100644 --- a/src/libstore/daemon.hh +++ b/src/libstore/daemon.hh @@ -10,8 +10,8 @@ enum RecursiveFlag : bool { NotRecursive = false, Recursive = true }; void processConnection( ref store, - FdSource & from, - FdSink & to, + FdSource && from, + FdSink && to, TrustedFlag trusted, RecursiveFlag recursive); diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 6e8931ca2e41..ebb0864c5559 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -73,7 +73,7 @@ void RemoteStore::initConnection(Connection & conn) StringSink saved; TeeSource tee(conn.from, saved); try { - conn.daemonVersion = WorkerProto::BasicClientConnection::handshake( + conn.protoVersion = WorkerProto::BasicClientConnection::handshake( conn.to, tee, PROTOCOL_VERSION); } catch (SerialisationError & e) { /* In case the other side is waiting for our input, close @@ -115,7 +115,7 @@ void RemoteStore::setOptions(Connection & conn) << settings.buildCores << settings.useSubstitutes; - if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) { std::map overrides; settings.getSettings(overrides, true); // libstore settings fileTransferSettings.getSettings(overrides, true); @@ -175,7 +175,7 @@ bool RemoteStore::isValidPathUncached(const StorePath & path) StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, SubstituteFlag maybeSubstitute) { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { StorePathSet res; for (auto & i : paths) if (isValidPath(i)) res.insert(i); @@ -198,7 +198,7 @@ StorePathSet RemoteStore::queryAllValidPaths() StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { StorePathSet res; for (auto & i : paths) { conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i); @@ -221,7 +221,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { for (auto & i : pathsMap) { SubstitutablePathInfo info; @@ -241,7 +241,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S } else { conn->to << WorkerProto::Op::QuerySubstitutablePathInfos; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 22) { StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); @@ -368,7 +368,7 @@ ref RemoteStore::addCAToStore( std::optional conn_(getConnection()); auto & conn = *conn_; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 25) { conn->to << WorkerProto::Op::AddToStore @@ -485,7 +485,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 18) { auto source2 = sinkToSource([&](Sink & sink) { sink << 1 // == path follows ; @@ -513,11 +513,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 23) { conn.withFramedSink([&](Sink & sink) { copyNAR(source, sink); }); - } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { + } else if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 21) { conn.processStderr(0, &source); } else { copyNAR(source, conn->to); @@ -554,7 +554,7 @@ void RemoteStore::addMultipleToStore( RepairFlag repair, CheckSigsFlag checkSigs) { - if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { + if (GET_PROTOCOL_MINOR(getConnection()->protoVersion) >= 32) { auto conn(getConnection()); conn->to << WorkerProto::Op::AddMultipleToStore @@ -572,7 +572,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) { auto conn(getConnection()); conn->to << WorkerProto::Op::RegisterDrvOutput; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) { conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { @@ -587,7 +587,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, try { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 27) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 27) { warn("the daemon is too old to support content-addressed derivations, please upgrade it to 2.4"); return callback(nullptr); } @@ -597,7 +597,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, conn.processStderr(); auto real = [&]() -> std::shared_ptr { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) { auto outPaths = WorkerProto::Serialise>::read( *this, *conn); if (outPaths.empty()) @@ -644,9 +644,9 @@ void RemoteStore::buildPaths(const std::vector & drvPaths, BuildMod auto conn(getConnection()); conn->to << WorkerProto::Op::BuildPaths; - assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); + assert(GET_PROTOCOL_MINOR(conn->protoVersion) >= 13); WorkerProto::write(*this, *conn, drvPaths); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 15) conn->to << buildMode; else /* Old daemons did not take a 'buildMode' parameter, so we @@ -667,7 +667,7 @@ std::vector RemoteStore::buildPathsWithResults( std::optional conn_(getConnection()); auto & conn = *conn_; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 34) { conn->to << WorkerProto::Op::BuildPathsWithResults; WorkerProto::write(*this, *conn, paths); conn->to << buildMode; @@ -841,7 +841,7 @@ void RemoteStore::queryMissing(const std::vector & targets, { { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19) + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 19) // Don't hold the connection handle in the fallback case // to prevent a deadlock. goto fallback; @@ -889,7 +889,7 @@ void RemoteStore::connect() unsigned int RemoteStore::getProtocol() { auto conn(connections->get()); - return conn->daemonVersion; + return conn->protoVersion; } std::optional RemoteStore::isTrustedClient() diff --git a/src/libstore/unix/build/local-derivation-goal.cc b/src/libstore/unix/build/local-derivation-goal.cc index f968bbc5b7f7..0dd102200a5f 100644 --- a/src/libstore/unix/build/local-derivation-goal.cc +++ b/src/libstore/unix/build/local-derivation-goal.cc @@ -1526,10 +1526,11 @@ void LocalDerivationGoal::startDaemon() debug("received daemon connection"); auto workerThread = std::thread([store, remote{std::move(remote)}]() { - FdSource from(remote.get()); - FdSink to(remote.get()); try { - daemon::processConnection(store, from, to, + daemon::processConnection( + store, + FdSource(remote.get()), + FdSink(remote.get()), NotTrusted, daemon::Recursive); debug("terminated daemon connection"); } catch (SystemError &) { diff --git a/src/libstore/worker-protocol-connection.cc b/src/libstore/worker-protocol-connection.cc index 072bae8da82a..22a01323762a 100644 --- a/src/libstore/worker-protocol-connection.cc +++ b/src/libstore/worker-protocol-connection.cc @@ -58,7 +58,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink } else if (msg == STDERR_ERROR) { - if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 26) { ex = std::make_exception_ptr(readError(from)); } else { auto error = readString(from); @@ -114,7 +114,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink // explain to users what's going on when their daemon is // older than #4628 (2023). if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations) - && GET_PROTOCOL_MINOR(daemonVersion) <= 35) { + && GET_PROTOCOL_MINOR(protoVersion) <= 35) { auto m = e.msg(); if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos && m.find("Derive([") != std::string::npos) @@ -173,15 +173,15 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha { WorkerProto::ClientHandshakeInfo res; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 14) { // Obsolete CPU affinity. to << 0; } - if (GET_PROTOCOL_MINOR(daemonVersion) >= 11) + if (GET_PROTOCOL_MINOR(protoVersion) >= 11) to << false; // obsolete reserveSpace - if (GET_PROTOCOL_MINOR(daemonVersion) >= 33) + if (GET_PROTOCOL_MINOR(protoVersion) >= 33) to.flush(); return WorkerProto::Serialise::read(store, *this); @@ -189,12 +189,12 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info) { - if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 14 && readInt(from)) { // Obsolete CPU affinity. readInt(from); } - if (GET_PROTOCOL_MINOR(clientVersion) >= 11) + if (GET_PROTOCOL_MINOR(protoVersion) >= 11) readInt(from); // obsolete reserveSpace WorkerProto::write(store, *this, info); @@ -212,7 +212,7 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo( throw InvalidPath(std::move(e.info())); throw; } - if (GET_PROTOCOL_MINOR(daemonVersion) >= 17) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 17) { bool valid; from >> valid; if (!valid) @@ -224,10 +224,10 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo( StorePathSet WorkerProto::BasicClientConnection::queryValidPaths( const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute) { - assert(GET_PROTOCOL_MINOR(daemonVersion) >= 12); + assert(GET_PROTOCOL_MINOR(protoVersion) >= 12); to << WorkerProto::Op::QueryValidPaths; WorkerProto::write(store, *this, paths); - if (GET_PROTOCOL_MINOR(daemonVersion) >= 27) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 27) { to << maybeSubstitute; } processStderr(daemonException); diff --git a/src/libstore/worker-protocol-connection.hh b/src/libstore/worker-protocol-connection.hh index 9dd723fd0892..5c76e2827896 100644 --- a/src/libstore/worker-protocol-connection.hh +++ b/src/libstore/worker-protocol-connection.hh @@ -6,7 +6,7 @@ namespace nix { -struct WorkerProto::BasicClientConnection +struct WorkerProto::BasicConnection { /** * Send with this. @@ -19,14 +19,46 @@ struct WorkerProto::BasicClientConnection FdSource from; /** - * Worker protocol version used for the connection. + * The protocol version agreed by both sides. + */ + WorkerProto::Version protoVersion; + + /** + * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the + * factored out serve protocol serializers with a + * `LegacySSHStore::Connection`. + * + * The serve protocol connection types are unidirectional, unlike + * this type. + */ + operator WorkerProto::ReadConn() + { + return WorkerProto::ReadConn{ + .from = from, + .version = protoVersion, + }; + } + + /** + * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the + * factored out serve protocol serializers with a + * `LegacySSHStore::Connection`. * - * Despite its name, it is actually the maximum version both - * sides support. (If the maximum doesn't exist, we would fail to - * establish a connection and produce a value of this type.) + * The serve protocol connection types are unidirectional, unlike + * this type. */ - WorkerProto::Version daemonVersion; + operator WorkerProto::WriteConn() + { + return WorkerProto::WriteConn{ + .to = to, + .version = protoVersion, + }; + } + +}; +struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection +{ /** * Flush to direction */ @@ -60,38 +92,6 @@ struct WorkerProto::BasicClientConnection */ ClientHandshakeInfo postHandshake(const StoreDirConfig & store); - /** - * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the - * factored out serve protocol serializers with a - * `LegacySSHStore::Connection`. - * - * The serve protocol connection types are unidirectional, unlike - * this type. - */ - operator WorkerProto::ReadConn() - { - return WorkerProto::ReadConn{ - .from = from, - .version = daemonVersion, - }; - } - - /** - * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the - * factored out serve protocol serializers with a - * `LegacySSHStore::Connection`. - * - * The serve protocol connection types are unidirectional, unlike - * this type. - */ - operator WorkerProto::WriteConn() - { - return WorkerProto::WriteConn{ - .to = to, - .version = daemonVersion, - }; - } - void addTempRoot(const StoreDirConfig & remoteStore, bool * daemonException, const StorePath & path); StorePathSet queryValidPaths( @@ -124,43 +124,8 @@ struct WorkerProto::BasicClientConnection void importPaths(const StoreDirConfig & store, bool * daemonException, Source & source); }; -struct WorkerProto::BasicServerConnection +struct WorkerProto::BasicServerConnection : WorkerProto::BasicConnection { - /** - * Send with this. - */ - FdSink & to; - - /** - * Receive with this. - */ - FdSource & from; - - /** - * Worker protocol version used for the connection. - * - * Despite its name, it is actually the maximum version both - * sides support. (If the maximum doesn't exist, we would fail to - * establish a connection and produce a value of this type.) - */ - WorkerProto::Version clientVersion; - - operator WorkerProto::ReadConn() - { - return WorkerProto::ReadConn{ - .from = from, - .version = clientVersion, - }; - } - - operator WorkerProto::WriteConn() - { - return WorkerProto::WriteConn{ - .to = to, - .version = clientVersion, - }; - } - /** * Establishes connection, negotiating version. * diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 62a12d182017..f4023f7ea94a 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -82,6 +82,7 @@ struct WorkerProto * * @todo remove once Hydra uses Store abstraction consistently. */ + struct BasicConnection; struct BasicClientConnection; struct BasicServerConnection; diff --git a/src/nix/unix/daemon.cc b/src/nix/unix/daemon.cc index 4a7997b1fb0d..66d8dbcf0926 100644 --- a/src/nix/unix/daemon.cc +++ b/src/nix/unix/daemon.cc @@ -370,9 +370,12 @@ static void daemonLoop(std::optional forceTrustClientOpt) } // Handle the connection. - FdSource from(remote.get()); - FdSink to(remote.get()); - processConnection(openUncachedStore(), from, to, trusted, NotRecursive); + processConnection( + openUncachedStore(), + FdSource(remote.get()), + FdSink(remote.get()), + trusted, + NotRecursive); exit(0); }, options); @@ -437,9 +440,11 @@ static void forwardStdioConnection(RemoteStore & store) { */ static void processStdioConnection(ref store, TrustedFlag trustClient) { - FdSource from(STDIN_FILENO); - FdSink to(STDOUT_FILENO); - processConnection(store, from, to, trustClient, NotRecursive); + processConnection( + store, + FdSource(STDIN_FILENO), + FdSink(STDOUT_FILENO), + trustClient, NotRecursive); } /**