Improved timelord logic
This commit is contained in:
parent
c78d870ec1
commit
5247032828
|
@ -686,12 +686,12 @@ void NWesolowskiMain(integer D, form x, int64_t num_iterations, WesolowskiCallba
|
||||||
}
|
}
|
||||||
|
|
||||||
void PollTimelord(tcp::socket& sock, bool& got_iters) {
|
void PollTimelord(tcp::socket& sock, bool& got_iters) {
|
||||||
// Wait for 60s, if no iters come, poll each 15 seconds the timelord.
|
// Wait for 15s, if no iters come, poll each 5 seconds the timelord.
|
||||||
int seconds = 0;
|
int seconds = 0;
|
||||||
while (!got_iters) {
|
while (!got_iters) {
|
||||||
std::this_thread::sleep_for (std::chrono::seconds(1));
|
std::this_thread::sleep_for (std::chrono::seconds(1));
|
||||||
seconds++;
|
seconds++;
|
||||||
if (seconds >= 60 && (seconds - 60) % 15 == 0) {
|
if (seconds >= 15 && (seconds - 15) % 5 == 0) {
|
||||||
socket_mutex.lock();
|
socket_mutex.lock();
|
||||||
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
|
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
|
||||||
socket_mutex.unlock();
|
socket_mutex.unlock();
|
||||||
|
@ -801,10 +801,18 @@ void session(tcp::socket sock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (std::exception& e) {
|
||||||
|
std::cerr << "Exception in thread: " << e.what() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
// Tell client I've stopped everything, wait for ACK and close.
|
// Tell client I've stopped everything, wait for ACK and close.
|
||||||
|
boost::system::error_code error;
|
||||||
|
|
||||||
|
std::cout << "Stopped everything! Ready for the next challenge.\n";
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(socket_mutex);
|
std::lock_guard<std::mutex> lock(socket_mutex);
|
||||||
boost::asio::write(sock, boost::asio::buffer("STOP", 4));
|
boost::asio::write(sock, boost::asio::buffer("STOP", 4));
|
||||||
std::cout << "Stopped everything! Ready for the next challenge.\n";
|
|
||||||
|
|
||||||
char ack[5];
|
char ack[5];
|
||||||
memset(ack,0x00,sizeof(ack));
|
memset(ack,0x00,sizeof(ack));
|
||||||
|
|
|
@ -361,7 +361,6 @@ class Blockchain:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 4. Check PoT
|
# 4. Check PoT
|
||||||
# TODO(Florin): Change the hardcode of the genesis with new pot format.
|
|
||||||
if not block.trunk_block.proof_of_time.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]):
|
if not block.trunk_block.proof_of_time.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ async def send_challenges_to_timelords() -> AsyncGenerator[OutboundMessage, None
|
||||||
async with db.lock:
|
async with db.lock:
|
||||||
for head in db.blockchain.get_current_heads():
|
for head in db.blockchain.get_current_heads():
|
||||||
challenge_hash = head.challenge.get_hash()
|
challenge_hash = head.challenge.get_hash()
|
||||||
requests.append(timelord_protocol.ChallengeStart(challenge_hash))
|
requests.append(timelord_protocol.ChallengeStart(challenge_hash, head.challenge.height))
|
||||||
for request in requests:
|
for request in requests:
|
||||||
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_start", request), Delivery.BROADCAST)
|
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_start", request), Delivery.BROADCAST)
|
||||||
|
|
||||||
|
@ -608,8 +608,9 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N
|
||||||
block.block.trunk_block.challenge.height,
|
block.block.trunk_block.challenge.height,
|
||||||
pos_quality,
|
pos_quality,
|
||||||
difficulty)
|
difficulty)
|
||||||
timelord_request = timelord_protocol.ChallengeStart(block.block.trunk_block.challenge.get_hash())
|
timelord_request = timelord_protocol.ChallengeStart(block.block.trunk_block.challenge.get_hash(),
|
||||||
timelord_request_end = timelord_protocol.ChallengeStart(block.block.trunk_block.proof_of_time.
|
block.block.trunk_block.challenge.height)
|
||||||
|
timelord_request_end = timelord_protocol.ChallengeEnd(block.block.trunk_block.proof_of_time.
|
||||||
output.challenge_hash)
|
output.challenge_hash)
|
||||||
# Tell timelord to stop previous challenge and start with new one
|
# Tell timelord to stop previous challenge and start with new one
|
||||||
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST)
|
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST)
|
||||||
|
@ -620,10 +621,9 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N
|
||||||
|
|
||||||
# Tell farmer about the new block
|
# Tell farmer about the new block
|
||||||
yield OutboundMessage(NodeType.FARMER, Message("proof_of_space_finalized", farmer_request), Delivery.BROADCAST)
|
yield OutboundMessage(NodeType.FARMER, Message("proof_of_space_finalized", farmer_request), Delivery.BROADCAST)
|
||||||
else:
|
elif added == ReceiveBlockResult.ADDED_AS_ORPHAN:
|
||||||
# Note(Florin): This is a hack...
|
log.info("I've received an orphan, stopping the proof of time challenge.")
|
||||||
log.info("I've received a block, stopping the challenge to free up the VDF server...")
|
log.info(f"Height of the orphan block is {block.block.trunk_block.challenge.height}")
|
||||||
log.info(f"Height of received block = {block.block.trunk_block.challenge.height}")
|
timelord_request_end = timelord_protocol.ChallengeEnd(block.block.trunk_block.proof_of_time.
|
||||||
timelord_request_end = timelord_protocol.ChallengeStart(block.block.trunk_block.proof_of_time.
|
|
||||||
output.challenge_hash)
|
output.challenge_hash)
|
||||||
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST)
|
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
from src.util.cbor_message import cbor_message
|
from src.util.cbor_message import cbor_message
|
||||||
from src.types.sized_bytes import bytes32
|
from src.types.sized_bytes import bytes32
|
||||||
from src.util.ints import uint64
|
from src.util.ints import uint32, uint64
|
||||||
from src.types.proof_of_time import ProofOfTime
|
from src.types.proof_of_time import ProofOfTime
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -20,7 +20,7 @@ class ProofOfTimeFinished:
|
||||||
@cbor_message(tag=3001)
|
@cbor_message(tag=3001)
|
||||||
class ChallengeStart:
|
class ChallengeStart:
|
||||||
challenge_hash: bytes32
|
challenge_hash: bytes32
|
||||||
|
height: uint32
|
||||||
|
|
||||||
@cbor_message(tag=3002)
|
@cbor_message(tag=3002)
|
||||||
class ChallengeEnd:
|
class ChallengeEnd:
|
||||||
|
|
|
@ -21,10 +21,11 @@ class Database:
|
||||||
lock: Lock = Lock()
|
lock: Lock = Lock()
|
||||||
free_servers: List[int] = []
|
free_servers: List[int] = []
|
||||||
active_discriminants: Dict = {}
|
active_discriminants: Dict = {}
|
||||||
|
pending_iters: Dict = {}
|
||||||
|
best_height = 0
|
||||||
done_discriminants = []
|
done_discriminants = []
|
||||||
seen_discriminants = []
|
seen_discriminants = []
|
||||||
counter = 0
|
active_heights = []
|
||||||
active_counters = []
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -43,27 +44,35 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
disc: int = create_discriminant(challenge_start.challenge_hash, constants["DISCRIMINANT_SIZE_BITS"])
|
disc: int = create_discriminant(challenge_start.challenge_hash, constants["DISCRIMINANT_SIZE_BITS"])
|
||||||
|
|
||||||
async with db.lock:
|
async with db.lock:
|
||||||
if (challenge_start.challenge_hash in db.seen_discriminants):
|
if (challenge_start.challenge_hash in db.seen_discriminants):
|
||||||
log.info("Already seen this one... Ignoring")
|
log.info("Already seen this one... Ignoring")
|
||||||
return
|
return
|
||||||
db.seen_discriminants.append(challenge_start.challenge_hash)
|
db.seen_discriminants.append(challenge_start.challenge_hash)
|
||||||
db.counter += 1
|
db.active_heights.append(challenge_start.height)
|
||||||
current_counter = db.counter
|
db.best_height = max(db.best_height, challenge_start.height)
|
||||||
db.active_counters.append(db.counter)
|
|
||||||
|
|
||||||
# Wait for a server to become free.
|
# Wait for a server to become free.
|
||||||
port: int = -1
|
port: int = -1
|
||||||
while port == -1:
|
while port == -1:
|
||||||
async with db.lock:
|
async with db.lock:
|
||||||
if (len(db.free_servers) != 0):
|
if (challenge_start.height <= db.best_height - 5):
|
||||||
port = db.free_servers[0]
|
db.done_discriminants.append(challenge_start.challenge_hash)
|
||||||
db.free_servers = db.free_servers[1:]
|
db.active_heights.remove(challenge_start.height)
|
||||||
log.info(f"Discriminant {disc} attached to port {port}.")
|
log.info(f"Stopping challenge at height {challenge_start.height}")
|
||||||
|
return
|
||||||
|
assert(len(db.active_heights) > 0)
|
||||||
|
if (challenge_start.height == max(db.active_heights)):
|
||||||
|
if (len(db.free_servers) != 0):
|
||||||
|
port = db.free_servers[0]
|
||||||
|
db.free_servers = db.free_servers[1:]
|
||||||
|
log.info(f"Discriminant {disc} attached to port {port}.")
|
||||||
|
log.info(f"Height attached is {challenge_start.height}")
|
||||||
|
db.active_heights.remove(challenge_start.height)
|
||||||
|
|
||||||
# Poll until a server becomes free.
|
# Poll until a server becomes free.
|
||||||
if port == -1:
|
if port == -1:
|
||||||
await asyncio.sleep(3)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
# TODO(Florin): Handle connection failure (attempt another server)
|
# TODO(Florin): Handle connection failure (attempt another server)
|
||||||
try:
|
try:
|
||||||
|
@ -83,6 +92,13 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
|
||||||
async with db.lock:
|
async with db.lock:
|
||||||
db.active_discriminants[challenge_start.challenge_hash] = writer
|
db.active_discriminants[challenge_start.challenge_hash] = writer
|
||||||
|
|
||||||
|
async with db.lock:
|
||||||
|
if (challenge_start.challenge_hash in db.pending_iters):
|
||||||
|
for iter in db.pending_iters[challenge_start.challenge_hash]:
|
||||||
|
writer.write((str(len(str(iter)))
|
||||||
|
+ str(iter)).encode())
|
||||||
|
await writer.drain()
|
||||||
|
|
||||||
# Listen to the server until "STOP" is received.
|
# Listen to the server until "STOP" is received.
|
||||||
while True:
|
while True:
|
||||||
data = await reader.readexactly(4)
|
data = await reader.readexactly(4)
|
||||||
|
@ -96,7 +112,7 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
|
||||||
elif (data.decode() == "POLL"):
|
elif (data.decode() == "POLL"):
|
||||||
async with db.lock:
|
async with db.lock:
|
||||||
# If I have a newer discriminant... Free up the VDF server
|
# If I have a newer discriminant... Free up the VDF server
|
||||||
if (current_counter < max(db.active_counters)):
|
if (challenge_start.height < max(db.active_heights)):
|
||||||
log.info("Got poll, stopping the challenge!")
|
log.info("Got poll, stopping the challenge!")
|
||||||
writer.write(b'10')
|
writer.write(b'10')
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
|
@ -156,15 +172,16 @@ async def proof_of_space_info(proof_of_space_info: timelord_protocol.ProofOfSpac
|
||||||
many iterations to run for.
|
many iterations to run for.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
while True:
|
async with db.lock:
|
||||||
async with db.lock:
|
if (proof_of_space_info.challenge_hash in db.active_discriminants):
|
||||||
if (proof_of_space_info.challenge_hash in db.active_discriminants):
|
writer = db.active_discriminants[proof_of_space_info.challenge_hash]
|
||||||
writer = db.active_discriminants[proof_of_space_info.challenge_hash]
|
writer.write((str(len(str(proof_of_space_info.iterations_needed)))
|
||||||
writer.write((str(len(str(proof_of_space_info.iterations_needed)))
|
+ str(proof_of_space_info.iterations_needed)).encode())
|
||||||
+ str(proof_of_space_info.iterations_needed)).encode())
|
await writer.drain()
|
||||||
await writer.drain()
|
return
|
||||||
return
|
if (proof_of_space_info.challenge_hash in db.done_discriminants):
|
||||||
if (proof_of_space_info.challenge_hash in db.done_discriminants):
|
return
|
||||||
log.info("Got iters for a finished challenge")
|
if (proof_of_space_info.challenge_hash not in db.pending_iters):
|
||||||
return
|
db.pending_iters[proof_of_space_info.challenge_hash] = []
|
||||||
await asyncio.sleep(0.5)
|
db.pending_iters[proof_of_space_info.challenge_hash].append(proof_of_space_info.iterations_needed)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue