Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/libcmd/installable-attr-path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ DerivedPathsWithInfo InstallableAttrPath::toDerivedPaths()
}

DerivedPathsWithInfo res;
for (auto & [drvPath, outputs] : byDrvPath)
for (auto & [drvPath, outputs] : byDrvPath) {
state->waitForPath(drvPath);
res.push_back({
.path =
DerivedPath::Built{
Expand All @@ -102,6 +103,7 @@ DerivedPathsWithInfo InstallableAttrPath::toDerivedPaths()
so we can fill in this info. */
}),
});
}

return res;
}
Expand Down
1 change: 1 addition & 0 deletions src/libcmd/installable-flake.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ DerivedPathsWithInfo InstallableFlake::toDerivedPaths()
}

auto drvPath = attr->forceDerivation();
state->waitForPath(drvPath);

std::optional<NixInt::Inner> priority;

Expand Down
1 change: 1 addition & 0 deletions src/libcmd/repl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ StorePath NixRepl::getDerivationPath(Value & v)
auto drvPath = packageInfo->queryDrvPath();
if (!drvPath)
throw Error("expression did not evaluate to a valid derivation (no 'drvPath' attribute)");
state->waitForPath(*drvPath);
if (!state->store->isValidPath(*drvPath))
throw Error("expression evaluated to invalid derivation '%s'", state->store->printStorePath(*drvPath));
return *drvPath;
Expand Down
5 changes: 5 additions & 0 deletions src/libexpr-c/nix_api_expr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ nix_err nix_expr_eval_from_string(
nix::Expr * parsedExpr = state->state.parseExprFromString(expr, state->state.rootPath(nix::CanonPath(path)));
state->state.eval(parsedExpr, value->value);
state->state.forceValue(value->value, nix::noPos);
state->state.waitForAllPaths();
}
NIXC_CATCH_ERRS
}
Expand All @@ -80,6 +81,7 @@ nix_err nix_value_call(nix_c_context * context, EvalState * state, Value * fn, n
try {
state->state.callFunction(fn->value, arg->value, value->value, nix::noPos);
state->state.forceValue(value->value, nix::noPos);
state->state.waitForAllPaths();
}
NIXC_CATCH_ERRS
}
Expand All @@ -92,6 +94,7 @@ nix_err nix_value_call_multi(
try {
state->state.callFunction(fn->value, {(nix::Value **) args, nargs}, value->value, nix::noPos);
state->state.forceValue(value->value, nix::noPos);
state->state.waitForAllPaths();
}
NIXC_CATCH_ERRS
}
Expand All @@ -102,6 +105,7 @@ nix_err nix_value_force(nix_c_context * context, EvalState * state, nix_value *
context->last_err_code = NIX_OK;
try {
state->state.forceValue(value->value, nix::noPos);
state->state.waitForAllPaths();
}
NIXC_CATCH_ERRS
}
Expand All @@ -112,6 +116,7 @@ nix_err nix_value_force_deep(nix_c_context * context, EvalState * state, nix_val
context->last_err_code = NIX_OK;
try {
state->state.forceValueDeep(value->value);
state->state.waitForAllPaths();
}
NIXC_CATCH_ERRS
}
Expand Down
1 change: 1 addition & 0 deletions src/libexpr-c/nix_api_value.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ nix_value * nix_get_attr_byname(nix_c_context * context, const nix_value * value
if (attr) {
nix_gc_incref(nullptr, attr->value);
state->state.forceValue(*attr->value, nix::noPos);
state->state.waitForAllPaths();
return as_nix_value_ptr(attr->value);
}
nix_set_err_msg(context, NIX_ERR_KEY, "missing attribute");
Expand Down
1 change: 1 addition & 0 deletions src/libexpr/eval-cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ StorePath AttrCursor::forceDerivation()
/* The eval cache contains 'drvPath', but the actual path has
been garbage-collected. So force it to be regenerated. */
aDrvPath->forceValue();
root->state.waitForPath(drvPath);
if (!root->state.store->isValidPath(drvPath))
throw Error(
"don't know how to recreate store derivation '%s'!", root->state.store->printStorePath(drvPath));
Expand Down
23 changes: 23 additions & 0 deletions src/libexpr/eval.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "nix/fetchers/fetch-to-store.hh"
#include "nix/fetchers/tarball.hh"
#include "nix/fetchers/input-cache.hh"
#include "nix/store/async-path-writer.hh"

#include "parser-tab.hh"

Expand Down Expand Up @@ -326,6 +327,7 @@ EvalState::EvalState(
, debugRepl(nullptr)
, debugStop(false)
, trylevel(0)
, asyncPathWriter(AsyncPathWriter::make(store))
, regexCache(makeRegexCache())
#if NIX_USE_BOEHMGC
, valueAllocCache(std::allocate_shared<void *>(traceable_allocator<void *>(), nullptr))
Expand Down Expand Up @@ -1024,6 +1026,7 @@ std::string EvalState::mkSingleDerivedPathStringRaw(const SingleDerivedPath & p)
auto optStaticOutputPath = std::visit(
overloaded{
[&](const SingleDerivedPath::Opaque & o) {
waitForPath(o.path);
auto drv = store->readDerivation(o.path);
auto i = drv.outputs.find(b.output);
if (i == drv.outputs.end())
Expand Down Expand Up @@ -3249,4 +3252,24 @@ void forceNoNullByte(std::string_view s, std::function<Pos()> pos)
}
}

void EvalState::waitForPath(const StorePath & path)
{
asyncPathWriter->waitForPath(path);
}

void EvalState::waitForPath(const SingleDerivedPath & path)
{
std::visit(
overloaded{
[&](const DerivedPathOpaque & p) { waitForPath(p.path); },
[&](const SingleDerivedPathBuilt & p) { waitForPath(*p.drvPath); },
},
path.raw());
}

void EvalState::waitForAllPaths()
{
asyncPathWriter->waitForAllPaths();
}

} // namespace nix
7 changes: 7 additions & 0 deletions src/libexpr/include/nix/expr/eval.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class StorePath;
struct SingleDerivedPath;
enum RepairFlag : bool;
struct MemorySourceAccessor;
struct AsyncPathWriter;

namespace eval_cache {
class EvalCache;
Expand Down Expand Up @@ -320,6 +321,8 @@ public:
std::list<DebugTrace> debugTraces;
std::map<const Expr *, const std::shared_ptr<const StaticEnv>> exprEnvs;

ref<AsyncPathWriter> asyncPathWriter;

const std::shared_ptr<const StaticEnv> getStaticEnv(const Expr & expr) const
{
auto i = exprEnvs.find(&expr);
Expand Down Expand Up @@ -907,6 +910,10 @@ public:

DocComment getDocCommentForPos(PosIdx pos);

void waitForPath(const StorePath & path);
void waitForPath(const SingleDerivedPath & path);
void waitForAllPaths();

private:

/**
Expand Down
6 changes: 5 additions & 1 deletion src/libexpr/primops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ StringMap EvalState::realiseContext(const NixStringContext & context, StorePathS

for (auto & c : context) {
auto ensureValid = [&](const StorePath & p) {
waitForPath(p);
if (!store->isValidPath(p))
error<InvalidPathError>(store->printStorePath(p)).debugThrow();
};
Expand Down Expand Up @@ -291,6 +292,7 @@ static void import(EvalState & state, const PosIdx pos, Value & vPath, Value * v
if (!state.store->isStorePath(path2))
return std::nullopt;
auto storePath = state.store->parseStorePath(path2);
state.waitForPath(storePath);
if (!(state.store->isValidPath(storePath) && isDerivation(path2)))
return std::nullopt;
return storePath;
Expand Down Expand Up @@ -1583,6 +1585,8 @@ static void derivationStrictInternal(EvalState & state, std::string_view drvName
[&](const NixStringContextElem::DrvDeep & d) {
/* !!! This doesn't work if readOnlyMode is set. */
StorePathSet refs;
// FIXME: don't need to wait, we only need the references.
state.waitForPath(d.drvPath);
state.store->computeFSClosure(d.drvPath, refs);
for (auto & j : refs) {
drv.inputSrcs.insert(j);
Expand Down Expand Up @@ -1707,7 +1711,7 @@ static void derivationStrictInternal(EvalState & state, std::string_view drvName
}

/* Write the resulting term into the Nix store directory. */
auto drvPath = writeDerivation(*state.store, drv, state.repair);
auto drvPath = writeDerivation(*state.store, *state.asyncPathWriter, drv, state.repair);
auto drvPathS = state.store->printStorePath(drvPath);

printMsg(lvlChatty, "instantiated '%1%' -> '%2%'", drvName, drvPathS);
Expand Down
1 change: 1 addition & 0 deletions src/libexpr/primops/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static void prim_unsafeDiscardOutputDependency(EvalState & state, const PosIdx p
NixStringContext context2;
for (auto && c : context) {
if (auto * ptr = std::get_if<NixStringContextElem::DrvDeep>(&c.raw)) {
state.waitForPath(ptr->drvPath); // FIXME: why?
context2.emplace(NixStringContextElem::Opaque{.path = ptr->drvPath});
} else {
/* Can reuse original item */
Expand Down
151 changes: 151 additions & 0 deletions src/libstore/async-path-writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#include "nix/store/async-path-writer.hh"
#include "nix/util/archive.hh"

#include <thread>
#include <future>

namespace nix {

struct AsyncPathWriterImpl : AsyncPathWriter
{
ref<Store> store;

struct Item
{
StorePath storePath;
std::string contents;
std::string name;
Hash hash;
StorePathSet references;
RepairFlag repair;
std::promise<void> promise;
};

struct State
{
std::vector<Item> items;
std::unordered_map<StorePath, std::shared_future<void>> futures;
bool quit = false;
};

Sync<State> state_;

std::thread workerThread;

std::condition_variable wakeupCV;

AsyncPathWriterImpl(ref<Store> store)
: store(store)
{
workerThread = std::thread([&]() {
while (true) {
std::vector<Item> items;

{
auto state(state_.lock());
while (!state->quit && state->items.empty())
state.wait(wakeupCV);
if (state->items.empty() && state->quit)
return;
std::swap(items, state->items);
}

try {
writePaths(items);
for (auto & item : items)
item.promise.set_value();
} catch (...) {
for (auto & item : items)
item.promise.set_exception(std::current_exception());
}
}
});
}

~AsyncPathWriterImpl()
{
state_.lock()->quit = true;
wakeupCV.notify_all();
workerThread.join();
}

StorePath
addPath(std::string contents, std::string name, StorePathSet references, RepairFlag repair, bool readOnly) override
{
auto hash = hashString(HashAlgorithm::SHA256, contents);

auto storePath = store->makeFixedOutputPathFromCA(
name,
TextInfo{
.hash = hash,
.references = references,
});

if (!readOnly) {
auto state(state_.lock());
std::promise<void> promise;
state->futures.insert_or_assign(storePath, promise.get_future());
state->items.push_back(
Item{
.storePath = storePath,
.contents = std::move(contents),
.name = std::move(name),
.hash = hash,
.references = std::move(references),
.repair = repair,
.promise = std::move(promise),
});
wakeupCV.notify_all();
}

return storePath;
}

void waitForPath(const StorePath & path) override
{
auto future = ({
auto state = state_.lock();
auto i = state->futures.find(path);
if (i == state->futures.end())
return;
i->second;
});
future.get();
}

void waitForAllPaths() override
{
auto futures = ({
auto state(state_.lock());
std::move(state->futures);
});
for (auto & future : futures)
future.second.get();
}

void writePaths(const std::vector<Item> & items)
{
// FIXME: use addMultipeToStore() once it doesn't require a
// NAR hash from the client for CA objects.

for (auto & item : items) {
StringSource source(item.contents);
auto storePath = store->addToStoreFromDump(
source,
item.storePath.name(),
FileSerialisationMethod::Flat,
ContentAddressMethod::Raw::Text,
HashAlgorithm::SHA256,
item.references,
item.repair);
assert(storePath == item.storePath);
}
Comment on lines +128 to +142
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed briefly in the meeting, we could instead implement pipelining, which does not require an update to the worker protocol.

Another benefit of pipelining that was not brought up is that it also allows the first response to come in before any of the other requests, reducing the latency of individual add operations.

}
};

ref<AsyncPathWriter> AsyncPathWriter::make(ref<Store> store)
{
return make_ref<AsyncPathWriterImpl>(store);
}

} // namespace nix
15 changes: 15 additions & 0 deletions src/libstore/derivations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "nix/store/common-protocol-impl.hh"
#include "nix/util/strings-inline.hh"
#include "nix/util/json-utils.hh"
#include "nix/store/async-path-writer.hh"

#include <boost/container/small_vector.hpp>
#include <nlohmann/json.hpp>
Expand Down Expand Up @@ -133,6 +134,20 @@ StorePath writeDerivation(Store & store, const Derivation & drv, RepairFlag repa
});
}

StorePath writeDerivation(
Store & store, AsyncPathWriter & asyncPathWriter, const Derivation & drv, RepairFlag repair, bool readOnly)
{
auto references = drv.inputSrcs;
for (auto & i : drv.inputDrvs.map)
references.insert(i.first);
return asyncPathWriter.addPath(
drv.unparse(store, false),
std::string(drv.name) + drvExtension,
references,
repair,
readOnly || settings.readOnlyMode);
}

namespace {
/**
* This mimics std::istream to some extent. We use this much smaller implementation
Expand Down
Loading
Loading