Skip to content

Commit 10b6757

Browse files
committed
download per slot
1 parent 362b65b commit 10b6757

File tree

5 files changed

+225
-165
lines changed

5 files changed

+225
-165
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# nimbus_verified_proxy
2+
# Copyright (c) 2025 Status Research & Development GmbH
3+
# Licensed and distributed under either of
4+
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
5+
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
6+
# at your option. This file may not be copied, modified, or distributed except according to those terms.
7+
8+
{.push raises: [], gcsafe.}
9+
10+
import
11+
stint,
12+
chronos,
13+
chronicles,
14+
presto/client,
15+
beacon_chain/spec/eth2_apis/rest_light_client_calls,
16+
beacon_chain/spec/presets,
17+
beacon_chain/spec/forks,
18+
./lc/lc_manager,
19+
./nimbus_verified_proxy_conf
20+
21+
logScope:
22+
topics = "SSZLCRestClient"
23+
24+
const
25+
MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded)
26+
BASE_URL="/eth/v1/beacon/light_client"
27+
28+
type
29+
LCRestPeer = ref object
30+
score: int
31+
restClient: RestClientRef
32+
33+
LCRestClient* = ref object
34+
cfg: RuntimeConfig
35+
forkDigests: ref ForkDigests
36+
peers: seq[LCRestPeer]
37+
urls: seq[string]
38+
39+
func new*(T: type LCRestClient, cfg: RuntimeConfig, forkDigests: ref ForkDigests): LCRestClient =
40+
LCRestClient(cfg: cfg, forkDigests: forkDigests, peers: @[])
41+
42+
proc addEndpoint*(client: LCRestClient, endpoint: string) {.raises: [ValueError].} =
43+
if endpoint in client.urls:
44+
raise newException(ValueError, "Endpoint already added")
45+
46+
let restClient = RestClientRef.new(endpoint).valueOr:
47+
raise newException(ValueError, $error)
48+
49+
client.peers.add(LCRestPeer(score: 0, restClient: restClient))
50+
client.urls.add(endpoint)
51+
52+
proc closeAll*(client: LCRestClient) {.async: (raises: []).} =
53+
for peer in client.peers:
54+
await peer.restClient.closeWait()
55+
56+
client.peers.setLen(0)
57+
client.urls.setLen(0)
58+
59+
proc getEthLCBackend*(client: LCRestClient): EthLCBackend =
60+
61+
let
62+
getLCBootstrapProc = proc(reqId: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} =
63+
let
64+
peer = client.peers[reqId mod uint64(client.peers.len)]
65+
res =
66+
try:
67+
await peer.restClient.getLightClientBootstrap(blockRoot, client.cfg, client.forkDigests)
68+
except CatchableError as e:
69+
raise newException(CancelledError, e.msg)
70+
71+
ok(res)
72+
73+
getLCUpdatesProc = proc(reqId: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).} =
74+
let
75+
peer = client.peers[reqId mod uint64(client.peers.len)]
76+
res =
77+
try:
78+
await peer.restClient.getLightClientUpdatesByRange(startPeriod, count, client.cfg, client.forkDigests)
79+
except CatchableError as e:
80+
raise newException(CancelledError, e.msg)
81+
82+
ok(res)
83+
84+
getLCFinalityProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).} =
85+
let
86+
peer = client.peers[reqId mod uint64(client.peers.len)]
87+
res =
88+
try:
89+
await peer.restClient.getLightClientFinalityUpdate(client.cfg, client.forkDigests)
90+
except CatchableError as e:
91+
raise newException(CancelledError, e.msg)
92+
93+
ok(res)
94+
95+
getLCOptimisticProc = proc(reqId: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).} =
96+
let
97+
peer = client.peers[reqId mod uint64(client.peers.len)]
98+
res =
99+
try:
100+
await peer.restClient.getLightClientOptimisticUpdate(client.cfg, client.forkDigests)
101+
except CatchableError as e:
102+
raise newException(CancelledError, e.msg)
103+
104+
ok(res)
105+
106+
updateScoreProc = proc(reqId: uint64, value: int) =
107+
let peer = client.peers[reqId mod uint64(client.peers.len)]
108+
peer.score += value
109+
110+
EthLCBackend(
111+
getLightClientBootstrap: getLCBootstrapProc,
112+
getLightClientUpdatesByRange: getLCUpdatesProc,
113+
getLightClientFinalityUpdate: getLCFinalityProc,
114+
getLightClientOptimisticUpdate: getLCOptimisticProc,
115+
updateScore: updateScoreProc,
116+
)
117+
118+

nimbus_verified_proxy/lc/lc.nim

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,28 +116,32 @@ proc new*(
116116
forkyStore.is_next_sync_committee_known
117117
else:
118118
false
119-
func getFinalizedPeriod(): SyncCommitteePeriod =
119+
120+
func getFinalizedSlot(): Slot =
120121
withForkyStore(lightClient.store[]):
121122
when lcDataFork > LightClientDataFork.None:
122-
forkyStore.finalized_header.beacon.slot.sync_committee_period
123+
forkyStore.finalized_header.beacon.slot
123124
else:
124-
GENESIS_SLOT.sync_committee_period
125+
GENESIS_SLOT
125126

126-
func getOptimisticPeriod(): SyncCommitteePeriod =
127+
func getOptimisticSlot(): Slot =
127128
withForkyStore(lightClient.store[]):
128129
when lcDataFork > LightClientDataFork.None:
129-
forkyStore.optimistic_header.beacon.slot.sync_committee_period
130+
forkyStore.optimistic_header.beacon.slot
130131
else:
131-
GENESIS_SLOT.sync_committee_period
132+
GENESIS_SLOT
132133

133134
lightClient.manager = LightClientManager.init(
134135
rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier,
135136
finalityVerifier, optimisticVerifier, isLightClientStoreInitialized,
136-
isNextSyncCommitteeKnown, getFinalizedPeriod, getOptimisticPeriod, getBeaconTime,
137+
isNextSyncCommitteeKnown, getFinalizedSlot, getOptimisticSlot, getBeaconTime,
137138
)
138139

139140
lightClient
140141

142+
proc setBackend*(lightClient: LightClient, backend: EthLCBackend) =
143+
lightClient.manager.backend = backend
144+
141145
proc start*(lightClient: LightClient) =
142146
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
143147
lightClient.manager.start()

nimbus_verified_proxy/lc/lc_manager.nim

Lines changed: 80 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -51,23 +51,23 @@ type
5151
proc(): Option[Eth2Digest] {.gcsafe, raises: [].}
5252
GetBoolCallback* =
5353
proc(): bool {.gcsafe, raises: [].}
54-
GetSyncCommitteePeriodCallback* =
55-
proc(): SyncCommitteePeriod {.gcsafe, raises: [].}
54+
GetSlotCallback* =
55+
proc(): Slot {.gcsafe, raises: [].}
5656

57-
LightClientUpdatesByRangeResponse = NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]]
57+
LightClientUpdatesByRangeResponse* = NetRes[seq[ForkedLightClientUpdate]]
5858

5959
LightClientBootstrapProc = proc(id: uint64, blockRoot: Eth2Digest): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).}
6060
LightClientUpdatesByRangeProc = proc(id: uint64, startPeriod: SyncCommitteePeriod, count: uint64): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [CancelledError]).}
6161
LightClientFinalityUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError]).}
6262
LightClientOptimisticUpdateProc = proc(id: uint64): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError]).}
63-
ReportRequestQualityProc = proc(id: uint64, value: int) {.gcsafe, raises: [].}
63+
UpdateScoreProc = proc(id: uint64, value: int) {.gcsafe, raises: [].}
6464

6565
EthLCBackend* = object
66-
getLightClientBootstrap: LightClientBootstrapProc
67-
getLightClientUpdatesByRange: LightClientUpdatesByRangeProc
68-
getLightClientFinalityUpdate: LightClientFinalityUpdateProc
69-
getLightClientOptimisticUpdate: LightClientOptimisticUpdateProc
70-
reportRequestQuality: ReportRequestQualityProc
66+
getLightClientBootstrap*: LightClientBootstrapProc
67+
getLightClientUpdatesByRange*: LightClientUpdatesByRangeProc
68+
getLightClientFinalityUpdate*: LightClientFinalityUpdateProc
69+
getLightClientOptimisticUpdate*: LightClientOptimisticUpdateProc
70+
updateScore*: UpdateScoreProc
7171

7272
LightClientManager* = object
7373
rng: ref HmacDrbgContext
@@ -79,8 +79,8 @@ type
7979
optimisticUpdateVerifier: OptimisticUpdateVerifier
8080
isLightClientStoreInitialized: GetBoolCallback
8181
isNextSyncCommitteeKnown: GetBoolCallback
82-
getFinalizedPeriod: GetSyncCommitteePeriodCallback
83-
getOptimisticPeriod: GetSyncCommitteePeriodCallback
82+
getFinalizedSlot: GetSlotCallback
83+
getOptimisticSlot: GetSlotCallback
8484
getBeaconTime: GetBeaconTimeFn
8585
loopFuture: Future[void].Raising([CancelledError])
8686

@@ -94,8 +94,8 @@ func init*(
9494
optimisticUpdateVerifier: OptimisticUpdateVerifier,
9595
isLightClientStoreInitialized: GetBoolCallback,
9696
isNextSyncCommitteeKnown: GetBoolCallback,
97-
getFinalizedPeriod: GetSyncCommitteePeriodCallback,
98-
getOptimisticPeriod: GetSyncCommitteePeriodCallback,
97+
getFinalizedSlot: GetSlotCallback,
98+
getOptimisticSlot: GetSlotCallback,
9999
getBeaconTime: GetBeaconTimeFn,
100100
): LightClientManager =
101101
## Initialize light client manager.
@@ -108,8 +108,8 @@ func init*(
108108
optimisticUpdateVerifier: optimisticUpdateVerifier,
109109
isLightClientStoreInitialized: isLightClientStoreInitialized,
110110
isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
111-
getFinalizedPeriod: getFinalizedPeriod,
112-
getOptimisticPeriod: getOptimisticPeriod,
111+
getFinalizedSlot: getFinalizedSlot,
112+
getOptimisticSlot: getOptimisticSlot,
113113
getBeaconTime: getBeaconTime)
114114

115115
# https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.3/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
@@ -171,7 +171,7 @@ template valueVerifier[E](
171171
iterator values(v: auto): auto =
172172
## Local helper for `workerTask` to share the same implementation for both
173173
## scalar and aggregate values, by treating scalars as 1-length aggregates.
174-
when v is List:
174+
when v is seq:
175175
for i in v:
176176
yield i
177177
else:
@@ -215,7 +215,7 @@ proc workerTask[E](
215215
notice "Received value from an unviable fork", value = forkyObject, endpoint = E.name
216216
else:
217217
notice "Received value from an unviable fork", endpoint = E.name
218-
self.backend.reportRequestQuality(reqId, PeerScoreUnviableFork)
218+
self.backend.updateScore(reqId, PeerScoreUnviableFork)
219219
return didProgress
220220
of LightClientVerifierError.Invalid:
221221
# Descore, received data is malformed
@@ -224,19 +224,19 @@ proc workerTask[E](
224224
warn "Received invalid value", value = forkyObject.shortLog, endpoint = E.name
225225
else:
226226
warn "Received invalid value", endpoint = E.name
227-
self.backend.reportRequestQuality(reqId, PeerScoreBadValues)
227+
self.backend.updateScore(reqId, PeerScoreBadValues)
228228
return didProgress
229229
else:
230230
# Reward, peer returned something useful
231231
applyReward = true
232232
didProgress = true
233233
if applyReward:
234-
self.backend.reportRequestQuality(reqId, PeerScoreGoodValues)
234+
self.backend.updateScore(reqId, PeerScoreGoodValues)
235235
else:
236-
self.backend.reportRequestQuality(reqId, PeerScoreNoValues)
236+
self.backend.updateScore(reqId, PeerScoreNoValues)
237237
debug "Failed to receive value on request", value, endpoint = E.name
238238
except ResponseError as exc:
239-
self.backend.reportRequestQuality(reqId, PeerScoreBadValues)
239+
self.backend.updateScore(reqId, PeerScoreBadValues)
240240
warn "Received invalid response", error = exc.msg, endpoint = E.name
241241
except CancelledError as exc:
242242
raise exc
@@ -324,59 +324,77 @@ template query[E](
324324

325325
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/altair/light-client/light-client.md#light-client-sync-process
326326
proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
327-
var nextSyncTaskTime = self.getBeaconTime()
327+
var
328+
downloadOptimistic = true
329+
downloadFinality = false
330+
didOptimisticProgress = false
331+
didFinalityProgress = false
328332

329333
while true:
330-
# Periodically wake and check for changes
331-
let wallTime = self.getBeaconTime()
332-
if wallTime < nextSyncTaskTime:
333-
await sleepAsync(chronos.seconds(2))
334-
continue
334+
let
335+
wallTime = self.getBeaconTime()
336+
currentSlot = wallTime.slotOrZero()
337+
currentEpoch = (currentSlot mod SLOTS_PER_EPOCH)
338+
currentPeriod = currentSlot.sync_committee_period
339+
finalizedSlot = self.getFinalizedSlot()
340+
finalizedPeriod = finalizedSlot.sync_committee_period
341+
finalizedEpoch = (finalizedSlot mod SLOTS_PER_EPOCH)
342+
optimisticSlot = self.getOptimisticSlot()
343+
optimisticPeriod = optimisticSlot.sync_committee_period
344+
optimisitcEpoch = (optimisticSlot mod SLOTS_PER_EPOCH)
335345

336346
# Obtain bootstrap data once a trusted block root is supplied
337347
if not self.isLightClientStoreInitialized():
338348
let trustedBlockRoot = self.getTrustedBlockRoot()
349+
350+
# reattempt bootstrap download in 2 seconds
339351
if trustedBlockRoot.isNone:
352+
debug "TrustedBlockRoot unavaialble re-attempting bootstrap download"
340353
await sleepAsync(chronos.seconds(2))
341354
continue
342355

343356
let didProgress = await self.query(Bootstrap, trustedBlockRoot.get)
344-
nextSyncTaskTime =
345-
if didProgress:
346-
wallTime
347-
else:
348-
wallTime + self.rng.computeDelayWithJitter(chronos.seconds(0))
349-
continue
350357

351-
# Fetch updates
352-
let
353-
current = wallTime.slotOrZero().sync_committee_period
354-
355-
syncTask = nextLightClientSyncTask(
356-
current = current,
357-
finalized = self.getFinalizedPeriod(),
358-
optimistic = self.getOptimisticPeriod(),
359-
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown())
360-
361-
didProgress =
362-
case syncTask.kind
363-
of LcSyncKind.UpdatesByRange:
364-
await self.query(UpdatesByRange,
365-
(startPeriod: syncTask.startPeriod, count: syncTask.count))
366-
of LcSyncKind.FinalityUpdate:
367-
await self.query(FinalityUpdate)
368-
of LcSyncKind.OptimisticUpdate:
369-
await self.query(OptimisticUpdate)
370-
371-
nextSyncTaskTime =
372-
wallTime +
373-
self.rng.nextLcSyncTaskDelay(
374-
wallTime,
375-
finalized = self.getFinalizedPeriod(),
376-
optimistic = self.getOptimisticPeriod(),
377-
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(),
378-
didLatestSyncTaskProgress = didProgress
379-
)
358+
# reattempt bootstrap download in 2 seconds
359+
if not didProgress:
360+
debug "Re-attempting bootstrap download"
361+
await sleepAsync(chronos.seconds(2))
362+
continue
363+
364+
# check and download sync committee updates
365+
if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown():
366+
if finalizedPeriod >= currentPeriod:
367+
debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=1
368+
discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(1)))
369+
else:
370+
let count = min(currentPeriod - finalizedPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
371+
debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count
372+
discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)))
373+
elif finalizedPeriod + 1 < currentPeriod:
374+
let count = min(currentPeriod - (finalizedPeriod + 1), MAX_REQUEST_LIGHT_CLIENT_UPDATES)
375+
debug "Downloading light client sync committee updates", start_period=finalizedPeriod, count=count
376+
discard await self.query(UpdatesByRange, (startPeriod: finalizedPeriod, count: uint64(count)))
377+
378+
# check and download optimistic update
379+
if optimisticSlot < currentSlot:
380+
debug "Downloading light client optimistic updates", slot=currentSlot
381+
let didProgress = await self.query(OptimisticUpdate)
382+
if not didProgress:
383+
# retry in 2 seconds
384+
await sleepAsync(chronos.seconds(2))
385+
continue
386+
387+
# check and download finality update
388+
if currentEpoch > finalizedEpoch + 2:
389+
debug "Downloading light client finality updates", slot=currentSlot
390+
let didProgress = await self.query(FinalityUpdate)
391+
if not didProgress:
392+
# retry in two seconds
393+
await sleepAsync(chronos.seconds(2))
394+
continue
395+
396+
# check for updates every slot
397+
await sleepAsync(chronos.seconds(int64(SECONDS_PER_SLOT)))
380398

381399
proc start*(self: var LightClientManager) =
382400
## Start light client manager's loop.

0 commit comments

Comments
 (0)