Reduce the use of locks with synchronous store methods

This commit is contained in:
Mariano Sorgente 2020-01-28 17:14:50 +09:00
parent 194285f6d4
commit 6f45a4d515
No known key found for this signature in database
GPG Key ID: 0F866338C369278C
5 changed files with 212 additions and 239 deletions

View File

@ -70,7 +70,6 @@ class ClassGroup(tuple):
for x in [r[0], r[1]]]) for x in [r[0], r[1]]])
def __eq__(self, other): def __eq__(self, other):
print("other", other)
return tuple(self.reduced()) == tuple(ClassGroup((other[0], other[1], other[2])).reduced()) return tuple(self.reduced()) == tuple(ClassGroup((other[0], other[1], other[2])).reduced())
def __ne__(self, other): def __ne__(self, other):

View File

@ -101,27 +101,24 @@ class FullNode:
""" """
challenge_requests: List[timelord_protocol.ChallengeStart] = [] challenge_requests: List[timelord_protocol.ChallengeStart] = []
pos_info_requests: List[timelord_protocol.ProofOfSpaceInfo] = [] pos_info_requests: List[timelord_protocol.ProofOfSpaceInfo] = []
async with self.store.lock: tips: List[SmallHeaderBlock] = self.blockchain.get_current_tips()
tips: List[SmallHeaderBlock] = self.blockchain.get_current_tips() for tip in tips:
for tip in tips: assert tip.challenge
assert tip.challenge challenge_hash = tip.challenge.get_hash()
challenge_hash = tip.challenge.get_hash() challenge_requests.append(
challenge_requests.append( timelord_protocol.ChallengeStart(
timelord_protocol.ChallengeStart( challenge_hash, tip.challenge.total_weight
challenge_hash, tip.challenge.total_weight
)
) )
)
tip_hashes = [tip.header_hash for tip in tips] tip_hashes = [tip.header_hash for tip in tips]
tip_infos = [ tip_infos = [
tup[0] tup[0]
for tup in list((await self.store.get_unfinished_blocks()).items()) for tup in list((self.store.get_unfinished_blocks()).items())
if tup[1].prev_header_hash in tip_hashes if tup[1].prev_header_hash in tip_hashes
] ]
for chall, iters in tip_infos: for chall, iters in tip_infos:
pos_info_requests.append( pos_info_requests.append(timelord_protocol.ProofOfSpaceInfo(chall, iters))
timelord_protocol.ProofOfSpaceInfo(chall, iters)
)
for challenge_msg in challenge_requests: for challenge_msg in challenge_requests:
yield OutboundMessage( yield OutboundMessage(
NodeType.TIMELORD, Message("challenge_start", challenge_msg), delivery NodeType.TIMELORD, Message("challenge_start", challenge_msg), delivery
@ -214,30 +211,24 @@ class FullNode:
# Based on responses from peers about the current heads, see which head is the heaviest # Based on responses from peers about the current heads, see which head is the heaviest
# (similar to longest chain rule). # (similar to longest chain rule).
async with self.store.lock: potential_tips: List[
potential_tips: List[ Tuple[bytes32, FullBlock]
Tuple[bytes32, FullBlock] ] = self.store.get_potential_tips_tuples()
] = await self.store.get_potential_tips_tuples() log.info(f"Have collected {len(potential_tips)} potential tips")
log.info(f"Have collected {len(potential_tips)} potential tips") for header_hash, potential_tip_block in potential_tips:
for header_hash, potential_tip_block in potential_tips: if potential_tip_block.header_block.challenge is None:
if potential_tip_block.header_block.challenge is None: raise ValueError(
raise ValueError( f"Invalid tip block {potential_tip_block.header_hash} received"
f"Invalid tip block {potential_tip_block.header_hash} received" )
) if potential_tip_block.header_block.challenge.total_weight > highest_weight:
if ( highest_weight = potential_tip_block.header_block.challenge.total_weight
potential_tip_block.header_block.challenge.total_weight tip_block = potential_tip_block
> highest_weight tip_height = potential_tip_block.header_block.challenge.height
): if highest_weight <= max(
highest_weight = ( [t.weight for t in self.blockchain.get_current_tips()]
potential_tip_block.header_block.challenge.total_weight ):
) log.info("Not performing sync, already caught up.")
tip_block = potential_tip_block return
tip_height = potential_tip_block.header_block.challenge.height
if highest_weight <= max(
[t.weight for t in self.blockchain.get_current_tips()]
):
log.info("Not performing sync, already caught up.")
return
assert tip_block assert tip_block
log.info(f"Tip block {tip_block.header_hash} tip height {tip_block.height}") log.info(f"Tip block {tip_block.header_hash} tip height {tip_block.height}")
@ -275,10 +266,9 @@ class FullNode:
log.warning("Did not receive desired header hashes") log.warning("Did not receive desired header hashes")
# Finding the fork point allows us to only download headers and blocks from the fork point # Finding the fork point allows us to only download headers and blocks from the fork point
async with self.store.lock: header_hashes = self.store.get_potential_hashes()
header_hashes = self.store.get_potential_hashes() fork_point_height: uint32 = self.blockchain.find_fork_point(header_hashes)
fork_point_height: uint32 = self.blockchain.find_fork_point(header_hashes) fork_point_hash: bytes32 = header_hashes[fork_point_height]
fork_point_hash: bytes32 = header_hashes[fork_point_height]
log.info(f"Fork point: {fork_point_hash} at height {fork_point_height}") log.info(f"Fork point: {fork_point_hash} at height {fork_point_height}")
# Now, we download all of the headers in order to verify the weight, in batches # Now, we download all of the headers in order to verify the weight, in batches
@ -367,11 +357,10 @@ class FullNode:
total_time_slept += sleep_interval total_time_slept += sleep_interval
log.info(f"Did not receive desired header blocks") log.info(f"Did not receive desired header blocks")
async with self.store.lock: for h in range(fork_point_height + 1, tip_height + 1):
for h in range(fork_point_height + 1, tip_height + 1): header = self.store.get_potential_header(uint32(h))
header = self.store.get_potential_header(uint32(h)) assert header is not None
assert header is not None headers.append(header)
headers.append(header)
log.info(f"Downloaded headers up to tip height: {tip_height}") log.info(f"Downloaded headers up to tip height: {tip_height}")
if not verify_weight( if not verify_weight(
@ -477,7 +466,6 @@ class FullNode:
# Verifies this batch, which we are guaranteed to have (since we broke from the above loop) # Verifies this batch, which we are guaranteed to have (since we broke from the above loop)
blocks = [] blocks = []
# async with self.store.lock:
for height in range(height_checkpoint, end_height): for height in range(height_checkpoint, end_height):
b: Optional[FullBlock] = await self.store.get_potential_block( b: Optional[FullBlock] = await self.store.get_potential_block(
uint32(height) uint32(height)
@ -496,9 +484,9 @@ class FullNode:
) )
assert block is not None assert block is not None
prev_block: Optional[ prev_block: Optional[FullBlock] = await self.store.get_potential_block(
FullBlock uint32(height - 1)
] = await self.store.get_potential_block(uint32(height - 1)) )
if prev_block is None: if prev_block is None:
prev_block = await self.store.get_block(block.prev_header_hash) prev_block = await self.store.get_block(block.prev_header_hash)
assert prev_block is not None assert prev_block is not None
@ -524,7 +512,7 @@ class FullNode:
max([h.height for h in self.blockchain.get_current_tips()]) max([h.height for h in self.blockchain.get_current_tips()])
>= height >= height
) )
await self.store.set_proof_of_time_estimate_ips( self.store.set_proof_of_time_estimate_ips(
self.blockchain.get_next_ips(block.header_block) self.blockchain.get_next_ips(block.header_block)
) )
log.info( log.info(
@ -542,11 +530,10 @@ class FullNode:
Finalize sync by setting sync mode to False, clearing all sync information, and adding any final Finalize sync by setting sync mode to False, clearing all sync information, and adding any final
blocks that we have finalized recently. blocks that we have finalized recently.
""" """
potential_fut_blocks = (self.store.get_potential_future_blocks()).copy()
self.store.set_sync_mode(False)
async with self.store.lock: async with self.store.lock:
potential_fut_blocks = (
await self.store.get_potential_future_blocks()
).copy()
await self.store.set_sync_mode(False)
await self.store.clear_sync_info() await self.store.clear_sync_info()
for block in potential_fut_blocks: for block in potential_fut_blocks:
@ -579,11 +566,10 @@ class FullNode:
self, all_header_hashes: peer_protocol.AllHeaderHashes self, all_header_hashes: peer_protocol.AllHeaderHashes
) -> OutboundMessageGenerator: ) -> OutboundMessageGenerator:
assert len(all_header_hashes.header_hashes) > 0 assert len(all_header_hashes.header_hashes) > 0
async with self.store.lock: self.store.set_potential_hashes(all_header_hashes.header_hashes)
self.store.set_potential_hashes(all_header_hashes.header_hashes) phr = self.store.get_potential_hashes_received()
phr = self.store.get_potential_hashes_received() assert phr is not None
assert phr is not None phr.set()
phr.set()
for _ in []: # Yields nothing for _ in []: # Yields nothing
yield _ yield _
@ -632,10 +618,9 @@ class FullNode:
log.info( log.info(
f"Received header blocks {request.header_blocks[0].height, request.header_blocks[-1].height}." f"Received header blocks {request.header_blocks[0].height, request.header_blocks[-1].height}."
) )
async with self.store.lock: for header_block in request.header_blocks:
for header_block in request.header_blocks: self.store.add_potential_header(header_block)
self.store.add_potential_header(header_block) (self.store.get_potential_headers_received(header_block.height)).set()
(self.store.get_potential_headers_received(header_block.height)).set()
for _ in []: # Yields nothing for _ in []: # Yields nothing
yield _ yield _
@ -668,9 +653,7 @@ class FullNode:
HeaderBlock HeaderBlock
] = await self.store.get_header_blocks_by_hash(header_hashes) ] = await self.store.get_header_blocks_by_hash(header_hashes)
for header_block in header_blocks: for header_block in header_blocks:
fetched = await self.store.get_block( fetched = await self.store.get_block(header_block.header.get_hash())
header_block.header.get_hash()
)
assert fetched assert fetched
blocks.append(fetched) blocks.append(fetched)
except KeyError: except KeyError:
@ -696,7 +679,7 @@ class FullNode:
We have received the blocks that we needed for syncing. Add them to processing queue. We have received the blocks that we needed for syncing. Add them to processing queue.
""" """
log.info(f"Received sync blocks {[b.height for b in request.blocks]}") log.info(f"Received sync blocks {[b.height for b in request.blocks]}")
if not await self.store.get_sync_mode(): if not self.store.get_sync_mode():
log.warning("Receiving sync blocks when we are not in sync mode.") log.warning("Receiving sync blocks when we are not in sync mode.")
return return
@ -780,7 +763,7 @@ class FullNode:
block_header_data_hash: bytes32 = block_header_data.get_hash() block_header_data_hash: bytes32 = block_header_data.get_hash()
# self.stores this block so we can submit it to the blockchain after it's signed by harvester # self.stores this block so we can submit it to the blockchain after it's signed by harvester
await self.store.add_candidate_block( self.store.add_candidate_block(
proof_of_space_hash, body, block_header_data, request.proof_of_space proof_of_space_hash, body, block_header_data, request.proof_of_space
) )
@ -800,25 +783,22 @@ class FullNode:
block, which only needs a Proof of Time to be finished. If the signature is valid, block, which only needs a Proof of Time to be finished. If the signature is valid,
we call the unfinished_block routine. we call the unfinished_block routine.
""" """
async with self.store.lock: candidate: Optional[
candidate: Optional[ Tuple[Body, HeaderData, ProofOfSpace]
Tuple[Body, HeaderData, ProofOfSpace] ] = self.store.get_candidate_block(header_signature.pos_hash)
] = await self.store.get_candidate_block(header_signature.pos_hash) if candidate is None:
if candidate is None: log.warning(f"PoS hash {header_signature.pos_hash} not found in database")
log.warning( return
f"PoS hash {header_signature.pos_hash} not found in database" # Verifies that we have the correct header and body self.stored
) block_body, block_header_data, pos = candidate
return
# Verifies that we have the correct header and body self.stored
block_body, block_header_data, pos = candidate
assert block_header_data.get_hash() == header_signature.header_hash assert block_header_data.get_hash() == header_signature.header_hash
block_header: Header = Header( block_header: Header = Header(
block_header_data, header_signature.header_signature block_header_data, header_signature.header_signature
) )
header: HeaderBlock = HeaderBlock(pos, None, None, block_header) header: HeaderBlock = HeaderBlock(pos, None, None, block_header)
unfinished_block_obj: FullBlock = FullBlock(header, block_body) unfinished_block_obj: FullBlock = FullBlock(header, block_body)
# Propagate to ourselves (which validates and does further propagations) # Propagate to ourselves (which validates and does further propagations)
request = peer_protocol.UnfinishedBlock(unfinished_block_obj) request = peer_protocol.UnfinishedBlock(unfinished_block_obj)
@ -840,9 +820,9 @@ class FullNode:
request.proof.number_of_iterations, request.proof.number_of_iterations,
) )
unfinished_block_obj: Optional[ unfinished_block_obj: Optional[FullBlock] = self.store.get_unfinished_block(
FullBlock dict_key
] = await self.store.get_unfinished_block(dict_key) )
if not unfinished_block_obj: if not unfinished_block_obj:
log.warning( log.warning(
f"Received a proof of time that we cannot use to complete a block {dict_key}" f"Received a proof of time that we cannot use to complete a block {dict_key}"
@ -879,12 +859,8 @@ class FullNode:
new_header_block, unfinished_block_obj.body new_header_block, unfinished_block_obj.body
) )
async with self.store.lock: if self.store.get_sync_mode():
sync_mode = await self.store.get_sync_mode() self.store.add_potential_future_block(new_full_block)
if sync_mode:
async with self.store.lock:
await self.store.add_potential_future_block(new_full_block)
else: else:
async for msg in self.block(peer_protocol.Block(new_full_block)): async for msg in self.block(peer_protocol.Block(new_full_block)):
yield msg yield msg
@ -900,17 +876,17 @@ class FullNode:
""" """
finish_block: bool = False finish_block: bool = False
propagate_proof: bool = False propagate_proof: bool = False
async with self.store.lock: if self.store.get_unfinished_block(
if await self.store.get_unfinished_block( (
( new_proof_of_time.proof.challenge_hash,
new_proof_of_time.proof.challenge_hash, new_proof_of_time.proof.number_of_iterations,
new_proof_of_time.proof.number_of_iterations, )
) ):
):
finish_block = True
elif new_proof_of_time.proof.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
propagate_proof = True
finish_block = True
elif new_proof_of_time.proof.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
propagate_proof = True
if finish_block: if finish_block:
request = timelord_protocol.ProofOfTimeFinished(new_proof_of_time.proof) request = timelord_protocol.ProofOfTimeFinished(new_proof_of_time.proof)
async for msg in self.proof_of_time_finished(request): async for msg in self.proof_of_time_finished(request):
@ -965,13 +941,13 @@ class FullNode:
) )
if ( if (
await self.store.get_unfinished_block((challenge_hash, iterations_needed)) self.store.get_unfinished_block((challenge_hash, iterations_needed))
is not None is not None
): ):
return return
expected_time: uint64 = uint64( expected_time: uint64 = uint64(
int(iterations_needed / (await self.store.get_proof_of_time_estimate_ips())) int(iterations_needed / (self.store.get_proof_of_time_estimate_ips()))
) )
if expected_time > constants["PROPAGATION_DELAY_THRESHOLD"]: if expected_time > constants["PROPAGATION_DELAY_THRESHOLD"]:
@ -979,37 +955,36 @@ class FullNode:
# If this block is slow, sleep to allow faster blocks to come out first # If this block is slow, sleep to allow faster blocks to come out first
await asyncio.sleep(5) await asyncio.sleep(5)
async with self.store.lock: leader: Tuple[uint32, uint64] = self.store.get_unfinished_block_leader()
leader: Tuple[uint32, uint64] = self.store.get_unfinished_block_leader() if leader is None or unfinished_block.block.height > leader[0]:
if leader is None or unfinished_block.block.height > leader[0]: log.info(
log.info( f"This is the first unfinished block at height {unfinished_block.block.height}, so propagate."
f"This is the first unfinished block at height {unfinished_block.block.height}, so propagate."
)
# If this is the first block we see at this height, propagate
self.store.set_unfinished_block_leader(
(unfinished_block.block.height, expected_time)
)
elif unfinished_block.block.height == leader[0]:
if expected_time > leader[1] + constants["PROPAGATION_THRESHOLD"]:
# If VDF is expected to finish X seconds later than the best, don't propagate
log.info(
f"VDF will finish too late {expected_time} seconds, so don't propagate"
)
return
elif expected_time < leader[1]:
log.info(
f"New best unfinished block at height {unfinished_block.block.height}"
)
# If this will be the first block to finalize, update our leader
self.store.set_unfinished_block_leader((leader[0], expected_time))
else:
# If we have seen an unfinished block at a greater or equal height, don't propagate
log.info(f"Unfinished block at old height, so don't propagate")
return
await self.store.add_unfinished_block(
(challenge_hash, iterations_needed), unfinished_block.block
) )
# If this is the first block we see at this height, propagate
self.store.set_unfinished_block_leader(
(unfinished_block.block.height, expected_time)
)
elif unfinished_block.block.height == leader[0]:
if expected_time > leader[1] + constants["PROPAGATION_THRESHOLD"]:
# If VDF is expected to finish X seconds later than the best, don't propagate
log.info(
f"VDF will finish too late {expected_time} seconds, so don't propagate"
)
return
elif expected_time < leader[1]:
log.info(
f"New best unfinished block at height {unfinished_block.block.height}"
)
# If this will be the first block to finalize, update our leader
self.store.set_unfinished_block_leader((leader[0], expected_time))
else:
# If we have seen an unfinished block at a greater or equal height, don't propagate
log.info(f"Unfinished block at old height, so don't propagate")
return
self.store.add_unfinished_block(
(challenge_hash, iterations_needed), unfinished_block.block
)
timelord_request = timelord_protocol.ProofOfSpaceInfo( timelord_request = timelord_protocol.ProofOfSpaceInfo(
challenge_hash, iterations_needed challenge_hash, iterations_needed
@ -1037,15 +1012,15 @@ class FullNode:
if self.blockchain.cointains_block(header_hash): if self.blockchain.cointains_block(header_hash):
return return
if self.store.get_sync_mode():
# Add the block to our potential tips list
self.store.add_potential_tip(block.block)
return
prevalidate_block = await self.blockchain.pre_validate_blocks([block.block])
val, pos = prevalidate_block[0]
async with self.store.lock: async with self.store.lock:
if await self.store.get_sync_mode():
# Add the block to our potential tips list
await self.store.add_potential_tip(block.block)
return
prevalidate_block = await self.blockchain.pre_validate_blocks([block.block])
val, pos = prevalidate_block[0]
prev_block: Optional[FullBlock] = await self.store.get_block( prev_block: Optional[FullBlock] = await self.store.get_block(
block.block.prev_header_hash block.block.prev_header_hash
) )
@ -1065,6 +1040,7 @@ class FullNode:
or added == ReceiveBlockResult.ADDED_TO_HEAD or added == ReceiveBlockResult.ADDED_TO_HEAD
): ):
await self.store.add_block(block.block) await self.store.add_block(block.block)
if added == ReceiveBlockResult.ALREADY_HAVE_BLOCK: if added == ReceiveBlockResult.ALREADY_HAVE_BLOCK:
return return
elif added == ReceiveBlockResult.INVALID_BLOCK: elif added == ReceiveBlockResult.INVALID_BLOCK:
@ -1074,21 +1050,20 @@ class FullNode:
return return
elif added == ReceiveBlockResult.DISCONNECTED_BLOCK: elif added == ReceiveBlockResult.DISCONNECTED_BLOCK:
log.warning(f"Disconnected block {header_hash}") log.warning(f"Disconnected block {header_hash}")
async with self.store.lock: tip_height = min(
tip_height = min( [head.height for head in self.blockchain.get_current_tips()]
[head.height for head in self.blockchain.get_current_tips()] )
)
if ( if (
block.block.height block.block.height
> tip_height + self.config["sync_blocks_behind_threshold"] > tip_height + self.config["sync_blocks_behind_threshold"]
): ):
async with self.store.lock: async with self.store.lock:
if await self.store.get_sync_mode(): if self.store.get_sync_mode():
return return
await self.store.clear_sync_info() await self.store.clear_sync_info()
await self.store.add_potential_tip(block.block) self.store.add_potential_tip(block.block)
await self.store.set_sync_mode(True) self.store.set_sync_mode(True)
log.info( log.info(
f"We are too far behind this block. Our height is {tip_height} and block is at " f"We are too far behind this block. Our height is {tip_height} and block is at "
f"{block.block.height}" f"{block.block.height}"
@ -1113,27 +1088,22 @@ class FullNode:
"request_block", "request_block",
peer_protocol.RequestBlock(block.block.prev_header_hash), peer_protocol.RequestBlock(block.block.prev_header_hash),
) )
async with self.store.lock: self.store.add_disconnected_block(block.block)
await self.store.add_disconnected_block(block.block)
yield OutboundMessage(NodeType.FULL_NODE, msg, Delivery.RESPOND) yield OutboundMessage(NodeType.FULL_NODE, msg, Delivery.RESPOND)
return return
elif added == ReceiveBlockResult.ADDED_TO_HEAD: elif added == ReceiveBlockResult.ADDED_TO_HEAD:
# Only propagate blocks which extend the blockchain (becomes one of the heads) # Only propagate blocks which extend the blockchain (becomes one of the heads)
ips_changed: bool = False log.info(
async with self.store.lock: f"Updated heads, new heights: {[b.height for b in self.blockchain.get_current_tips()]}"
log.info( )
f"Updated heads, new heights: {[b.height for b in self.blockchain.get_current_tips()]}"
)
difficulty = self.blockchain.get_next_difficulty( difficulty = self.blockchain.get_next_difficulty(
block.block.prev_header_hash block.block.prev_header_hash
) )
next_vdf_ips = self.blockchain.get_next_ips(block.block.header_block) next_vdf_ips = self.blockchain.get_next_ips(block.block.header_block)
log.info(f"Difficulty {difficulty} IPS {next_vdf_ips}") log.info(f"Difficulty {difficulty} IPS {next_vdf_ips}")
if next_vdf_ips != await self.store.get_proof_of_time_estimate_ips(): if next_vdf_ips != self.store.get_proof_of_time_estimate_ips():
await self.store.set_proof_of_time_estimate_ips(next_vdf_ips) self.store.set_proof_of_time_estimate_ips(next_vdf_ips)
ips_changed = True
if ips_changed:
rate_update = farmer_protocol.ProofOfTimeRate(next_vdf_ips) rate_update = farmer_protocol.ProofOfTimeRate(next_vdf_ips)
log.info(f"Sending proof of time rate {next_vdf_ips}") log.info(f"Sending proof of time rate {next_vdf_ips}")
yield OutboundMessage( yield OutboundMessage(
@ -1193,21 +1163,20 @@ class FullNode:
# Recursively process the next block if we have it # Recursively process the next block if we have it
# This code path is reached if added == ADDED_AS_ORPHAN or ADDED_TO_HEAD # This code path is reached if added == ADDED_AS_ORPHAN or ADDED_TO_HEAD
async with self.store.lock: next_block: Optional[FullBlock] = self.store.get_disconnected_block_by_prev(
next_block: Optional[ block.block.header_hash
FullBlock )
] = await self.store.get_disconnected_block_by_prev(block.block.header_hash)
if next_block is not None: if next_block is not None:
async for ret_msg in self.block(peer_protocol.Block(next_block)): async for ret_msg in self.block(peer_protocol.Block(next_block)):
yield ret_msg yield ret_msg
async with self.store.lock: # Removes all temporary data for old blocks
# Removes all temporary data for old blocks lowest_tip = min(tip.height for tip in self.blockchain.get_current_tips())
lowest_tip = min(tip.height for tip in self.blockchain.get_current_tips()) clear_height = uint32(max(0, lowest_tip - 30))
clear_height = uint32(max(0, lowest_tip - 30)) self.store.clear_candidate_blocks_below(clear_height)
await self.store.clear_candidate_blocks_below(clear_height) self.store.clear_unfinished_blocks_below(clear_height)
await self.store.clear_unfinished_blocks_below(clear_height) self.store.clear_disconnected_blocks_below(clear_height)
await self.store.clear_disconnected_blocks_below(clear_height)
@api_request @api_request
async def request_block( async def request_block(

View File

@ -56,7 +56,7 @@ class RpcApiHandler:
tips: List[SmallHeaderBlock] = self.full_node.blockchain.get_current_tips() tips: List[SmallHeaderBlock] = self.full_node.blockchain.get_current_tips()
lca: SmallHeaderBlock = self.full_node.blockchain.lca_block lca: SmallHeaderBlock = self.full_node.blockchain.lca_block
assert lca.challenge is not None assert lca.challenge is not None
sync_mode: bool = await self.full_node.store.get_sync_mode() sync_mode: bool = self.full_node.store.get_sync_mode()
difficulty: uint64 = self.full_node.blockchain.get_next_difficulty( difficulty: uint64 = self.full_node.blockchain.get_next_difficulty(
lca.header_hash lca.header_hash
) )
@ -176,10 +176,9 @@ class RpcApiHandler:
TODO: remove after transactions and coins are added. TODO: remove after transactions and coins are added.
""" """
async with self.full_node.store.lock: ppks: List[
ppks: List[ Tuple[uint32, PublicKey]
Tuple[uint32, PublicKey] ] = await self.full_node.store.get_pool_pks_hack()
] = await self.full_node.store.get_pool_pks_hack()
coin_balances: Dict[str, uint64] = {} coin_balances: Dict[str, uint64] = {}
for height, pk in ppks: for height, pk in ppks:
@ -201,8 +200,8 @@ class RpcApiHandler:
assert tips[i].challenge is not None assert tips[i].challenge is not None
challenge: Challenge = tips[i].challenge # type: ignore challenge: Challenge = tips[i].challenge # type: ignore
max_tip: SmallHeaderBlock = SmallHeaderBlock(tips[i].header, challenge) max_tip: SmallHeaderBlock = SmallHeaderBlock(tips[i].header, challenge)
if await self.full_node.store.get_sync_mode(): if self.full_node.store.get_sync_mode():
potential_tips = await self.full_node.store.get_potential_tips_tuples() potential_tips = self.full_node.store.get_potential_tips_tuples()
for _, pot_block in potential_tips: for _, pot_block in potential_tips:
if pot_block.weight > max_tip.weight: if pot_block.weight > max_tip.weight:
assert pot_block.header_block.challenge is not None assert pot_block.header_block.challenge is not None

View File

@ -191,10 +191,10 @@ class FullNodeStore:
return FullBlock.from_bytes(row[1]) return FullBlock.from_bytes(row[1])
return None return None
async def add_disconnected_block(self, block: FullBlock) -> None: def add_disconnected_block(self, block: FullBlock) -> None:
self.disconnected_blocks[block.header_hash] = block self.disconnected_blocks[block.header_hash] = block
async def get_disconnected_block_by_prev( def get_disconnected_block_by_prev(
self, prev_header_hash: bytes32 self, prev_header_hash: bytes32
) -> Optional[FullBlock]: ) -> Optional[FullBlock]:
for _, block in self.disconnected_blocks.items(): for _, block in self.disconnected_blocks.items():
@ -202,18 +202,18 @@ class FullNodeStore:
return block return block
return None return None
async def get_disconnected_block(self, header_hash: bytes32) -> Optional[FullBlock]: def get_disconnected_block(self, header_hash: bytes32) -> Optional[FullBlock]:
return self.disconnected_blocks.get(header_hash, None) return self.disconnected_blocks.get(header_hash, None)
async def clear_disconnected_blocks_below(self, height: uint32) -> None: def clear_disconnected_blocks_below(self, height: uint32) -> None:
for key in list(self.disconnected_blocks.keys()): for key in list(self.disconnected_blocks.keys()):
if self.disconnected_blocks[key].height < height: if self.disconnected_blocks[key].height < height:
del self.disconnected_blocks[key] del self.disconnected_blocks[key]
async def set_sync_mode(self, sync_mode: bool) -> None: def set_sync_mode(self, sync_mode: bool) -> None:
self.sync_mode = sync_mode self.sync_mode = sync_mode
async def get_sync_mode(self) -> bool: def get_sync_mode(self) -> bool:
return self.sync_mode return self.sync_mode
async def clear_sync_info(self): async def clear_sync_info(self):
@ -224,13 +224,13 @@ class FullNodeStore:
self.potential_blocks_received.clear() self.potential_blocks_received.clear()
self.potential_future_blocks.clear() self.potential_future_blocks.clear()
async def get_potential_tips_tuples(self) -> List[Tuple[bytes32, FullBlock]]: def get_potential_tips_tuples(self) -> List[Tuple[bytes32, FullBlock]]:
return list(self.potential_tips.items()) return list(self.potential_tips.items())
async def add_potential_tip(self, block: FullBlock) -> None: def add_potential_tip(self, block: FullBlock) -> None:
self.potential_tips[block.header_hash] = block self.potential_tips[block.header_hash] = block
async def get_potential_tip(self, header_hash: bytes32) -> Optional[FullBlock]: def get_potential_tip(self, header_hash: bytes32) -> Optional[FullBlock]:
return self.potential_tips.get(header_hash, None) return self.potential_tips.get(header_hash, None)
def add_potential_header(self, block: HeaderBlock) -> None: def add_potential_header(self, block: HeaderBlock) -> None:
@ -266,18 +266,18 @@ class FullNodeStore:
def get_potential_blocks_received(self, height: uint32) -> asyncio.Event: def get_potential_blocks_received(self, height: uint32) -> asyncio.Event:
return self.potential_blocks_received[height] return self.potential_blocks_received[height]
async def add_potential_future_block(self, block: FullBlock): def add_potential_future_block(self, block: FullBlock):
self.potential_future_blocks.append(block) self.potential_future_blocks.append(block)
async def get_potential_future_blocks(self): def get_potential_future_blocks(self):
return self.potential_future_blocks return self.potential_future_blocks
async def add_candidate_block( def add_candidate_block(
self, pos_hash: bytes32, body: Body, header: HeaderData, pos: ProofOfSpace, self, pos_hash: bytes32, body: Body, header: HeaderData, pos: ProofOfSpace,
): ):
self.candidate_blocks[pos_hash] = (body, header, pos, body.coinbase.height) self.candidate_blocks[pos_hash] = (body, header, pos, body.coinbase.height)
async def get_candidate_block( def get_candidate_block(
self, pos_hash: bytes32 self, pos_hash: bytes32
) -> Optional[Tuple[Body, HeaderData, ProofOfSpace]]: ) -> Optional[Tuple[Body, HeaderData, ProofOfSpace]]:
res = self.candidate_blocks.get(pos_hash, None) res = self.candidate_blocks.get(pos_hash, None)
@ -285,19 +285,17 @@ class FullNodeStore:
return None return None
return (res[0], res[1], res[2]) return (res[0], res[1], res[2])
async def clear_candidate_blocks_below(self, height: uint32) -> None: def clear_candidate_blocks_below(self, height: uint32) -> None:
for key in list(self.candidate_blocks.keys()): for key in list(self.candidate_blocks.keys()):
if self.candidate_blocks[key][3] < height: if self.candidate_blocks[key][3] < height:
del self.candidate_blocks[key] del self.candidate_blocks[key]
async def add_unfinished_block( def add_unfinished_block(
self, key: Tuple[bytes32, uint64], block: FullBlock self, key: Tuple[bytes32, uint64], block: FullBlock
) -> None: ) -> None:
self.unfinished_blocks[key] = block self.unfinished_blocks[key] = block
async def get_unfinished_block( def get_unfinished_block(self, key: Tuple[bytes32, uint64]) -> Optional[FullBlock]:
self, key: Tuple[bytes32, uint64]
) -> Optional[FullBlock]:
return self.unfinished_blocks.get(key, None) return self.unfinished_blocks.get(key, None)
def seen_unfinished_block(self, header_hash: bytes32) -> bool: def seen_unfinished_block(self, header_hash: bytes32) -> bool:
@ -309,10 +307,10 @@ class FullNodeStore:
def clear_seen_unfinished_blocks(self) -> None: def clear_seen_unfinished_blocks(self) -> None:
self.seen_unfinished_blocks.clear() self.seen_unfinished_blocks.clear()
async def get_unfinished_blocks(self) -> Dict[Tuple[bytes32, uint64], FullBlock]: def get_unfinished_blocks(self) -> Dict[Tuple[bytes32, uint64], FullBlock]:
return self.unfinished_blocks.copy() return self.unfinished_blocks.copy()
async def clear_unfinished_blocks_below(self, height: uint32) -> None: def clear_unfinished_blocks_below(self, height: uint32) -> None:
for key in list(self.unfinished_blocks.keys()): for key in list(self.unfinished_blocks.keys()):
if self.unfinished_blocks[key].height < height: if self.unfinished_blocks[key].height < height:
del self.unfinished_blocks[key] del self.unfinished_blocks[key]
@ -323,8 +321,8 @@ class FullNodeStore:
def get_unfinished_block_leader(self) -> Tuple[bytes32, uint64]: def get_unfinished_block_leader(self) -> Tuple[bytes32, uint64]:
return self.unfinished_blocks_leader return self.unfinished_blocks_leader
async def set_proof_of_time_estimate_ips(self, estimate: uint64): def set_proof_of_time_estimate_ips(self, estimate: uint64):
self.proof_of_time_estimate_ips = estimate self.proof_of_time_estimate_ips = estimate
async def get_proof_of_time_estimate_ips(self) -> uint64: def get_proof_of_time_estimate_ips(self) -> uint64:
return self.proof_of_time_estimate_ips return self.proof_of_time_estimate_ips

View File

@ -76,15 +76,15 @@ class TestStore:
# Save/get sync # Save/get sync
for sync_mode in (False, True): for sync_mode in (False, True):
await db.set_sync_mode(sync_mode) db.set_sync_mode(sync_mode)
assert sync_mode == await db.get_sync_mode() assert sync_mode == db.get_sync_mode()
# clear sync info # clear sync info
await db.clear_sync_info() await db.clear_sync_info()
# add/get potential tip, get potential tips num # add/get potential tip, get potential tips num
await db.add_potential_tip(blocks[6]) db.add_potential_tip(blocks[6])
assert blocks[6] == await db.get_potential_tip(blocks[6].header_hash) assert blocks[6] == db.get_potential_tip(blocks[6].header_hash)
# add/get potential trunk # add/get potential trunk
header = genesis.header_block header = genesis.header_block
@ -96,16 +96,16 @@ class TestStore:
assert genesis == await db.get_potential_block(uint32(0)) assert genesis == await db.get_potential_block(uint32(0))
# Add/get candidate block # Add/get candidate block
assert await db.get_candidate_block(0) is None assert db.get_candidate_block(0) is None
partial = ( partial = (
blocks[5].body, blocks[5].body,
blocks[5].header_block.header.data, blocks[5].header_block.header.data,
blocks[5].header_block.proof_of_space, blocks[5].header_block.proof_of_space,
) )
await db.add_candidate_block(blocks[5].header_hash, *partial) db.add_candidate_block(blocks[5].header_hash, *partial)
assert await db.get_candidate_block(blocks[5].header_hash) == partial assert db.get_candidate_block(blocks[5].header_hash) == partial
await db.clear_candidate_blocks_below(uint32(8)) db.clear_candidate_blocks_below(uint32(8))
assert await db.get_candidate_block(blocks[5].header_hash) is None assert db.get_candidate_block(blocks[5].header_hash) is None
# Add/get unfinished block # Add/get unfinished block
i = 1 i = 1
@ -113,29 +113,29 @@ class TestStore:
key = (block.header_hash, uint64(1000)) key = (block.header_hash, uint64(1000))
# Different database should have different data # Different database should have different data
await db_2.add_unfinished_block(key, block) db_2.add_unfinished_block(key, block)
assert await db.get_unfinished_block(key) is None assert db.get_unfinished_block(key) is None
await db.add_unfinished_block(key, block) db.add_unfinished_block(key, block)
assert await db.get_unfinished_block(key) == block assert db.get_unfinished_block(key) == block
assert len(await db.get_unfinished_blocks()) == i assert len(db.get_unfinished_blocks()) == i
i += 1 i += 1
await db.clear_unfinished_blocks_below(uint32(5)) db.clear_unfinished_blocks_below(uint32(5))
assert len(await db.get_unfinished_blocks()) == 5 assert len(db.get_unfinished_blocks()) == 5
# Set/get unf block leader # Set/get unf block leader
assert db.get_unfinished_block_leader() == (0, (1 << 64) - 1) assert db.get_unfinished_block_leader() == (0, (1 << 64) - 1)
db.set_unfinished_block_leader(key) db.set_unfinished_block_leader(key)
assert db.get_unfinished_block_leader() == key assert db.get_unfinished_block_leader() == key
assert await db.get_disconnected_block(blocks[0].prev_header_hash) is None assert db.get_disconnected_block(blocks[0].prev_header_hash) is None
# Disconnected blocks # Disconnected blocks
for block in blocks: for block in blocks:
await db.add_disconnected_block(block) db.add_disconnected_block(block)
await db.get_disconnected_block(block.prev_header_hash) == block db.get_disconnected_block(block.prev_header_hash) == block
await db.clear_disconnected_blocks_below(uint32(5)) db.clear_disconnected_blocks_below(uint32(5))
assert await db.get_disconnected_block(blocks[4].prev_header_hash) is None assert db.get_disconnected_block(blocks[4].prev_header_hash) is None
h_hash_1 = bytes32(token_bytes(32)) h_hash_1 = bytes32(token_bytes(32))
assert not db.seen_unfinished_block(h_hash_1) assert not db.seen_unfinished_block(h_hash_1)
@ -172,10 +172,18 @@ class TestStore:
if random.random() < 0.5: if random.random() < 0.5:
tasks.append(asyncio.create_task(db.add_block(blocks[rand_i]))) tasks.append(asyncio.create_task(db.add_block(blocks[rand_i])))
if random.random() < 0.5: if random.random() < 0.5:
tasks.append(asyncio.create_task(db.add_potential_block(blocks[rand_i]))) tasks.append(
asyncio.create_task(db.add_potential_block(blocks[rand_i]))
)
if random.random() < 0.5: if random.random() < 0.5:
tasks.append(asyncio.create_task(db.get_block(blocks[rand_i].header_hash))) tasks.append(
asyncio.create_task(db.get_block(blocks[rand_i].header_hash))
)
if random.random() < 0.5: if random.random() < 0.5:
tasks.append(asyncio.create_task(db.get_potential_block(blocks[rand_i].header_hash))) tasks.append(
asyncio.create_task(
db.get_potential_block(blocks[rand_i].header_hash)
)
)
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
await db.close() await db.close()