Full node signage points and sub slots working

parent 3acde9e667
commit 8964825f37

@@ -6,7 +6,7 @@ testnet_kwargs = {
     "SLOT_SUB_BLOCKS_TARGET": 32,
     "MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK": 16,
     "MAX_SUB_SLOT_SUB_BLOCKS": 128,
-    "NUM_SPS_SUB_SLOT": 64,
+    "NUM_SPS_SUB_SLOT": 16,
     "SUB_SLOT_ITERS_STARTING": 2 ** 24,
     # DIFFICULTY_STARTING is the starting difficulty for the first epoch, which is then further
     # multiplied by another factor of 2^25, to be used in the VDF iter calculation formula.
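
Note: with NUM_SPS_SUB_SLOT cut from 64 to 16, signage points land four times farther apart in VDF iterations. A quick arithmetic sketch using the constants from this hunk (the division mirrors the iters_per_signage computation in the Timelord hunk further down):

    SUB_SLOT_ITERS_STARTING = 2 ** 24
    NUM_SPS_SUB_SLOT = 16  # was 64 before this commit

    iters_per_signage = SUB_SLOT_ITERS_STARTING // NUM_SPS_SUB_SLOT
    print(iters_per_signage)  # 1048576 == 2 ** 20 iterations between signage points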

@@ -199,18 +199,19 @@ class WebSocketServer:
         if service_name in self.connections:
             sockets = self.connections[service_name]
             for socket in sockets:
-                try:
-                    self.log.info(f"About to ping: {service_name}")
-                    await socket.ping()
-                except asyncio.CancelledError:
-                    self.log.info("Ping task received Cancel")
-                    restart = False
-                    break
-                except Exception as e:
-                    self.log.info(f"Ping error: {e}")
-                    self.log.warning("Ping failed, connection closed.")
-                    self.remove_connection(socket)
-                    await socket.close()
+                if socket.remote_address[1] == remote_address:
+                    try:
+                        self.log.info(f"About to ping: {service_name}")
+                        await socket.ping()
+                    except asyncio.CancelledError:
+                        self.log.info("Ping task received Cancel")
+                        restart = False
+                        break
+                    except Exception as e:
+                        self.log.info(f"Ping error: {e}")
+                        self.log.warning("Ping failed, connection closed.")
+                        self.remove_connection(socket)
+                        await socket.close()
         if restart is True:
             self.ping_job = asyncio.create_task(self.ping_task())
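
The change above narrows the daemon's ping loop: instead of pinging every socket registered under a service name, only the socket whose remote port matches remote_address is pinged. A minimal standalone sketch of the new shape (the socket objects are assumed to expose websockets-style ping()/close() and remote_address):

    import asyncio

    async def ping_matching(sockets, remote_address, log) -> bool:
        # Returns False if cancelled, signalling the caller not to restart.
        for socket in sockets:
            if socket.remote_address[1] != remote_address:
                continue
            try:
                await socket.ping()
            except asyncio.CancelledError:
                log.info("Ping task received Cancel")
                return False
            except Exception as e:
                log.warning(f"Ping failed ({e}), closing connection.")
                await socket.close()
        return True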

@@ -491,6 +491,7 @@ class FullNode:
         # If there were pending end of slots that happen after this peak, broadcast them if they are added
+        if added_eos is not None:
             broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
                 added_eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
                 added_eos.challenge_chain.get_hash(),
                 uint8(0),
                 added_eos.reward_chain.end_of_slot_vdf.challenge,

@@ -254,12 +254,25 @@ class FullNodeAPI:
         ):
             return
 
-        if self.full_node.full_node_store.get_sub_slot(new_sp.challenge_hash) is None:
-            full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot(
-                new_sp.challenge_hash, uint8(0), new_sp.last_rc_infusion
-            )
-            return Message("request_signage_point_or_end_of_sub_slot", full_node_request)
-
+        if new_sp.index_from_challenge == 0 and new_sp.prev_challenge_hash is not None:
+            if self.full_node.full_node_store.get_sub_slot(new_sp.prev_challenge_hash) is None:
+                # If this is an end of sub slot, and we don't have the prev, request the prev instead
+                full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot(
+                    new_sp.prev_challenge_hash, uint8(0), new_sp.last_rc_infusion
+                )
+                return Message("request_signage_point_or_end_of_sub_slot", full_node_request)
+        if new_sp.index_from_challenge > 0:
+            if (
+                new_sp.challenge_hash != self.full_node.constants.FIRST_CC_CHALLENGE
+                and self.full_node.full_node_store.get_sub_slot(new_sp.challenge_hash) is None
+            ):
+                # If this is a normal signage point, and we don't have the end of sub slot, request the end of sub slot
+                full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot(
+                    new_sp.challenge_hash, uint8(0), new_sp.last_rc_infusion
+                )
+                return Message("request_signage_point_or_end_of_sub_slot", full_node_request)
+
+        # Otherwise (we have the prev or the end of sub slot), request it normally
         full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot(
             new_sp.challenge_hash, new_sp.index_from_challenge, new_sp.last_rc_infusion
         )
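
The handler above amounts to a three-way choice of what to request from the peer. A simplified decision function, with store and message shapes assumed rather than taken from the codebase:

    def choose_request(new_sp, store, first_cc_challenge):
        # End of sub slot (index 0): if the previous sub slot is missing,
        # fetch that first so the challenge chain stays contiguous.
        if new_sp.index_from_challenge == 0 and new_sp.prev_challenge_hash is not None:
            if store.get_sub_slot(new_sp.prev_challenge_hash) is None:
                return (new_sp.prev_challenge_hash, 0)
        # Normal signage point: make sure the end of sub slot its challenge
        # hash refers to is present (the genesis challenge excepted).
        if new_sp.index_from_challenge > 0:
            if (
                new_sp.challenge_hash != first_cc_challenge
                and store.get_sub_slot(new_sp.challenge_hash) is None
            ):
                return (new_sp.challenge_hash, 0)
        # Otherwise request exactly what was announced.
        return (new_sp.challenge_hash, new_sp.index_from_challenge)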

@@ -275,13 +288,13 @@ class FullNodeAPI:
                 request.challenge_hash
             )
             if sub_slot is not None:
-                return Message("respond_end_of_slot", full_node_protocol.RespondEndOfSubSlot(sub_slot[0]))
+                return Message("respond_end_of_sub_slot", full_node_protocol.RespondEndOfSubSlot(sub_slot[0]))
             else:
                 self.log.warning("Don't have sub slot")
         else:
             if self.full_node.full_node_store.get_sub_slot(request.challenge_hash) is None:
                 if request.challenge_hash != self.full_node.constants.FIRST_CC_CHALLENGE:
-                    self.log.warning(f"Done have challenge hash {request.challenge_hash}")
+                    self.log.warning(f"Don't have challenge hash {request.challenge_hash}")
 
             sp: Optional[SignagePoint] = self.full_node.full_node_store.get_signage_point_by_index(
                 request.challenge_hash, request.index_from_challenge, request.last_rc_infusion

@@ -331,19 +344,32 @@ class FullNodeAPI:
             )
 
         if added:
+            self.log.info(
+                f"Finished signage point {request.index_from_challenge}/{self.full_node.constants.NUM_SPS_SUB_SLOT}"
+            )
+            sub_slot_tuple = self.full_node.full_node_store.get_sub_slot(request.challenge_chain_vdf.challenge)
+            if sub_slot_tuple is not None:
+                prev_challenge = sub_slot_tuple[0].challenge_chain.challenge_chain_end_of_slot_vdf.challenge
+            else:
+                prev_challenge = None
             # Notify nodes of the new signage point
             broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
+                prev_challenge,
                 request.challenge_chain_vdf.challenge,
                 request.index_from_challenge,
                 request.reward_chain_vdf.challenge,
             )
             msg = Message("new_signage_point_or_end_of_sub_slot", broadcast)
             await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
 
+            if peak is not None and peak.height > 2:
+                # Makes sure to potentially update the difficulty if we are past the peak (into a new sub-slot)
+                assert ip_sub_slot is not None
+                if request.challenge_chain_vdf.challenge != ip_sub_slot.challenge_chain.get_hash():
+                    difficulty = next_difficulty
+                    sub_slot_iters = next_sub_slot_iters
+
             # Notify farmers of the new signage point
             broadcast_farmer = farmer_protocol.NewSignagePoint(
                 request.challenge_chain_vdf.challenge,
                 request.challenge_chain_vdf.output.get_hash(),
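
The prev_challenge lookup added in this hunk reduces to a small helper; a sketch, assuming the store's get_sub_slot() returns the sub-slot tuple used above:

    def prev_challenge_for(store, challenge_hash):
        # The previous challenge lives inside the finished sub slot's EOS VDF;
        # None when the sub slot is not in the store (e.g. right after genesis).
        sub_slot_tuple = store.get_sub_slot(challenge_hash)
        if sub_slot_tuple is None:
            return None
        return sub_slot_tuple[0].challenge_chain.challenge_chain_end_of_slot_vdf.challenge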

@@ -371,14 +397,21 @@ class FullNodeAPI:
         else:
             next_sub_slot_iters = self.full_node.constants.SUB_SLOT_ITERS_STARTING
             next_difficulty = self.full_node.constants.DIFFICULTY_STARTING
 
+        # Adds the sub slot and potentially get new infusions
         new_infusions = self.full_node.full_node_store.new_finished_sub_slot(
             request.end_of_slot_bundle, self.full_node.blockchain.sub_blocks, self.full_node.blockchain.get_peak()
         )
+
+        # It may be an empty list, even if it's not None. Not None means added successfully
         if new_infusions is not None:
-            print(f"Finished sub slot {request.end_of_slot_bundle.challenge_chain.get_hash()}")
+            self.log.info(
+                f"Finished sub slot {request.end_of_slot_bundle.challenge_chain.get_hash()}, number of sub-slots: "
+                f"{len(self.full_node.full_node_store.finished_sub_slots)}"
+            )
             # Notify full nodes of the new sub-slot
             broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
+                request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
                 request.end_of_slot_bundle.challenge_chain.get_hash(),
                 uint8(0),
                 request.end_of_slot_bundle.reward_chain.end_of_slot_vdf.challenge,
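
The new comment pins down a three-way contract for new_finished_sub_slot. A toy illustration of how a caller branches on it (labels are illustrative, not from the codebase):

    from typing import List, Optional

    def classify(new_infusions: Optional[List[object]]) -> str:
        if new_infusions is None:
            return "not added"           # sub slot rejected; do not broadcast
        if len(new_infusions) == 0:
            return "added"               # broadcast EOS; nothing to infuse yet
        return "added with infusions"    # broadcast EOS, then infuse each point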

@@ -389,6 +422,7 @@ class FullNodeAPI:
             for infusion in new_infusions:
                 await self.new_infusion_point_vdf(infusion)
 
+            # Notify farmers of the new sub-slot
             broadcast_farmer = farmer_protocol.NewSignagePoint(
                 request.end_of_slot_bundle.challenge_chain.get_hash(),
                 request.end_of_slot_bundle.challenge_chain.get_hash(),

@@ -34,7 +34,7 @@ class Introducer:
                 return
             try:
                 self.log.info("Vetting random peers.")
-                await asyncio.sleep(5)
+                await asyncio.sleep(60)
                 if self.server.introducer_peers is None:
                     continue
                 rawpeers = self.server.introducer_peers.get_peers(100, True, 3 * self.recent_peer_threshold)

@@ -94,8 +94,8 @@ class RespondUnfinishedSubBlock:
 @dataclass(frozen=True)
 @cbor_message
 class NewSignagePointOrEndOfSubSlot:
-    prev_challenge_hash: uint32
-    challenge_hash: uint32
+    prev_challenge_hash: Optional[bytes32]
+    challenge_hash: bytes32
     index_from_challenge: uint8
     last_rc_infusion: bytes32

@@ -103,7 +103,7 @@ class NewSignagePointOrEndOfSubSlot:
 @dataclass(frozen=True)
 @cbor_message
 class RequestSignagePointOrEndOfSubSlot:
-    challenge_hash: uint32
+    challenge_hash: bytes32
     index_from_challenge: uint8
     last_rc_infusion: bytes32
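
Typing prev_challenge_hash as Optional[bytes32] matters at the genesis boundary: the first sub slot has no predecessor. An illustrative stand-in (plain dataclass; the real message also carries @cbor_message and chia's bytes32/uint8 types):

    from dataclasses import dataclass
    from typing import Optional

    bytes32 = bytes  # stand-in for the 32-byte hash type

    @dataclass(frozen=True)
    class NewSignagePointOrEndOfSubSlot:
        prev_challenge_hash: Optional[bytes32]  # None only for the first sub slot
        challenge_hash: bytes32
        index_from_challenge: int
        last_rc_infusion: bytes32

    # A genesis-adjacent end of sub slot has no previous challenge:
    genesis_eos = NewSignagePointOrEndOfSubSlot(None, b"\x00" * 32, 0, b"\x00" * 32)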
|
@ -93,7 +93,6 @@ class RpcServer:
|
|||
con_info = [
|
||||
{
|
||||
"type": con.connection_type,
|
||||
"local_host": con.local_host,
|
||||
"local_port": con.local_port,
|
||||
"peer_host": con.peer_host,
|
||||
"peer_port": con.peer_port,
|
||||
|
|
|

@@ -157,10 +157,13 @@ class ChiaServer:
                 if self._local_type is NodeType.INTRODUCER and connection.connection_type is NodeType.FULL_NODE:
                     self.introducer_peers.add(connection.get_peer_info())
             except Exception as e:
-                error_stack = traceback.format_exc()
-                self.log.error(f"Exception: {e}")
-                self.log.error(f"Exception Stack: {error_stack}")
-                close_event.set()
+                if isinstance(e, ProtocolError) and e.code == Err.SELF_CONNECTION:
+                    close_event.set()
+                else:
+                    error_stack = traceback.format_exc()
+                    self.log.error(f"Exception: {e}")
+                    self.log.error(f"Exception Stack: {error_stack}")
+                    close_event.set()
 
         await close_event.wait()
         return ws

@@ -218,9 +221,12 @@ class ChiaServer:
         except aiohttp.client_exceptions.ClientConnectorError as e:
             self.log.warning(f"{e}")
         except Exception as e:
-            error_stack = traceback.format_exc()
-            self.log.error(f"Exception: {e}")
-            self.log.error(f"Exception Stack: {error_stack}")
+            if isinstance(e, ProtocolError) and e.code == Err.SELF_CONNECTION:
+                pass
+            else:
+                error_stack = traceback.format_exc()
+                self.log.error(f"Exception: {e}")
+                self.log.error(f"Exception Stack: {error_stack}")
         if session is not None:
             await session.close()

@@ -48,7 +48,6 @@ class WSChiaConnection:
         # Local properties
         self.ws: Any = ws
         self.local_type = local_type
-        self.local_host = ""
         self.local_port = server_port
         # Remote properties
         self.peer_host = peer_host

@@ -108,6 +107,7 @@ class WSChiaConnection:
             self.peer_node_id = inbound_handshake.node_id
             self.peer_server_port = int(inbound_handshake.server_port)
             self.connection_type = inbound_handshake.node_type
+
         else:
             payload = await self._read_one_message()
             inbound_handshake = Handshake(**payload.msg.data)

@@ -129,6 +129,9 @@ class WSChiaConnection:
             self.peer_server_port = int(inbound_handshake.server_port)
             self.connection_type = inbound_handshake.node_type
 
+        if self.peer_node_id == node_id:
+            raise ProtocolError(Err.SELF_CONNECTION)
+
         self.outbound_task = asyncio.create_task(self.outbound_handler())
         self.inbound_task = asyncio.create_task(self.inbound_handler())
         return True
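
This guard, paired with the ProtocolError handling added in ChiaServer above, lets a node that dialed itself shut the connection down without noisy error logs. A standalone sketch with Err and ProtocolError reduced to minimal stand-ins:

    class Err:
        SELF_CONNECTION = "SELF_CONNECTION"

    class ProtocolError(Exception):
        def __init__(self, code):
            super().__init__(str(code))
            self.code = code

    def finish_handshake(local_node_id: bytes, peer_node_id: bytes) -> bool:
        # A node that connected to itself sees its own node_id echoed back.
        if peer_node_id == local_node_id:
            raise ProtocolError(Err.SELF_CONNECTION)
        return True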

@@ -276,6 +279,8 @@ class WSChiaConnection:
                     f"{self.peer_port}"
                 )
                 asyncio.create_task(self.close())
+            elif message.type == WSMsgType.CLOSED:
+                pass
             elif message.type == WSMsgType.BINARY:
                 data = message.data
                 full_message_loaded: Any = cbor.loads(data)

@@ -286,7 +291,7 @@ class WSChiaConnection:
                 payload = Payload(msg, payload_id)
                 return payload
             else:
-                self.log.error(f"Not binary message: {message}")
+                self.log.error(f"Unexpected WebSocket message type: {message}")
                 await self.close()
                 return None

@@ -299,7 +299,8 @@ class Timelord:
         # Adjust all signage points iterations to the peak.
         iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
         self.signage_point_iters = [
-            (k * iters_per_signage - ip_iters, k) for k in range(1, self.constants.NUM_SPS_SUB_SLOT)
+            (k * iters_per_signage - ip_iters, k)
+            for k in range(1, self.constants.NUM_SPS_SUB_SLOT)
+            if k * iters_per_signage - ip_iters > 0
         ]
         for sp, k in self.signage_point_iters:
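
The added filter drops signage points whose iteration target falls at or before the current infusion point, so the timelord never schedules a signage point in the past. With made-up numbers:

    sub_slot_iters = 2 ** 24
    NUM_SPS_SUB_SLOT = 16
    ip_iters = 3_000_000  # hypothetical infusion point offset into the sub slot

    iters_per_signage = sub_slot_iters // NUM_SPS_SUB_SLOT
    signage_point_iters = [
        (k * iters_per_signage - ip_iters, k)
        for k in range(1, NUM_SPS_SUB_SLOT)
        if k * iters_per_signage - ip_iters > 0
    ]
    print(signage_point_iters[0])  # (145728, 3): SPs 1 and 2 are already behind us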

@@ -446,7 +447,6 @@ class Timelord:
                 rc_info,
                 rc_proof,
             )
-            log.info(f"Finished signage point {signage_point_index}/{self.constants.NUM_SPS_SUB_SLOT}")
             if self.server is not None:
                 msg = Message("new_signage_point_vdf", response)
                 await self.server.send_to_all([msg], NodeType.FULL_NODE)

@@ -469,8 +469,7 @@ class Timelord:
 
     async def _check_for_new_ip(self):
         infusion_iters = [
-            iteration for iteration, t in self.iteration_to_proof_type.items()
-            if t == IterationType.INFUSION_POINT
+            iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.INFUSION_POINT
         ]
         for iteration in infusion_iters:
             proofs_with_iter = [

@@ -514,9 +513,7 @@ class Timelord:
                     icc_info = info
                     icc_proof = proof
             if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None:
-                log.error(
-                    f"Insufficient VDF proofs for infusion point ch: {challenge} iterations:{iteration}"
-                )
+                log.error(f"Insufficient VDF proofs for infusion point ch: {challenge} iterations:{iteration}")
             log.info(f"Generated infusion point for challenge: {challenge} iterations: {iteration}.")
             response = timelord_protocol.NewInfusionPointVDF(
                 challenge,

@@ -175,7 +175,7 @@ class TestFullNodeProtocol:
         assert full_node_1.full_node.blockchain.get_peak().height == 29
 
     @pytest.mark.asyncio
-    async def test_respond_end_of_slot(self, two_empty_nodes):
+    async def test_respond_end_of_sub_slot(self, two_empty_nodes):
         full_node_1, full_node_2, server_1, server_2 = two_empty_nodes
 
         incoming_queue, dummy_node_id = await add_dummy_connection(server_1, 12312)