
Commit 6264ece

Author: Neil Booth
No longer do block processing in a thread.
1 parent bd5807d commit 6264ece

File tree

1 file changed (+23 −20 lines)


electrumx/server/block_processor.py

Lines changed: 23 additions & 20 deletions

@@ -11,8 +11,9 @@
 
 import asyncio
 import time
+from asyncio import sleep
 
-from aiorpcx import TaskGroup, run_in_thread, CancelledError
+from aiorpcx import TaskGroup, CancelledError
 
 import electrumx
 from electrumx.server.daemon import DaemonError
@@ -54,7 +55,7 @@ async def main_loop(self, bp_height):
                 # Sleep a while if there is nothing to prefetch
                 await self.refill_event.wait()
                 if not await self._prefetch_blocks():
-                    await asyncio.sleep(self.polling_delay)
+                    await sleep(self.polling_delay)
             except DaemonError as e:
                 self.logger.info(f'ignoring daemon error: {e}')
             except CancelledError as e:
@@ -190,16 +191,14 @@ def __init__(self, env, db, daemon, notifications):
         # Signalled after backing up during a reorg
         self.backed_up_event = asyncio.Event()
 
-    async def run_in_thread_with_lock(self, func, *args):
-        # Run in a thread to prevent blocking. Shielded so that
-        # cancellations from shutdown don't lose work - when the task
-        # completes the data will be flushed and then we shut down.
-        # Take the state lock to be certain in-memory state is
-        # consistent and not being updated elsewhere.
-        async def run_in_thread_locked():
+    async def run_with_lock(self, coro):
+        # Shielded so that cancellations from shutdown don't lose work - when the task
+        # completes the data will be flushed and then we shut down. Take the state lock
+        # to be certain in-memory state is consistent and not being updated elsewhere.
+        async def run_locked():
             async with self.state_lock:
-                return await run_in_thread(func, *args)
-        return await asyncio.shield(run_in_thread_locked())
+                return await coro
+        return await asyncio.shield(run_locked())
 
     async def check_and_advance_blocks(self, raw_blocks):
         '''Process the list of raw blocks passed. Detects and handles
@@ -214,7 +213,7 @@ async def check_and_advance_blocks(self, raw_blocks):
 
         if hprevs == chain:
             start = time.monotonic()
-            await self.run_in_thread_with_lock(self.advance_blocks, blocks)
+            await self.run_with_lock(self.advance_blocks(blocks))
             await self._maybe_flush()
             if not self.db.first_sync:
                 s = '' if len(blocks) == 1 else 's'
@@ -256,9 +255,10 @@ async def get_raw_blocks(last_height, hex_hashes):
             except FileNotFoundError:
                 return await self.daemon.raw_blocks(hex_hashes)
 
-        def flush_backup():
+        async def backup_and_flush(raw_blocks):
             # self.touched can include other addresses which is
             # harmless, but remove None.
+            await self.backup_blocks(raw_blocks)
             self.touched.discard(None)
             self.db.flush_backup(self.flush_data(), self.touched)
 
@@ -267,8 +267,7 @@ def flush_backup():
         hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             raw_blocks = await get_raw_blocks(last, hex_hashes)
-            await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-            await self.run_in_thread_with_lock(flush_backup)
+            await self.run_with_lock(backup_and_flush(raw_blocks))
             last -= len(raw_blocks)
         await self.prefetcher.reset_height(self.height)
         self.backed_up_event.set()
@@ -340,10 +339,10 @@ def flush_data(self):
                          self.db_deletes, self.tip)
 
     async def flush(self, flush_utxos):
-        def flush():
+        async def flush():
             self.db.flush_dbs(self.flush_data(), flush_utxos,
                               self.estimate_txs_remaining)
-        await self.run_in_thread_with_lock(flush)
+        await self.run_with_lock(flush())
 
     async def _maybe_flush(self):
         # If caught up, flush everything as client queries are
@@ -382,8 +381,8 @@ def check_cache_size(self):
             return utxo_MB >= cache_MB * 4 // 5
         return None
 
-    def advance_blocks(self, blocks):
-        '''Synchronously advance the blocks.
+    async def advance_blocks(self, blocks):
+        '''Advance the blocks.
 
         It is already verified they correctly connect onto our tip.
         '''
@@ -400,6 +399,8 @@ def advance_blocks(self, blocks):
                 self.undo_infos.append((undo_info, height))
                 self.db.write_raw_block(block.raw, height)
 
+            await sleep(0)
+
         headers = [block.header for block in blocks]
         self.height = height
         self.headers.extend(headers)
@@ -457,7 +458,7 @@ def advance_txs(self, txs, is_unspendable):
 
         return undo_info
 
-    def backup_blocks(self, raw_blocks):
+    async def backup_blocks(self, raw_blocks):
         '''Backup the raw blocks and flush.
 
         The blocks should be in order of decreasing height, starting at.
@@ -484,6 +485,8 @@ def backup_blocks(self, raw_blocks):
             self.height -= 1
             self.db.tx_counts.pop()
 
+            await sleep(0)
+
         self.logger.info('backed up to height {:,d}'.format(self.height))
 
     def backup_txs(self, txs, is_unspendable):
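
The shape of the change, in one place: block processing is now an ordinary coroutine awaited on the event loop instead of a function handed to run_in_thread. The sketch below is a minimal, hypothetical illustration of that pattern (the Processor class and its trivial per-block work are not ElectrumX code): the work runs under the state lock and inside asyncio.shield(), so a cancellation at shutdown does not lose work that is about to be flushed, and it awaits sleep(0) after each block so other tasks on the loop stay responsive.

import asyncio
from asyncio import sleep


class Processor:
    # Hypothetical stand-in for the block processor; illustrative only.
    def __init__(self):
        self.state_lock = asyncio.Lock()
        self.height = 0

    async def run_with_lock(self, coro):
        # Shield the work so a cancellation from shutdown does not lose it;
        # hold the lock so in-memory state is not mutated elsewhere meanwhile.
        async def run_locked():
            async with self.state_lock:
                return await coro
        return await asyncio.shield(run_locked())

    async def advance_blocks(self, blocks):
        for block in blocks:
            self.height += 1   # stand-in for the real per-block work
            await sleep(0)     # yield to the event loop after each block


async def main():
    proc = Processor()
    await proc.run_with_lock(proc.advance_blocks(range(5)))
    print(proc.height)   # 5


asyncio.run(main())

This is also why the commit adds await sleep(0) inside advance_blocks and backup_blocks: without a worker thread, those loops must yield explicitly or they would starve the rest of the server during a long batch.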
