1515 pkg/ eth/ common,
1616 pkg/ stew/ [interval_set, sorted_set],
1717 ../ worker_desc,
18- ./ headers_staged/ [headers_fetch, staged_collect, staged_headers],
18+ ./ headers_staged/ [headers_fetch, staged_headers],
1919 ./ headers_unproc
2020
2121# ------------------------------------------------------------------------------
@@ -27,7 +27,7 @@ func headersStagedCollectOk*(buddy: BeaconBuddyRef): bool =
2727 if buddy.ctrl.running:
2828 let ctx = buddy.ctx
2929 if 0 < ctx.headersUnprocAvail () and
30- not ctx.collectModeStopped ():
30+ not ctx.headersModeStopped ():
3131 return true
3232 false
3333
@@ -93,92 +93,56 @@ proc headersStagedCollect*(
9393 discard ctx.headersUnprocFetch (top - dangling).expect (" iv" )
9494
9595 let
96- # Reserve the full range of block numbers so they can be appended in a
97- # row. This avoid some fragmentation when header chains are stashed by
98- # multiple peers, i.e. they interleave peer task-wise.
99- iv = ctx.headersUnprocFetch (nFetchHeadersBatchListLen).valueOr:
100- break fetchHeadersBody # done, exit this function
101-
10296 # Get parent hash from the most senior stored header
10397 parent = ctx.hdrCache.antecedent.parentHash
10498
105- # Fetch headers and store them on the header chain cache. The function
106- # returns the last unprocessed block number
107- bottom = await buddy.collectAndStashOnDiskCache (iv, parent, info)
108-
109- # Check whether there were some headers fetched at all
110- if bottom < iv.maxPt:
111- nStored += (iv.maxPt - bottom) # statistics
112- ctx.pool.seenData = true # header data exist
99+ # Fetch some headers
100+ rev = (await buddy.headersFetch (
101+ parent, nFetchHeadersRequest, info)).valueOr:
102+ break fetchHeadersBody # error => exit block
113103
114- # Job might have been cancelled or completed while downloading headers.
115- # If so, no more bookkeeping of headers must take place. The *books*
116- # might have been reset and prepared for the next stage.
117- if ctx.collectModeStopped ():
118- trace info & " : stopped fetching/storing headers" , peer, iv,
119- bottom= bottom.bnStr, nStored, syncState= ($ buddy.syncState)
120- break fetchHeadersBody # done, exit this function
104+ ctx.pool.seenData = true # header data exist
121105
122- # Commit partially processed block numbers
123- if iv.minPt <= bottom:
124- ctx. headersUnprocCommit (iv,iv.minPt,bottom) # partial success only
125- break fetchHeadersBody # done, exit this function
106+ # Store it on the header chain cache
107+ let dTop = ctx.hdrCache.antecedent.number # current antecedent
108+ if not buddy. headersStashOnDisk (rev, buddy.peerID, info):
109+ break fetchHeadersBody # error => exit block
126110
127- ctx.headersUnprocCommit (iv) # all headers processed
111+ let dBottom = ctx.hdrCache.antecedent.number # update new antecedent
112+ nStored += (dTop - dBottom) # statistics
128113
129- debug info & " : fetched headers count" , peer,
130- unprocTop= ctx.headersUnprocAvailTop.bnStr,
131- D= ctx.hdrCache.antecedent.bnStr, nStored, nStagedQ= ctx.hdr.staged.len,
132- syncState= ($ buddy.syncState)
114+ if dBottom == dTop:
115+ break fetchHeadersBody # nothing achieved
133116
134- # Buddy might have been cancelled while downloading headers.
135- if buddy.ctrl.stopped:
136- break fetchHeadersBody
117+ if buddy.ctrl.stopped: # peer was cancelled
118+ break fetchHeadersBody # done, exit this block
137119
138120 # End while: `collectAndStashOnDiskCache()`
139121
140122 # Continue opportunistically fetching by block number rather than hash. The
141123 # fetched headers need to be staged and checked/serialised later.
142124 if ctx.hdr.staged.len + ctx.hdr.reserveStaged < headersStagedQueueLengthMax:
143125
144- let
145- # Comment see deterministic case
146- iv = ctx.headersUnprocFetch (nFetchHeadersBatchListLen).valueOr:
147- break fetchHeadersBody # done, exit this function
148-
149- # This record will accumulate the fetched headers. It must be on the
150- # heap so that `async` can capture that properly.
151- lhc = (ref LinkedHChain )(peerID: buddy.peerID)
152-
153- # Fetch headers and fill up the headers list of `lhc`. The function
154- # returns the last unprocessed block number.
126+ # Fetch headers
155127 ctx.hdr.reserveStaged.inc # Book a slot on `staged`
156- let bottom = await buddy.collectAndStageOnMemQueue (iv, lhc, info)
128+ let rc = await buddy.headersFetch (
129+ EMPTY_ROOT_HASH , nFetchHeadersRequest, info)
157130 ctx.hdr.reserveStaged.dec # Free that slot again
158131
159- nQueued = lhc.revHdrs.len # statistics
132+ if rc.isErr:
133+ break fetchHeadersBody # done, exit this block
160134
161- # Job might have been cancelled or completed while downloading headers.
162- # If so, no more bookkeeping of headers must take place. The *books*
163- # might have been reset and prepared for the next stage.
164- if ctx.collectModeStopped ():
165- trace info & " : stopped fetching/staging headers" , peer, iv,
166- bottom= bottom.bnStr, nStored, syncState= ($ buddy.syncState)
167- break fetchHeadersBody # done, exit this function
168-
169- # Store `lhc` chain on the `staged` queue if there is any
170- if 0 < lhc.revHdrs.len:
171- let qItem = ctx.hdr.staged.insert (iv.maxPt).valueOr:
172- raiseAssert info & " : duplicate key on staged queue iv=" & $ iv
173- qItem.data = lhc[]
174-
175- # Commit processed block numbers
176- if iv.minPt <= bottom:
177- ctx.headersUnprocCommit (iv,iv.minPt,bottom) # partial success only
178- break fetchHeadersBody # done, exit this function
135+ let
136+ # Insert headers list on the `staged` queue
137+ key = rc.value[0 ].number
138+ qItem = ctx.hdr.staged.insert (key).valueOr:
139+ raiseAssert info & " : duplicate key on staged queue" &
140+ " iv=" & (rc.value[^ 1 ].number,key).bnStr
141+ qItem.data.revHdrs = rc.value
142+ qItem.data.peerID = buddy.peerID
179143
180- ctx. headersUnprocCommit (iv) # all headers processed
181- # End inner block
144+ nQueued = rc.value.len # statistics
145+ # End if
182146
183147 # End block: `fetchHeadersBody`
184148
@@ -196,9 +160,10 @@ proc headersStagedCollect*(
196160 return
197161
198162 info " Queued/staged or DB/stored headers" ,
199- unprocTop= (if ctx.collectModeStopped (): " n/a"
163+ unprocTop= (if ctx.headersModeStopped (): " n/a"
200164 else : ctx.headersUnprocAvailTop.bnStr),
201- nQueued, nStored, nStagedQ= ctx.hdr.staged.len, nSyncPeers= ctx.pool.nBuddies
165+ nQueued, nStored, nStagedQ= ctx.hdr.staged.len,
166+ nSyncPeers= ctx.pool.nBuddies
202167
203168
204169proc headersStagedProcess * (buddy: BeaconBuddyRef ; info: static [string ]): bool =
@@ -216,22 +181,22 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool =
216181 return false # switch peer
217182
218183 var
219- nStored = 0 # statistics
220- switchPeer = false # for return code
184+ nStored = 0 u64 # statistics
185+ switchPeer = false # for return code
221186
222187 while ctx.hdrCache.state == collecting:
223188
224189 # Fetch list with largest block numbers
225190 let qItem = ctx.hdr.staged.le (high BlockNumber ).valueOr:
226- break # all done
191+ break # all done
227192
228193 let
229194 minNum = qItem.data.revHdrs[^ 1 ].number
230195 maxNum = qItem.data.revHdrs[0 ].number
231196 dangling = ctx.hdrCache.antecedent.number
232197 if maxNum + 1 < dangling:
233- debug info & " : gap, serialisation postponed" , peer,
234- qItem= qItem.data.bnStr, D= dangling.bnStr, nStored,
198+ trace info & " : gap, serialisation postponed" , peer,
199+ qItem= qItem.data.revHdrs. bnStr, D= dangling.bnStr, nStored,
235200 nStagedQ= ctx.hdr.staged.len, nSyncPeers= ctx.pool.nBuddies
236201 switchPeer = true # there is a gap -- come back later
237202 break
@@ -240,18 +205,14 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool =
240205 discard ctx.hdr.staged.delete (qItem.key)
241206
242207 # Store headers on database
243- if not buddy.headersStashOnDisk (qItem.data.revHdrs, info):
244- # Error mark buddy that produced that unusable headers list
245- ctx.incHdrProcErrors qItem.data.peerID
246-
208+ if not buddy.headersStashOnDisk (
209+ qItem.data.revHdrs, qItem.data.peerID, info):
247210 ctx.headersUnprocAppend (minNum, maxNum)
248211 switchPeer = true
249212 break
250213
251- # Antecedent `dangling` of the header cache might not be at `revHdrs[^1]`.
252- let revHdrsLen = maxNum - ctx.hdrCache.antecedent.number + 1
253-
254- nStored += revHdrsLen.int # count headers
214+ # Antecedent of the header cache might not be at `revHdrs[^1]`.
215+ nStored += (maxNum - ctx.hdrCache.antecedent.number + 1 ) # count headers
255216 # End while loop
256217
257218 if 0 < nStored:
0 commit comments