Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions chia/_tests/core/full_node/test_tx_processing_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def test_local_txs(seeded_random: random.Random) -> None:
transaction_queue = TransactionQueue(1000, log)
# test 1 tx
first_tx = get_transaction_queue_entry(None, 0)
await transaction_queue.put(first_tx, None)
transaction_queue.put(first_tx, None)

result1 = await transaction_queue.pop()

Expand All @@ -40,7 +40,7 @@ async def test_local_txs(seeded_random: random.Random) -> None:
num_txs = 2000
list_txs = [get_transaction_queue_entry(bytes32.random(seeded_random), i) for i in range(num_txs)]
for tx in list_txs:
await transaction_queue.put(tx, None)
transaction_queue.put(tx, None)

resulting_txs = []
for _ in range(num_txs):
Expand All @@ -58,12 +58,12 @@ async def test_one_peer_and_await(seeded_random: random.Random) -> None:

list_txs = [get_transaction_queue_entry(peer_id, i) for i in range(num_txs)]
for tx in list_txs:
await transaction_queue.put(tx, peer_id)
transaction_queue.put(tx, peer_id)

# test transaction priority
local_txs = [get_transaction_queue_entry(None, i) for i in range(int(num_txs / 5))] # 20 txs
for tx in local_txs:
await transaction_queue.put(tx, None)
transaction_queue.put(tx, None)

resulting_txs = []
for _ in range(num_txs + len(local_txs)):
Expand All @@ -80,7 +80,7 @@ async def test_one_peer_and_await(seeded_random: random.Random) -> None:
with pytest.raises(asyncio.InvalidStateError): # task is not done, so we expect an error when getting result
task.result()
# add a tx to test task completion
await transaction_queue.put(get_transaction_queue_entry(None, 0), None)
transaction_queue.put(get_transaction_queue_entry(None, 0), None)
await asyncio.wait_for(task, 1) # we should never time out here


Expand All @@ -95,7 +95,7 @@ async def test_lots_of_peers(seeded_random: random.Random) -> None:
# 100 txs per peer
list_txs = [get_transaction_queue_entry(peer_id, i) for peer_id in peer_ids for i in range(num_txs)]
for tx in list_txs:
await transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]
transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]

resulting_txs = []
for _ in range(total_txs):
Expand All @@ -117,11 +117,11 @@ async def test_full_queue(seeded_random: random.Random) -> None:
# 999 txs per peer then 1 to fail later
list_txs = [get_transaction_queue_entry(peer_id, i) for peer_id in peer_ids for i in range(num_txs)]
for tx in list_txs:
await transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]
transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]

# test failure case.
with pytest.raises(TransactionQueueFull):
await transaction_queue.put(get_transaction_queue_entry(peer_ids[0], 1001), peer_ids[0])
transaction_queue.put(get_transaction_queue_entry(peer_ids[0], 1001), peer_ids[0])

resulting_txs = []
for _ in range(total_txs):
Expand All @@ -142,7 +142,7 @@ async def test_queue_cleanup_and_fairness(seeded_random: random.Random) -> None:

list_txs = peer_tx_a + peer_tx_b + peer_tx_c
for tx in list_txs:
await transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]
transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]

resulting_ids = []
for _ in range(3): # we validate we get one transaction per peer
Expand Down
4 changes: 2 additions & 2 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def respond_transaction(

# TODO: Use fee in priority calculation, to prioritize high fee TXs
try:
await self.full_node.transaction_queue.put(
self.full_node.transaction_queue.put(
TransactionQueueEntry(tx.transaction, tx_bytes, spend_name, peer, test, peers_with_tx),
peer.peer_node_id,
)
Expand Down Expand Up @@ -1434,7 +1434,7 @@ async def send_transaction(self, request: wallet_protocol.SendTransaction, *, te
return make_msg(ProtocolMessageTypes.transaction_ack, response)

queue_entry = TransactionQueueEntry(request.transaction, None, spend_name, None, test)
await self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
try:
with anyio.fail_after(delay=45):
status, error = await queue_entry.done.wait()
Expand Down
2 changes: 1 addition & 1 deletion chia/full_node/tx_processing_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(self, peer_size_limit: int, log: logging.Logger) -> None:
self.peer_size_limit = peer_size_limit
self.log = log

async def put(self, tx: TransactionQueueEntry, peer_id: bytes32 | None, high_priority: bool = False) -> None:
def put(self, tx: TransactionQueueEntry, peer_id: bytes32 | None, high_priority: bool = False) -> None:
if peer_id is None or high_priority: # when it's local there is no peer_id.
self._high_priority_queue.put(tx)
else:
Expand Down
Loading