Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 51 additions & 49 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,8 @@ LedgerManagerImpl::ApplyState::handleUpgradeAffectingSorobanInMemoryStateSize(
void
LedgerManagerImpl::ApplyState::finishPendingCompilation()
{
assertWritablePhase();
threadInvariant();
releaseAssert(mPhase == Phase::SETTING_UP_STATE);
releaseAssert(mCompiler);
auto newCache = mCompiler->wait();
getMetrics().mSorobanMetrics.mModuleCacheRebuildBytes.set_count(
Expand Down Expand Up @@ -1032,7 +1033,14 @@ void
LedgerManagerImpl::ApplyState::markEndOfCommitting()
{
    assertCommittingPhase();
    // If a background module-cache compilation is still in flight we must
    // return to SETTING_UP_STATE so the compilation can be joined before
    // the state is considered ready; otherwise go straight to
    // READY_TO_APPLY. In both cases READY_TO_APPLY is reached before the
    // next APPLYING phase begins.
    if (isCompilationRunning())
    {
        mPhase = Phase::SETTING_UP_STATE;
    }
    else
    {
        mPhase = Phase::READY_TO_APPLY;
    }
}

void
Expand Down Expand Up @@ -1106,51 +1114,34 @@ LedgerManagerImpl::ApplyState::maybeRebuildModuleCache(
// unbounded growth.
//
// Unfortunately we do not know exactly how much memory is used by each byte
// of contract we compile, and the size estimates from the cost model have
// to assume a worst case which is almost a factor of _40_ larger than the
// byte-size of the contracts. So for example if we assume 100MB of
// contracts, the cost model says we ought to budget for 4GB of memory, just
// in case _all 100MB of contracts_ are "the worst case contract" that's
// just a continuous stream of function definitions.
// of contract we compile. But we do know how much wasm we fed _into_ the
// compiler, and we can assume that the network's cost model is already
// serving to roughly bound the live set of contracts in the BL.
//
// So: we take this multiplier, times the size of the contracts we _last_
// drew from the BL when doing a full recompile, times two, as a cap on the
// _current_ (post-rebuild, currently-growing) cache's budget-tracked
// memory. This should avoid rebuilding spuriously, while still treating
// events that double the size of the contract-set in the live BL as an
// event that warrants a rebuild.

// We try to fish the current cost multiplier out of the soroban network
// config's memory cost model, but fall back to a conservative default in
// case there is no mem cost param for VmInstantiation (This should never
// happen but just in case).
uint64_t linearTerm = 5000;

// linearTerm is in 1/128ths in the cost model, to reduce rounding error.
uint64_t scale = 128;
auto sorobanConfig = SorobanNetworkConfig::loadFromLedger(snap);
auto const& memParams = sorobanConfig.memCostParams();
if (memParams.size() > (size_t)stellar::VmInstantiation)
{
auto const& param = memParams[(size_t)stellar::VmInstantiation];
linearTerm = param.linearTerm;
}
auto lastBytesCompiled =
// So: we take the input size of the contracts we _last_ drew from the BL
// when doing a full recompile (which we always do on startup at least), and
// multiply it by two, and use that as a cap on the _current_ (post-rebuild,
// currently-growing) cache's wasm input bytes. This should avoid rebuilding
// spuriously, while still treating events that double the size of the
// contract-set in the live BL as an event that warrants a rebuild.

int64_t lastCompiledWasmBytesCount =
getMetrics().mSorobanMetrics.mModuleCacheRebuildBytes.count();
uint64_t limit = 2 * lastBytesCompiled * linearTerm / scale;

uint64_t lastCompiledWasmBytes =
lastCompiledWasmBytesCount < 0
? 0
: static_cast<uint64_t>(lastCompiledWasmBytesCount);
uint64_t currCompiledWasmBytes = 0;
for (auto const& v : mModuleCacheProtocols)
{
auto bytesConsumed = mModuleCache->get_mem_bytes_consumed(v);
if (bytesConsumed > limit)
{
CLOG_DEBUG(Ledger,
"Rebuilding module cache: worst-case estimate {} "
"model-bytes consumed of {} limit",
bytesConsumed, limit);
startCompilingAllContracts(snap, minLedgerVersion);
break;
}
currCompiledWasmBytes += mModuleCache->get_wasm_bytes_input(v);
}
if (currCompiledWasmBytes > (2 * lastCompiledWasmBytes))
{
CLOG_DEBUG(Ledger,
"Rebuilding module cache after {} wasm bytes compiled",
currCompiledWasmBytes);
startCompilingAllContracts(snap, minLedgerVersion);
}
}

Expand Down Expand Up @@ -1458,6 +1449,7 @@ LedgerManagerImpl::applyLedger(LedgerCloseData const& ledgerData,
if (mApplyState.isCompilationRunning())
{
mApplyState.finishPendingCompilation();
mApplyState.markEndOfSetupPhase();
}

#ifdef BUILD_TESTS
Expand Down Expand Up @@ -3062,21 +3054,31 @@ LedgerManagerImpl::ApplyState::addAnyContractsToModuleCache(
{
if (e.data.type() == CONTRACT_CODE)
{
using rslice = ::rust::Slice<uint8_t const>;
auto const& key = e.data.contractCode().hash;
auto const& wasm = e.data.contractCode().code;
rslice const keySlice{key.data(), key.size()};
rslice const wasmSlice{wasm.data(), wasm.size()};
for (auto const& v : mModuleCacheProtocols)
{
if (v >= ledgerVersion)
{
auto const& wasm = e.data.contractCode().code;
if (mModuleCache->contains_module(v, keySlice))
{
CLOG_DEBUG(Ledger,
"module cache already contains wasm {} "
"for protocol {}",
binToHex(key), v);
continue;
}
CLOG_DEBUG(Ledger,
"compiling wasm {} for protocol {} module cache",
binToHex(sha256(wasm)), v);
auto slice =
rust::Slice<uint8_t const>(wasm.data(), wasm.size());
getMetrics().mSorobanMetrics.mModuleCacheNumEntries.inc();
binToHex(key), v);
auto timer =
getMetrics()
.mSorobanMetrics.mModuleCompilationTime.TimeScope();
mModuleCache->compile(v, slice);
mModuleCache->compile(v, wasmSlice);
getMetrics().mSorobanMetrics.mModuleCacheNumEntries.inc();
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class LedgerManagerImpl : public LedgerManager
// During the ledger close process, the apply state goes through these
// phases:
// - SETTING_UP_STATE: LedgerManager is waiting for or setting up
// ApplyState. This occurs on startup and after BucketApply during
// catchup.
// ApplyState. This occurs on startup, after BucketApply during
// catchup, and any time the module cache has to be rebuilt.
// - READY_TO_APPLY: Apply State is ready but not actively executing
// transactions or committing ledger state. ApplyState is immutable.
// - APPLYING: ApplyState is actively executing transactions.
Expand All @@ -135,14 +135,23 @@ class LedgerManagerImpl : public LedgerManager
// commits state to disk, and advances the ledger header.
//
// Phase transitions:
// SETTING_UP_STATE -> READY_TO_APPLY -> SETTING_UP_STATE
// |
// -> APPLYING -> COMMITTING -> READY_TO_APPLY
//
// SETTING_UP_STATE <-------(rebuild cache)----+
// | ^ |
// | | (catchup) |
// v | |
// READY_TO_APPLY <----(no rebuild cache)---- COMMITTING
// | ^
// v |
// APPLYING ---------------------------------+
//
// SETTING_UP_STATE is the initial phase on startup. ApplyState may
// also transition from READY_TO_APPLY -> SETTING_UP_STATE if a node
// falls out of sync and must enter catchup, which requires re-entering
// the SETTING_UP_STATE phase to reset lcl state.
// the SETTING_UP_STATE phase to reset lcl state. After COMMITTING,
// the state returns to SETTING_UP_STATE if a module cache rebuild
// is needed, or directly to READY_TO_APPLY otherwise. In both cases
// READY_TO_APPLY is always reached before entering APPLYING.
//
// APPLYING is the only phase in which Soroban execution
// threads are active.
Expand Down
4 changes: 2 additions & 2 deletions src/ledger/SharedModuleCacheCompiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ size_t
SharedModuleCacheCompiler::getBytesCompiled()
{
    // Guard the counter: it is incremented concurrently by compiler threads.
    std::unique_lock lock(mMutex);
    // Every supported ledger version compiles the same set of wasm inputs,
    // so the total work done is the per-version byte count scaled by the
    // number of versions being compiled.
    return mBytesCompiled * mLedgerVersions.size();
}

std::chrono::nanoseconds
Expand All @@ -234,7 +234,7 @@ size_t
SharedModuleCacheCompiler::getContractsCompiled()
{
    // Guard the counter: it is incremented concurrently by compiler threads.
    std::unique_lock lock(mMutex);
    // Every supported ledger version compiles the same set of contracts,
    // so the total is the per-version count scaled by the number of
    // versions being compiled.
    return mContractsCompiled * mLedgerVersions.size();
}

}
2 changes: 1 addition & 1 deletion src/rust/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ pub(crate) mod rust_bridge {
fn evict_contract_code(self: &SorobanModuleCache, key: &[u8]) -> Result<()>;
fn clear(self: &SorobanModuleCache) -> Result<()>;
fn contains_module(self: &SorobanModuleCache, protocol: u32, key: &[u8]) -> Result<bool>;
fn get_mem_bytes_consumed(self: &SorobanModuleCache, protocol: u32) -> Result<u64>;
fn get_wasm_bytes_input(self: &SorobanModuleCache, protocol: u32) -> Result<u64>;

// Given a quorum set configuration, checks if quorum intersection is
// enjoyed among all possible quorums. Returns `Ok(status)` where
Expand Down
12 changes: 6 additions & 6 deletions src/rust/src/soroban_module_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ impl SorobanModuleCache {
_ => Err(protocol_agnostic::make_error("unsupported protocol")),
}
}
pub fn get_mem_bytes_consumed(
pub fn get_wasm_bytes_input(
&self,
ledger_protocol: u32,
) -> Result<u64, Box<dyn std::error::Error>> {
#[allow(unused_mut)]
let mut bytes = 0;
match ledger_protocol {
23 => bytes = bytes.max(self.p23_cache.get_mem_bytes_consumed()?),
24 => bytes = bytes.max(self.p24_cache.get_mem_bytes_consumed()?),
25 => bytes = bytes.max(self.p25_cache.get_mem_bytes_consumed()?),
26 => bytes = bytes.max(self.p26_cache.get_mem_bytes_consumed()?),
23 => bytes = bytes.max(self.p23_cache.get_wasm_bytes_input()?),
24 => bytes = bytes.max(self.p24_cache.get_wasm_bytes_input()?),
25 => bytes = bytes.max(self.p25_cache.get_wasm_bytes_input()?),
26 => bytes = bytes.max(self.p26_cache.get_wasm_bytes_input()?),
#[cfg(feature = "next")]
27 => bytes = bytes.max(self.p26_cache.get_mem_bytes_consumed()?),
27 => bytes = bytes.max(self.p26_cache.get_wasm_bytes_input()?),
_ => return Err(protocol_agnostic::make_error("unsupported protocol")),
}
Ok(bytes)
Expand Down
28 changes: 15 additions & 13 deletions src/rust/src/soroban_proto_any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,9 @@ pub(crate) struct ProtocolSpecificModuleCache {
// `CompilationContext` is _not_ threadsafe (specifically its `Budget` is
// not) and so rather than reuse a single `CompilationContext` across
// threads, we make a throwaway `CompilationContext` on each `compile` call,
// and _copy out_ the memory usage (which we want to publish back to core).
pub(crate) mem_bytes_consumed: std::sync::atomic::AtomicU64,
// and track only a single _input_ value (which we want to publish back to
// core).
pub(crate) wasm_bytes_input: std::sync::atomic::AtomicU64,
}

#[allow(dead_code)]
Expand All @@ -716,7 +717,7 @@ impl ProtocolSpecificModuleCache {
let module_cache = ModuleCache::new(&compilation_context)?;
Ok(ProtocolSpecificModuleCache {
module_cache,
mem_bytes_consumed: std::sync::atomic::AtomicU64::new(0),
wasm_bytes_input: std::sync::atomic::AtomicU64::new(0),
})
}

Expand All @@ -727,12 +728,8 @@ impl ProtocolSpecificModuleCache {
get_max_proto(),
wasm,
);
self.mem_bytes_consumed.fetch_add(
compilation_context
.unlimited_budget
.get_mem_bytes_consumed()?,
std::sync::atomic::Ordering::SeqCst,
);
self.wasm_bytes_input
.fetch_add(wasm.len() as u64, std::sync::atomic::Ordering::SeqCst);
Ok(res?)
}

Expand All @@ -742,7 +739,12 @@ impl ProtocolSpecificModuleCache {
}

pub(crate) fn clear(&self) -> Result<(), Box<dyn std::error::Error>> {
Ok(self.module_cache.clear()?)
let res = self.module_cache.clear();
if res.is_ok() {
self.wasm_bytes_input
.store(0, std::sync::atomic::Ordering::SeqCst);
}
Ok(res?)
}

pub(crate) fn contains_module(
Expand All @@ -752,9 +754,9 @@ impl ProtocolSpecificModuleCache {
Ok(self.module_cache.contains_module(&key.clone().into())?)
}

pub(crate) fn get_mem_bytes_consumed(&self) -> Result<u64, Box<dyn std::error::Error>> {
pub(crate) fn get_wasm_bytes_input(&self) -> Result<u64, Box<dyn std::error::Error>> {
Ok(self
.mem_bytes_consumed
.wasm_bytes_input
.load(std::sync::atomic::Ordering::SeqCst))
}

Expand All @@ -771,7 +773,7 @@ impl ProtocolSpecificModuleCache {
let module_cache = self.module_cache.clone();
Ok(ProtocolSpecificModuleCache {
module_cache,
mem_bytes_consumed: std::sync::atomic::AtomicU64::new(0),
wasm_bytes_input: std::sync::atomic::AtomicU64::new(0),
})
}
}
Loading
Loading