Fix pending transaction issue, and use a different lock to prevent ho… (#5549)
* Fix pending transaction issue, and use a different lock to prevent holding blockchain lock * Force re-run of tests
This commit is contained in:
parent
5ce56bce6d
commit
ac59ec52c6
|
@ -1606,44 +1606,45 @@ class FullNode:
|
|||
if self.sync_store.get_sync_mode():
|
||||
status = MempoolInclusionStatus.FAILED
|
||||
error: Optional[Err] = Err.NO_TRANSACTIONS_WHILE_SYNCING
|
||||
self.mempool_manager.remove_seen(spend_name)
|
||||
else:
|
||||
try:
|
||||
cost_result = await self.mempool_manager.pre_validate_spendbundle(transaction)
|
||||
except Exception as e:
|
||||
self.mempool_manager.remove_seen(spend_name)
|
||||
raise e
|
||||
async with self.blockchain.lock:
|
||||
async with self.mempool_manager.lock:
|
||||
if self.mempool_manager.get_spendbundle(spend_name) is not None:
|
||||
self.mempool_manager.remove_seen(spend_name)
|
||||
return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
|
||||
cost, status, error = await self.mempool_manager.add_spendbundle(transaction, cost_result, spend_name)
|
||||
if status == MempoolInclusionStatus.SUCCESS:
|
||||
self.log.debug(
|
||||
f"Added transaction to mempool: {spend_name} mempool size: "
|
||||
f"{self.mempool_manager.mempool.total_mempool_cost}"
|
||||
)
|
||||
# Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
|
||||
# vector.
|
||||
mempool_item = self.mempool_manager.get_mempool_item(spend_name)
|
||||
assert mempool_item is not None
|
||||
fees = mempool_item.fee
|
||||
assert fees >= 0
|
||||
assert cost is not None
|
||||
new_tx = full_node_protocol.NewTransaction(
|
||||
spend_name,
|
||||
cost,
|
||||
fees,
|
||||
)
|
||||
msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx)
|
||||
if peer is None:
|
||||
await self.server.send_to_all([msg], NodeType.FULL_NODE)
|
||||
else:
|
||||
await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
|
||||
if status == MempoolInclusionStatus.SUCCESS:
|
||||
self.log.debug(
|
||||
f"Added transaction to mempool: {spend_name} mempool size: "
|
||||
f"{self.mempool_manager.mempool.total_mempool_cost}"
|
||||
)
|
||||
# Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
|
||||
# vector.
|
||||
mempool_item = self.mempool_manager.get_mempool_item(spend_name)
|
||||
assert mempool_item is not None
|
||||
fees = mempool_item.fee
|
||||
assert fees >= 0
|
||||
assert cost is not None
|
||||
new_tx = full_node_protocol.NewTransaction(
|
||||
spend_name,
|
||||
cost,
|
||||
fees,
|
||||
)
|
||||
msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx)
|
||||
if peer is None:
|
||||
await self.server.send_to_all([msg], NodeType.FULL_NODE)
|
||||
else:
|
||||
self.mempool_manager.remove_seen(spend_name)
|
||||
self.log.debug(
|
||||
f"Wasn't able to add transaction with id {spend_name}, " f"status {status} error: {error}"
|
||||
)
|
||||
await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
|
||||
else:
|
||||
self.mempool_manager.remove_seen(spend_name)
|
||||
self.log.debug(
|
||||
f"Wasn't able to add transaction with id {spend_name}, " f"status {status} error: {error}"
|
||||
)
|
||||
return status, error
|
||||
|
||||
async def _needs_compact_proof(
|
||||
|
@ -1881,7 +1882,7 @@ class FullNode:
|
|||
if max_height is None:
|
||||
await asyncio.sleep(30)
|
||||
continue
|
||||
# Calculate 'min_height' correctly the first time this task is launched, using the db.
|
||||
# Calculate 'min_height' correctly the first time this task is launched, using the db
|
||||
assert min_height is not None
|
||||
min_height = await self.block_store.get_first_not_compactified(min_height)
|
||||
if min_height is None or min_height > max(0, max_height - 1000):
|
||||
|
|
|
@ -71,6 +71,7 @@ class MempoolManager:
|
|||
# The mempool will correspond to a certain peak
|
||||
self.peak: Optional[BlockRecord] = None
|
||||
self.mempool: Mempool = Mempool(self.mempool_max_total_cost)
|
||||
self.lock: asyncio.Lock = asyncio.Lock()
|
||||
|
||||
def shut_down(self):
|
||||
self.pool.shutdown(wait=True)
|
||||
|
@ -506,27 +507,28 @@ class MempoolManager:
|
|||
self.peak = new_peak
|
||||
|
||||
old_pool = self.mempool
|
||||
self.mempool = Mempool(self.mempool_max_total_cost)
|
||||
async with self.lock:
|
||||
self.mempool = Mempool(self.mempool_max_total_cost)
|
||||
|
||||
for item in old_pool.spends.values():
|
||||
_, result, _ = await self.add_spendbundle(
|
||||
item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program
|
||||
)
|
||||
# If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be successfully
|
||||
# added to the new mempool. In this case, remove it from seen, so in the case of a reorg, it can be
|
||||
# resubmitted
|
||||
if result != MempoolInclusionStatus.SUCCESS:
|
||||
self.remove_seen(item.spend_bundle_name)
|
||||
for item in old_pool.spends.values():
|
||||
_, result, _ = await self.add_spendbundle(
|
||||
item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program
|
||||
)
|
||||
# If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be
|
||||
# successfully added to the new mempool. In this case, remove it from seen, so in the case of a reorg,
|
||||
# it can be resubmitted
|
||||
if result != MempoolInclusionStatus.SUCCESS:
|
||||
self.remove_seen(item.spend_bundle_name)
|
||||
|
||||
potential_txs_copy = self.potential_txs.copy()
|
||||
self.potential_txs = {}
|
||||
txs_added = []
|
||||
for item in potential_txs_copy.values():
|
||||
cost, status, error = await self.add_spendbundle(
|
||||
item.spend_bundle, item.npc_result, item.spend_bundle_name, program=item.program
|
||||
)
|
||||
if status == MempoolInclusionStatus.SUCCESS:
|
||||
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
|
||||
potential_txs_copy = self.potential_txs.copy()
|
||||
self.potential_txs = {}
|
||||
txs_added = []
|
||||
for item in potential_txs_copy.values():
|
||||
cost, status, error = await self.add_spendbundle(
|
||||
item.spend_bundle, item.npc_result, item.spend_bundle_name, program=item.program
|
||||
)
|
||||
if status == MempoolInclusionStatus.SUCCESS:
|
||||
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
|
||||
log.info(
|
||||
f"Size of mempool: {len(self.mempool.spends)} spends, cost: {self.mempool.total_mempool_cost} "
|
||||
f"minimum fee to get in: {self.mempool.get_min_fee_rate(100000)}"
|
||||
|
|
Loading…
Reference in New Issue