Skip to content

Commit b661590

Browse files
committed
In TransactionQueue, prioritize peer queue transactions by fee per cost.
1 parent 9242a5c commit b661590

File tree

2 files changed

+98
-20
lines changed

2 files changed

+98
-20
lines changed

chia/_tests/core/full_node/test_tx_processing_queue.py

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,32 @@
33
import asyncio
44
import logging
55
import random
6-
from dataclasses import dataclass
6+
from dataclasses import dataclass, field
77
from typing import cast
88

99
import pytest
1010
from chia_rs.sized_bytes import bytes32
11+
from chia_rs.sized_ints import uint64
1112

12-
from chia.full_node.tx_processing_queue import TransactionQueue, TransactionQueueEntry, TransactionQueueFull
13+
from chia.full_node.tx_processing_queue import PeerWithTx, TransactionQueue, TransactionQueueEntry, TransactionQueueFull
1314
from chia.util.task_referencer import create_referenced_task
1415

1516
log = logging.getLogger(__name__)
1617

1718

1819
@dataclass(frozen=True)
1920
class FakeTransactionQueueEntry:
20-
index: int
21-
peer_id: bytes32 | None
21+
index: int = field(compare=False)
22+
peer_id: bytes32 | None = field(compare=False)
23+
peers_with_tx: dict[bytes32, PeerWithTx] | None = field(compare=False)
2224

2325

24-
def get_transaction_queue_entry(peer_id: bytes32 | None, tx_index: int) -> TransactionQueueEntry: # easy shortcut
25-
return cast(TransactionQueueEntry, FakeTransactionQueueEntry(index=tx_index, peer_id=peer_id))
26+
def get_transaction_queue_entry(
27+
peer_id: bytes32 | None, tx_index: int, peers_with_tx: dict[bytes32, PeerWithTx] | None = None
28+
) -> TransactionQueueEntry: # easy shortcut
29+
if peers_with_tx is None:
30+
peers_with_tx = {}
31+
return cast(TransactionQueueEntry, FakeTransactionQueueEntry(tx_index, peer_id, peers_with_tx))
2632

2733

2834
@pytest.mark.anyio
@@ -135,20 +141,79 @@ async def test_queue_cleanup_and_fairness(seeded_random: random.Random) -> None:
135141
peer_b = bytes32.random(seeded_random)
136142
peer_c = bytes32.random(seeded_random)
137143

144+
higher_tx_cost = uint64(20)
145+
lower_tx_cost = uint64(10)
146+
higher_tx_fee = uint64(5)
147+
lower_tx_fee = uint64(1)
138148
# 2 for a, 1 for b, 2 for c
139-
peer_tx_a = [get_transaction_queue_entry(peer_a, i) for i in range(2)]
140-
peer_tx_b = [get_transaction_queue_entry(peer_b, 0)]
141-
peer_tx_c = [get_transaction_queue_entry(peer_c, i) for i in range(2)]
149+
peer_tx_a = [
150+
get_transaction_queue_entry(peer_a, 0, {peer_a: PeerWithTx(str(peer_a), lower_tx_fee, higher_tx_cost)}),
151+
get_transaction_queue_entry(peer_a, 1, {peer_a: PeerWithTx(str(peer_a), higher_tx_fee, lower_tx_cost)}),
152+
]
153+
peer_tx_b = [
154+
get_transaction_queue_entry(peer_b, 0, {peer_b: PeerWithTx(str(peer_b), higher_tx_fee, lower_tx_cost)})
155+
]
156+
peer_tx_c = [
157+
get_transaction_queue_entry(peer_c, 0, {peer_c: PeerWithTx(str(peer_c), higher_tx_fee, lower_tx_cost)}),
158+
get_transaction_queue_entry(peer_c, 1, {peer_c: PeerWithTx(str(peer_c), lower_tx_fee, higher_tx_cost)}),
159+
]
142160

143161
list_txs = peer_tx_a + peer_tx_b + peer_tx_c
144162
for tx in list_txs:
145163
transaction_queue.put(tx, tx.peer_id) # type: ignore[attr-defined]
146164

147-
resulting_ids = []
165+
entries = []
148166
for _ in range(3): # we validate we get one transaction per peer
149-
resulting_ids.append((await transaction_queue.pop()).peer_id) # type: ignore[attr-defined]
150-
assert [peer_a, peer_b, peer_c] == resulting_ids # all peers have been properly included in the queue.
151-
second_resulting_ids = []
167+
entry = await transaction_queue.pop()
168+
entries.append((entry.peer_id, entry.index)) # type: ignore[attr-defined]
169+
assert [(peer_a, 1), (peer_b, 0), (peer_c, 0)] == entries # all peers have been properly included in the queue.
170+
second_entries = []
152171
for _ in range(2): # we validate that we properly queue the last 2 transactions
153-
second_resulting_ids.append((await transaction_queue.pop()).peer_id) # type: ignore[attr-defined]
154-
assert [peer_a, peer_c] == second_resulting_ids
172+
entry = await transaction_queue.pop()
173+
second_entries.append((entry.peer_id, entry.index)) # type: ignore[attr-defined]
174+
assert [(peer_a, 0), (peer_c, 1)] == second_entries
175+
176+
177+
@pytest.mark.anyio
178+
async def test_peer_queue_prioritization_fallback() -> None:
179+
"""
180+
Tests prioritization fallback, when `peer_id` is not in `peers_with_tx` and
181+
we compute the fee per cost (for priority) using values from the peer with
182+
the highest advertised cost, even if that results in a lower fee per cost.
183+
"""
184+
queue = TransactionQueue(42, log)
185+
peer1 = bytes32.random()
186+
peer2 = bytes32.random()
187+
# We'll be using this peer to test the fallback, so we don't include it in
188+
# peers with transactions maps.
189+
peer3 = bytes32.random()
190+
peers_with_tx1 = {
191+
# This has FPC of 5.0
192+
peer1: PeerWithTx(str(peer1), uint64(10), uint64(2)),
193+
# This has FPC of 2.0 but higher advertised cost
194+
peer2: PeerWithTx(str(peer2), uint64(20), uint64(10)),
195+
}
196+
tx1 = get_transaction_queue_entry(peer3, 0, peers_with_tx1)
197+
queue.put(tx1, peer3)
198+
peers_with_tx2 = {
199+
# This has FPC of 3.0
200+
peer1: PeerWithTx(str(peer1), uint64(30), uint64(10)),
201+
# This has FPC of 4.0 but lower advertised cost
202+
peer2: PeerWithTx(str(peer2), uint64(20), uint64(5)),
203+
}
204+
tx2 = get_transaction_queue_entry(peer3, 1, peers_with_tx2)
205+
queue.put(tx2, peer3)
206+
tx3 = get_transaction_queue_entry(peer3, 2, {})
207+
queue.put(tx3, peer3)
208+
# tx2 gets top priority with FPC 3.0 instead of 4.0 due to higher cost fallback
209+
assert queue._queue_dict[peer3].queue[0][0] == -3.0
210+
entry = await queue.pop()
211+
assert entry.index == 1 # type: ignore[attr-defined]
212+
# tx1 comes next with FPC 2.0 instead of 5.0 due to higher cost fallback
213+
assert queue._queue_dict[peer3].queue[0][0] == -2.0
214+
entry = await queue.pop()
215+
assert entry.index == 0 # type: ignore[attr-defined]
216+
# tx3 comes next with infinity priority due to no `peers_with_tx`
217+
assert queue._queue_dict[peer3].queue[0][0] == float("inf")
218+
entry = await queue.pop()
219+
assert entry.index == 2 # type: ignore[attr-defined]

chia/full_node/tx_processing_queue.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import dataclasses
55
import logging
66
from dataclasses import dataclass, field
7-
from queue import SimpleQueue
7+
from queue import PriorityQueue, SimpleQueue
88
from typing import ClassVar, Generic, TypeVar
99

1010
from chia_rs import SpendBundle
@@ -85,7 +85,7 @@ class TransactionQueue:
8585
_list_cursor: int # this is which index
8686
_queue_length: asyncio.Semaphore
8787
_index_to_peer_map: list[bytes32]
88-
_queue_dict: dict[bytes32, SimpleQueue[TransactionQueueEntry]]
88+
_queue_dict: dict[bytes32, PriorityQueue[tuple[float, TransactionQueueEntry]]]
8989
_high_priority_queue: SimpleQueue[TransactionQueueEntry]
9090
peer_size_limit: int
9191
log: logging.Logger
@@ -104,10 +104,23 @@ def put(self, tx: TransactionQueueEntry, peer_id: bytes32 | None, high_priority:
104104
self._high_priority_queue.put(tx)
105105
else:
106106
if peer_id not in self._queue_dict:
107-
self._queue_dict[peer_id] = SimpleQueue()
107+
self._queue_dict[peer_id] = PriorityQueue()
108108
self._index_to_peer_map.append(peer_id)
109109
if self._queue_dict[peer_id].qsize() < self.peer_size_limit:
110-
self._queue_dict[peer_id].put(tx)
110+
tx_info = tx.peers_with_tx.get(peer_id)
111+
if tx_info is not None and tx_info.advertised_cost > 0:
112+
fpc = tx_info.advertised_fee / tx_info.advertised_cost
113+
# PriorityQueue returns lowest first so we invert
114+
priority = -fpc
115+
else:
116+
# Compute the fee per cost using values from the peer with
117+
# the highest advertised cost.
118+
priority = float("inf")
119+
tx_info = max(tx.peers_with_tx.values(), key=lambda p: p.advertised_cost, default=None)
120+
if tx_info is not None and tx_info.advertised_cost > 0:
121+
fpc = tx_info.advertised_fee / tx_info.advertised_cost
122+
priority = -fpc
123+
self._queue_dict[peer_id].put((priority, tx))
111124
else:
112125
self.log.warning(f"Transaction queue full for peer {peer_id}")
113126
raise TransactionQueueFull(f"Transaction queue full for peer {peer_id}")
@@ -121,7 +134,7 @@ async def pop(self) -> TransactionQueueEntry:
121134
while True:
122135
peer_queue = self._queue_dict[self._index_to_peer_map[self._list_cursor]]
123136
if not peer_queue.empty():
124-
result = peer_queue.get()
137+
_, result = peer_queue.get()
125138
self._list_cursor += 1
126139
if self._list_cursor > len(self._index_to_peer_map) - 1:
127140
# reset iterator

0 commit comments

Comments
 (0)