This commit is contained in:
Yostra 2020-03-20 22:29:39 -07:00
parent 5f6789fb59
commit 7e4bc9fae0
5 changed files with 97 additions and 63 deletions

View File

@ -18,12 +18,27 @@ void CreateAndWriteProof(integer D, form x, int64_t num_iterations, WesolowskiCa
PrintInfo("Got stop signal before completing the proof!");
return ;
}
std::vector<unsigned char> bytes = ConvertIntegerToBytes(integer(num_iterations), 8);
// Writes the y, with prepended size
std::vector<unsigned char> y_size = ConvertIntegerToBytes(integer(result.y.size()), 8);
bytes.insert(bytes.end(), y_size.begin(), y_size.end());
bytes.insert(bytes.end(), result.y.begin(), result.y.end());
bytes.insert(bytes.end(), result.proof.begin(), result.proof.end());
std::string str_result = BytesToStr(bytes);
const uint32_t length = str_result.size();
std::vector<unsigned char> prefix_bytes = ConvertIntegerToBytes(integer(length), 4);
std::string prefix = BytesToStr(prefix_bytes);
std::lock_guard<std::mutex> lock(socket_mutex);
PrintInfo("Generated proof = " + str_result);;
PrintInfo("Sending length = " + to_string(length));
PrintInfo("Generated proof = " + str_result);
boost::asio::write(sock, boost::asio::buffer(prefix_bytes, 4));
boost::asio::write(sock, boost::asio::buffer(str_result.c_str(), str_result.size()));
}

View File

@ -3,6 +3,8 @@ import signal
import logging
from typing import Optional
from src.consensus.constants import constants
try:
import uvloop
except ImportError:
@ -24,7 +26,7 @@ async def main():
log = logging.getLogger(__name__)
setproctitle("chia_timelord")
timelord = Timelord(config)
timelord = Timelord(config, constants)
server = ChiaServer(config["port"], timelord, NodeType.TIMELORD)
_ = await server.start_server(config["host"], None, config)

View File

@ -1,29 +1,29 @@
import asyncio
import io
import logging
import time
from asyncio import Lock, StreamReader, StreamWriter
from typing import Dict, List, Optional, Tuple
from lib.chiavdf.inkfish.classgroup import ClassGroup
from lib.chiavdf.inkfish.create_discriminant import create_discriminant
from src.consensus.constants import constants
from lib.chiavdf.inkfish.proof_of_time import check_proof_of_time_nwesolowski
from src.protocols import timelord_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.types.classgroup import ClassgroupElement
from src.types.proof_of_time import ProofOfTime
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint128, int512
from src.util.ints import uint8, uint64, int512
log = logging.getLogger(__name__)
class Timelord:
def __init__(
self, config: Dict, discrimant_size_bits=constants["DISCRIMINANT_SIZE_BITS"]
):
def __init__(self, config: Dict, constants: Dict):
self.constants = constants
self.config: Dict = config
self.discriminant_size_bits = discrimant_size_bits
self.ips_estimate = {
k: v
for k, v in list(
@ -44,12 +44,11 @@ class Timelord:
self.seen_discriminants: List[bytes32] = []
self.proof_count: Dict = {}
self.avg_ips: Dict = {}
self.discriminant_queue: List[Tuple[bytes32, uint128]] = []
self.discriminant_queue: List[Tuple[bytes32, uint64]] = []
self.max_connection_time = self.config["max_connection_time"]
self.potential_free_clients: List = []
self.free_clients: List[Tuple[str, StreamReader, StreamWriter]] = []
self._is_shutdown = False
self.tasks: List[asyncio.Task] = []
async def _handle_client(self, reader: StreamReader, writer: StreamWriter):
async with self.lock:
@ -219,9 +218,14 @@ class Timelord:
async def _do_process_communication(
self, challenge_hash, challenge_weight, ip, reader, writer
):
disc: int = create_discriminant(challenge_hash, self.discriminant_size_bits)
disc: int = create_discriminant(
challenge_hash, self.constants["DISCRIMINANT_SIZE_BITS"]
)
writer.write((str(len(str(disc))) + str(disc)).encode())
prefix = str(len(str(disc)))
if len(prefix) == 1:
prefix = "00" + prefix
writer.write((prefix + str(disc)).encode())
await writer.drain()
try:
@ -242,9 +246,7 @@ class Timelord:
self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip)
self.active_discriminants_start_time[challenge_hash] = time.time()
self.tasks.append(
asyncio.create_task(self._send_iterations(challenge_hash, writer))
)
asyncio.create_task(self._send_iterations(challenge_hash, writer))
# Listen to the client until "STOP" is received.
while True:
@ -267,42 +269,16 @@ class Timelord:
writer.write(b"ACK")
await writer.drain()
break
elif data.decode() == "WESO":
# n-wesolowski
else:
try:
# TODO: change protocol to use bytes and same ProofOfTime format (instead of hex)
# Reads 16 bytes of hex, for the 8 byte iterations
bytes_read = await reader.readexactly(16)
iterations_needed = uint64(
int.from_bytes(
bytes.fromhex(bytes_read.decode()), "big", signed=True
)
)
bytes_read = await reader.readexactly(16)
# Reads 16 bytes of hex, for the 8 byte y_size
y_size = uint64(
int.from_bytes(
bytes.fromhex(bytes_read.decode()), "big", signed=True
)
)
# reads 2 * y_size of hex bytes
y_bytes = bytes.fromhex(
(await reader.readexactly(2 * y_size)).decode()
)
# Reads 16 bytes of hex, for the 8 byte proof size
proof_size_bytes = await reader.readexactly(16)
proof_size = int.from_bytes(
bytes.fromhex(proof_size_bytes.decode()), "big", signed=True
)
# reads 2 * proof_size of hex bytes
proof_bytes = bytes.fromhex(
(await reader.readexactly(2 * proof_size)).decode()
# This must be a proof, 4bytes is length prefix
length = int.from_bytes(data, "big")
proof = await reader.readexactly(length)
stdout_bytes_io: io.BytesIO = io.BytesIO(
bytes.fromhex(proof.decode())
)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
e_to_str = str(e)
log.warning(f"{type(e)} {e}")
log.error(f"Socket error: {e_to_str}")
async with self.lock:
if challenge_hash in self.active_discriminants:
del self.active_discriminants[challenge_hash]
@ -311,17 +287,26 @@ class Timelord:
if challenge_hash not in self.done_discriminants:
self.done_discriminants.append(challenge_hash)
break
a = int.from_bytes(y_bytes[:129], "big", signed=True)
b = int.from_bytes(y_bytes[129:], "big", signed=True)
output = ClassgroupElement(int512(a), int512(b))
iterations_needed = uint64(
int.from_bytes(stdout_bytes_io.read(8), "big", signed=True)
)
y = ClassgroupElement.parse(stdout_bytes_io)
y_size_bytes = stdout_bytes_io.read(8)
y_size = uint64(
int.from_bytes(y_size_bytes, "big", signed=True)
)
y_bytes = stdout_bytes_io.read(y_size)
proof_bytes: bytes = stdout_bytes_io.read()
# Verifies our own proof just in case
a = int.from_bytes(y_bytes[:129], "big", signed=True)
b = int.from_bytes(y_bytes[129:], "big", signed=True)
output = ClassgroupElement(int512(a), int512(b))
proof_of_time = ProofOfTime(
challenge_hash,
iterations_needed,
@ -329,7 +314,8 @@ class Timelord:
self.config["n_wesolowski"],
proof_bytes,
)
if not proof_of_time.is_valid(self.discriminant_size_bits):
if not proof_of_time.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]):
log.error("Invalid proof of time")
response = timelord_protocol.ProofOfTimeFinished(proof_of_time)
@ -350,8 +336,6 @@ class Timelord:
async def _manage_discriminant_queue(self):
while not self._is_shutdown:
async with self.lock:
# Clear done tasks
self.tasks = [t for t in self.tasks if not t.done()]
if len(self.discriminant_queue) > 0:
max_weight = max([h for _, h in self.discriminant_queue])
if max_weight <= self.best_weight_three_proofs:
@ -469,7 +453,13 @@ class Timelord:
many iterations to run for.
"""
async with self.lock:
log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} {proof_of_space_info.iterations_needed}"
)
if proof_of_space_info.challenge_hash in self.done_discriminants:
log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} already done, returning"
)
return
if proof_of_space_info.challenge_hash not in self.pending_iters:
@ -483,6 +473,11 @@ class Timelord:
and proof_of_space_info.iterations_needed
not in self.submitted_iters[proof_of_space_info.challenge_hash]
):
log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} adding "
f"{proof_of_space_info.iterations_needed} to "
f"{self.pending_iters[proof_of_space_info.challenge_hash]}"
)
self.pending_iters[proof_of_space_info.challenge_hash].append(
proof_of_space_info.iterations_needed
)
)

View File

@ -2,12 +2,12 @@ import signal
import asyncio
import logging
from src.util.logging import initialize_logging
from src.util.config import load_config_cli
from src.util.config import load_config
from asyncio import Lock
from typing import List
from setproctitle import setproctitle
config = load_config_cli("config.yaml", "timelord_launcher")
config = load_config("config.yaml", "timelord_launcher")
active_processes: List = []
stopped = False

View File

@ -1,9 +1,11 @@
from typing import Any, Dict
import signal
from typing import Any, Dict, Optional
from pathlib import Path
import asyncio
import blspy
from secrets import token_bytes
from src.consensus.constants import constants
from src.full_node.blockchain import Blockchain
from src.full_node.mempool_manager import MempoolManager
from src.full_node.store import FullNodeStore
@ -11,6 +13,7 @@ from src.full_node.full_node import FullNode
from src.server.connection import NodeType
from src.server.server import ChiaServer
from src.simulator.full_node_simulator import FullNodeSimulator
from src.timelord_launcher import spawn_process, kill_processes
from src.wallet.wallet_node import WalletNode
from src.types.full_block import FullBlock
from src.full_node.coin_store import CoinStore
@ -233,16 +236,31 @@ async def setup_introducer(port, dic={}):
server.close_all()
await server.await_closed()
async def setup_vdf_clients(port):
vdf_task = asyncio.create_task(spawn_process("127.0.0.1", port, 1))
yield vdf_task
await kill_processes()
async def setup_timelord(port, dic={}):
config = load_config("config.yaml", "timelord")
test_constants_copy = test_constants.copy()
for k in dic.keys():
test_constants_copy[k] = dic[k]
timelord = Timelord(config, test_constants_copy["DISCRIMINANT_SIZE_BITS"])
timelord = Timelord(config, test_constants_copy)
server = ChiaServer(port, timelord, NodeType.TIMELORD)
_ = await server.start_server(port, None)
_ = await server.start_server(port, None, config)
coro = asyncio.start_server(
timelord._handle_client,
config["vdf_server"]["host"],
config["vdf_server"]["port"],
loop=asyncio.get_running_loop()
)
vdf_server = asyncio.ensure_future(coro)
async def run_timelord():
async for msg in timelord._manage_discriminant_queue():
@ -252,6 +270,7 @@ async def setup_timelord(port, dic={}):
yield (timelord, server)
vdf_server.cancel()
server.close_all()
await timelord._shutdown()
await timelord_task
@ -388,6 +407,7 @@ async def setup_full_system(dic={}):
setup_harvester(21234),
setup_farmer(21235),
setup_timelord(21236),
setup_vdf_clients(8000),
setup_full_node("blockchain_test.db", 21237, 21233, dic),
setup_full_node("blockchain_test_2.db", 21238, 21233, dic),
]
@ -396,8 +416,9 @@ async def setup_full_system(dic={}):
harvester, harvester_server = await node_iters[1].__anext__()
farmer, farmer_server = await node_iters[2].__anext__()
timelord, timelord_server = await node_iters[3].__anext__()
node1, node1_server = await node_iters[4].__anext__()
node2, node2_server = await node_iters[5].__anext__()
vdf = await node_iters[4].__anext__()
node1, node1_server = await node_iters[5].__anext__()
node2, node2_server = await node_iters[6].__anext__()
await harvester_server.start_client(
PeerInfo(farmer_server._host, uint16(farmer_server._port)), None
@ -405,6 +426,7 @@ async def setup_full_system(dic={}):
await farmer_server.start_client(
PeerInfo(node1_server._host, uint16(node1_server._port)), None
)
await timelord_server.start_client(
PeerInfo(node1_server._host, uint16(node1_server._port)), None
)