From 5247032828e6b5ccbb0dc79244f7ff9ce4499f35 Mon Sep 17 00:00:00 2001 From: fchirica Date: Fri, 11 Oct 2019 01:16:41 +0300 Subject: [PATCH] Improved timelord logic --- lib/chiavdf/fast_vdf/vdf.cpp | 14 +++++-- src/blockchain.py | 1 - src/full_node.py | 16 ++++---- src/protocols/timelord_protocol.py | 4 +- src/timelord.py | 65 +++++++++++++++++++----------- 5 files changed, 62 insertions(+), 38 deletions(-) diff --git a/lib/chiavdf/fast_vdf/vdf.cpp b/lib/chiavdf/fast_vdf/vdf.cpp index 8675386c..255834ac 100644 --- a/lib/chiavdf/fast_vdf/vdf.cpp +++ b/lib/chiavdf/fast_vdf/vdf.cpp @@ -686,12 +686,12 @@ void NWesolowskiMain(integer D, form x, int64_t num_iterations, WesolowskiCallba } void PollTimelord(tcp::socket& sock, bool& got_iters) { - // Wait for 60s, if no iters come, poll each 15 seconds the timelord. + // Wait for 15s, if no iters come, poll each 5 seconds the timelord. int seconds = 0; while (!got_iters) { std::this_thread::sleep_for (std::chrono::seconds(1)); seconds++; - if (seconds >= 60 && (seconds - 60) % 15 == 0) { + if (seconds >= 15 && (seconds - 15) % 5 == 0) { socket_mutex.lock(); boost::asio::write(sock, boost::asio::buffer("POLL", 4)); socket_mutex.unlock(); @@ -801,10 +801,18 @@ void session(tcp::socket sock) { } } } + } catch (std::exception& e) { + std::cerr << "Exception in thread: " << e.what() << "\n"; + } + + try { // Tell client I've stopped everything, wait for ACK and close. + boost::system::error_code error; + + std::cout << "Stopped everything! Ready for the next challenge.\n"; + std::lock_guard lock(socket_mutex); boost::asio::write(sock, boost::asio::buffer("STOP", 4)); - std::cout << "Stopped everything! Ready for the next challenge.\n"; char ack[5]; memset(ack,0x00,sizeof(ack)); diff --git a/src/blockchain.py b/src/blockchain.py index b06411b8..f8c63f8c 100644 --- a/src/blockchain.py +++ b/src/blockchain.py @@ -361,7 +361,6 @@ class Blockchain: return False # 4. Check PoT - # TODO(Florin): Change the hardcode of the genesis with new pot format. if not block.trunk_block.proof_of_time.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]): return False diff --git a/src/full_node.py b/src/full_node.py index 7361395a..b14e66c5 100644 --- a/src/full_node.py +++ b/src/full_node.py @@ -100,7 +100,7 @@ async def send_challenges_to_timelords() -> AsyncGenerator[OutboundMessage, None async with db.lock: for head in db.blockchain.get_current_heads(): challenge_hash = head.challenge.get_hash() - requests.append(timelord_protocol.ChallengeStart(challenge_hash)) + requests.append(timelord_protocol.ChallengeStart(challenge_hash, head.challenge.height)) for request in requests: yield OutboundMessage(NodeType.TIMELORD, Message("challenge_start", request), Delivery.BROADCAST) @@ -608,8 +608,9 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N block.block.trunk_block.challenge.height, pos_quality, difficulty) - timelord_request = timelord_protocol.ChallengeStart(block.block.trunk_block.challenge.get_hash()) - timelord_request_end = timelord_protocol.ChallengeStart(block.block.trunk_block.proof_of_time. + timelord_request = timelord_protocol.ChallengeStart(block.block.trunk_block.challenge.get_hash(), + block.block.trunk_block.challenge.height) + timelord_request_end = timelord_protocol.ChallengeEnd(block.block.trunk_block.proof_of_time. output.challenge_hash) # Tell timelord to stop previous challenge and start with new one yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST) @@ -620,10 +621,9 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N # Tell farmer about the new block yield OutboundMessage(NodeType.FARMER, Message("proof_of_space_finalized", farmer_request), Delivery.BROADCAST) - else: - # Note(Florin): This is a hack... - log.info("I've received a block, stopping the challenge to free up the VDF server...") - log.info(f"Height of received block = {block.block.trunk_block.challenge.height}") - timelord_request_end = timelord_protocol.ChallengeStart(block.block.trunk_block.proof_of_time. + elif added == ReceiveBlockResult.ADDED_AS_ORPHAN: + log.info("I've received an orphan, stopping the proof of time challenge.") + log.info(f"Height of the orphan block is {block.block.trunk_block.challenge.height}") + timelord_request_end = timelord_protocol.ChallengeEnd(block.block.trunk_block.proof_of_time. output.challenge_hash) yield OutboundMessage(NodeType.TIMELORD, Message("challenge_end", timelord_request_end), Delivery.BROADCAST) diff --git a/src/protocols/timelord_protocol.py b/src/protocols/timelord_protocol.py index 81e76584..f61ddc5e 100644 --- a/src/protocols/timelord_protocol.py +++ b/src/protocols/timelord_protocol.py @@ -1,6 +1,6 @@ from src.util.cbor_message import cbor_message from src.types.sized_bytes import bytes32 -from src.util.ints import uint64 +from src.util.ints import uint32, uint64 from src.types.proof_of_time import ProofOfTime """ @@ -20,7 +20,7 @@ class ProofOfTimeFinished: @cbor_message(tag=3001) class ChallengeStart: challenge_hash: bytes32 - + height: uint32 @cbor_message(tag=3002) class ChallengeEnd: diff --git a/src/timelord.py b/src/timelord.py index c2e11303..3a08577e 100644 --- a/src/timelord.py +++ b/src/timelord.py @@ -21,10 +21,11 @@ class Database: lock: Lock = Lock() free_servers: List[int] = [] active_discriminants: Dict = {} + pending_iters: Dict = {} + best_height = 0 done_discriminants = [] seen_discriminants = [] - counter = 0 - active_counters = [] + active_heights = [] log = logging.getLogger(__name__) @@ -43,27 +44,35 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart): """ disc: int = create_discriminant(challenge_start.challenge_hash, constants["DISCRIMINANT_SIZE_BITS"]) - async with db.lock: if (challenge_start.challenge_hash in db.seen_discriminants): log.info("Already seen this one... Ignoring") return db.seen_discriminants.append(challenge_start.challenge_hash) - db.counter += 1 - current_counter = db.counter - db.active_counters.append(db.counter) + db.active_heights.append(challenge_start.height) + db.best_height = max(db.best_height, challenge_start.height) # Wait for a server to become free. port: int = -1 while port == -1: async with db.lock: - if (len(db.free_servers) != 0): - port = db.free_servers[0] - db.free_servers = db.free_servers[1:] - log.info(f"Discriminant {disc} attached to port {port}.") + if (challenge_start.height <= db.best_height - 5): + db.done_discriminants.append(challenge_start.challenge_hash) + db.active_heights.remove(challenge_start.height) + log.info(f"Stopping challenge at height {challenge_start.height}") + return + assert(len(db.active_heights) > 0) + if (challenge_start.height == max(db.active_heights)): + if (len(db.free_servers) != 0): + port = db.free_servers[0] + db.free_servers = db.free_servers[1:] + log.info(f"Discriminant {disc} attached to port {port}.") + log.info(f"Height attached is {challenge_start.height}") + db.active_heights.remove(challenge_start.height) + # Poll until a server becomes free. if port == -1: - await asyncio.sleep(3) + await asyncio.sleep(0.1) # TODO(Florin): Handle connection failure (attempt another server) try: @@ -83,6 +92,13 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart): async with db.lock: db.active_discriminants[challenge_start.challenge_hash] = writer + async with db.lock: + if (challenge_start.challenge_hash in db.pending_iters): + for iter in db.pending_iters[challenge_start.challenge_hash]: + writer.write((str(len(str(iter))) + + str(iter)).encode()) + await writer.drain() + # Listen to the server until "STOP" is received. while True: data = await reader.readexactly(4) @@ -96,7 +112,7 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart): elif (data.decode() == "POLL"): async with db.lock: # If I have a newer discriminant... Free up the VDF server - if (current_counter < max(db.active_counters)): + if (challenge_start.height < max(db.active_heights)): log.info("Got poll, stopping the challenge!") writer.write(b'10') await writer.drain() @@ -156,15 +172,16 @@ async def proof_of_space_info(proof_of_space_info: timelord_protocol.ProofOfSpac many iterations to run for. """ - while True: - async with db.lock: - if (proof_of_space_info.challenge_hash in db.active_discriminants): - writer = db.active_discriminants[proof_of_space_info.challenge_hash] - writer.write((str(len(str(proof_of_space_info.iterations_needed))) - + str(proof_of_space_info.iterations_needed)).encode()) - await writer.drain() - return - if (proof_of_space_info.challenge_hash in db.done_discriminants): - log.info("Got iters for a finished challenge") - return - await asyncio.sleep(0.5) + async with db.lock: + if (proof_of_space_info.challenge_hash in db.active_discriminants): + writer = db.active_discriminants[proof_of_space_info.challenge_hash] + writer.write((str(len(str(proof_of_space_info.iterations_needed))) + + str(proof_of_space_info.iterations_needed)).encode()) + await writer.drain() + return + if (proof_of_space_info.challenge_hash in db.done_discriminants): + return + if (proof_of_space_info.challenge_hash not in db.pending_iters): + db.pending_iters[proof_of_space_info.challenge_hash] = [] + db.pending_iters[proof_of_space_info.challenge_hash].append(proof_of_space_info.iterations_needed) +