Ms.sync full sync improvements (#218)

* Refactor blockchain.py into a few different files (difficulty_adjustment.py, block_header_validation.py)
* Faster sync pipelining of requests, and parallel verification (13 minutes to 9 minutes on 2 cores)
* Memory usage of node during sync is significantly reduced, only the latest blocks are in memory
* Networking fixes and logging levels changed
* SPECIFIC delivery which allows you to send messages to a specific node id
* Check plots acts like the normal harvester
This commit is contained in:
Mariano Sorgente 2020-05-14 12:40:19 +09:00 committed by wjblanke
parent 2d0f1986b7
commit 1d21a6d777
29 changed files with 1402 additions and 1169 deletions

View File

@ -11,10 +11,13 @@ and this project does not yet adhere to [Semantic Versioning](https://semver.org
### Changed
- chiapos is now easier to compile with MSVC.
- Syncing is now faster and uses less memory
### Fixed
- Build status shield layout fixed in README.md
- Relic and thus blspy would crash on processors older than Haswell as they don't support lzc.
- chia-check-plots works with plot root (the same as harvester)
- Some networking errors are no longer printed
## [1.0beta5] aka Beta 1.5 - 2020-05-05
### Added
@ -41,7 +44,7 @@ and this project does not yet adhere to [Semantic Versioning](https://semver.org
- Currently, there is no way to restore a Coloured Coin Wallet.
## [1.0beta4] aka Beta 1.4 - 2020-04-29
### Added
### Added
- This release adds Coloured coin support with offers. Yes that is the correct spelling. Coloured coins allow you to issue a coin, token, or asset with nearly unlimited issuance plans and functionality. They support inner smart transactions so they can inherit any of the other functionality you can implement in Chialisp. Offers are especially cool as they create a truly decentralized exchange capability. Read much more about them in Bram's [blog post on Coloured coins](https://chia.net/2020/04/29/coloured-coins-launch.en.html).
- This release adds support for native Windows via a (mostly) automated installer and MacOS Mojave. Windows still requires some PowerShell command line use. You should expect ongoing improvements in ease of install and replication of the command line tools in the GUI. Again huge thanks to @dkackman for continued Windows installer development. Native Windows is currently slightly slower than the same version running in WSL 2 on the same machine for both block verification and plotting.
- We made some speed improvements that positively affected all platforms while trying to increase plotting speed in Windows.

View File

@ -45,6 +45,8 @@ class FullNodeView {
lca_height: 0,
lca_timestamp: 1585023165,
syncing: false,
sync_tip_height: 0,
sync_progress_height: 0,
difficulty: 0,
ips: 0,
min_iters: 0,
@ -226,7 +228,8 @@ class FullNodeView {
}
async update_view(redisplay_blocks) {
syncing_textfield.innerHTML = this.state.syncing ? "Yes" : "No";
let sync_info = "Verif. blocks " + this.state.sync_progress_height + "/" + this.state.sync_tip_height;
syncing_textfield.innerHTML = this.state.syncing ? sync_info : "No";
block_height_textfield.innerHTML = this.state.lca_height;
max_block_height_textfield.innerHTML = this.state.max_height;
lca_time_textfield.innerHTML = unix_to_short_date(this.state.lca_timestamp);
@ -345,7 +348,9 @@ class FullNodeView {
this.state.lca_height = blockchain_state.lca.data.height;
this.state.lca_hash = await hash_header(blockchain_state.lca);
this.state.lca_timestamp = blockchain_state.lca.data.timestamp;
this.state.syncing = blockchain_state.sync_mode;
this.state.syncing = blockchain_state.sync.sync_mode;
this.state.sync_tip_height = blockchain_state.sync.sync_tip_height;
this.state.sync_progress_height = blockchain_state.sync.sync_progress_height;
this.state.difficulty = blockchain_state.difficulty;
this.state.ips = blockchain_state.ips;
this.state.min_iters = blockchain_state.min_iters;

View File

@ -1,16 +1,16 @@
import argparse
from pathlib import Path
import logging
from blspy import PrivateKey, PublicKey
from chiapos import DiskProver, Verifier
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from chiapos import Verifier
from src.util.config import load_config
from src.util.logging import initialize_logging
from src.util.default_root import DEFAULT_ROOT_PATH
from src.util.hash import std_hash
from src.harvester import load_plots
plot_config_filename = "plots.yaml"
config_filename = "plots.yaml"
def main():
@ -20,33 +20,22 @@ def main():
parser = argparse.ArgumentParser(description="Chia plot checking script.")
parser.add_argument(
"-n", "--num", help="Number of challenges", type=int, default=1000
"-n", "--num", help="Number of challenges", type=int, default=100
)
args = parser.parse_args()
print("Checking plots in plots.yaml")
root_path = DEFAULT_ROOT_PATH
plot_config = load_config(root_path, plot_config_filename)
config = load_config(root_path, config_filename)
initialize_logging("%(name)-22s", {"log_stdout": True}, root_path)
log = logging.getLogger(__name__)
v = Verifier()
for plot_filename, plot_info in plot_config["plots"].items():
plot_seed: bytes32 = ProofOfSpace.calculate_plot_seed(
PublicKey.from_bytes(bytes.fromhex(plot_info["pool_pk"])),
PrivateKey.from_bytes(bytes.fromhex(plot_info["sk"])).get_public_key(),
)
if not Path(plot_filename).exists():
# Tries relative path
full_path: Path = DEFAULT_ROOT_PATH / plot_filename
if not full_path.exists():
# Tries absolute path
full_path = Path(plot_filename)
if not full_path.exists():
print(f"Plot file {full_path} not found.")
continue
pr = DiskProver(str(full_path))
else:
pr = DiskProver(plot_filename)
log.info("Loading plots in plots.yaml using harvester loading code\n")
provers = load_plots(config, plot_config, None)
log.info("\n\nStarting to test each plot with {args.num} challenges each\n")
for plot_path, pr in provers.items():
total_proofs = 0
try:
for i in range(args.num):
@ -57,17 +46,22 @@ def main():
proof = pr.get_full_proof(challenge, index)
total_proofs += 1
ver_quality_str = v.validate_proof(
plot_seed, pr.get_size(), challenge, proof
pr.get_id(), pr.get_size(), challenge, proof
)
assert quality_str == ver_quality_str
except BaseException as e:
if isinstance(e, KeyboardInterrupt):
print("Interrupted, closing")
log.warning("Interrupted, closing")
return
print(f"{type(e)}: {e} error in proving/verifying for plot {plot_filename}")
print(
f"{plot_filename}: Proofs {total_proofs} / {args.num}, {round(total_proofs/float(args.num), 4)}"
)
log.error(f"{type(e)}: {e} error in proving/verifying for plot {plot_path}")
if total_proofs > 0:
log.info(
f"{plot_path}: Proofs {total_proofs} / {args.num}, {round(total_proofs/float(args.num), 4)}"
)
else:
log.error(
f"{plot_path}: Proofs {total_proofs} / {args.num}, {round(total_proofs/float(args.num), 4)}"
)
if __name__ == "__main__":

View File

@ -29,7 +29,6 @@ def make_parser(parser):
def start(args, parser):
processes: List = []
for service in services_for_groups(args.group):
if pid_path_for_service(args.root_path, service).is_file():
@ -57,14 +56,14 @@ def start(args, parser):
process.wait()
except KeyboardInterrupt:
for process, pid_path in processes:
process.kill()
process.terminate()
for process, pid_path in processes:
try:
process.wait()
pid_path_killed = pid_path.with_suffix(".pid-killed")
os.rename(pid_path, pid_path_killed)
except Exception:
pass
except Exception as e:
print(f"Exception in chia start {e} {type(e)}")
if len(processes) > 0:
print("chia start complete")
return 0

View File

@ -11,7 +11,9 @@ SERVICES_FOR_GROUP = {
"harvester": "chia_harvester".split(),
"farmer": "chia_harvester chia_farmer chia_full_node".split(),
"timelord": "chia_timelord chia_timelord_launcher chia_full_node".split(),
"wallet": [f"npm run --prefix {str(Path(__file__).parent.parent.parent / 'electron-ui')} start"],
"wallet": [
f"npm run --prefix {str(Path(__file__).parent.parent.parent / 'electron-ui')} start"
],
"wallet-server": "chia-wallet".split(),
"introducer": "chia_introducer".split(),
}

View File

@ -0,0 +1,265 @@
import logging
import time
from typing import Dict, List, Optional, Tuple
import blspy
from src.consensus.block_rewards import calculate_block_reward
from src.consensus.pot_iterations import calculate_iterations_quality
from src.full_node.difficulty_adjustment import get_next_difficulty, get_next_min_iters
from src.types.challenge import Challenge
from src.types.header import Header
from src.types.header_block import HeaderBlock
from src.types.full_block import FullBlock
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.errors import Err
from src.util.hash import std_hash
from src.util.ints import uint32, uint64
from src.util.significant_bits import count_significant_bits
log = logging.getLogger(__name__)
async def validate_unfinished_block_header(
constants: Dict,
headers: Dict[bytes32, Header],
height_to_hash: Dict[uint32, bytes32],
block_header: Header,
proof_of_space: ProofOfSpace,
prev_header_block: Optional[HeaderBlock],
pre_validated: bool = False,
pos_quality_string: bytes32 = None,
) -> Tuple[Optional[Err], Optional[uint64]]:
"""
Block validation algorithm. Returns the number of VDF iterations that this block's
proof of time must have, if the candidate block is fully valid (except for proof of
time). The same as validate_block, but without proof of time and challenge validation.
If the block is invalid, an error code is returned.
Does NOT validate transactions and fees.
"""
if not pre_validated:
# 1. The hash of the proof of space must match header_data.proof_of_space_hash
if proof_of_space.get_hash() != block_header.data.proof_of_space_hash:
return (Err.INVALID_POSPACE_HASH, None)
# 2. The coinbase signature must be valid, according the the pool public key
pair = block_header.data.coinbase_signature.PkMessagePair(
proof_of_space.pool_pubkey, block_header.data.coinbase.name(),
)
if not block_header.data.coinbase_signature.validate([pair]):
return (Err.INVALID_COINBASE_SIGNATURE, None)
# 3. Check harvester signature of header data is valid based on harvester key
if not block_header.harvester_signature.verify(
[blspy.Util.hash256(block_header.data.get_hash())],
[proof_of_space.plot_pubkey],
):
return (Err.INVALID_HARVESTER_SIGNATURE, None)
# 4. If not genesis, the previous block must exist
if prev_header_block is not None and block_header.prev_header_hash not in headers:
return (Err.DOES_NOT_EXTEND, None)
# 5. If not genesis, the timestamp must be >= the average timestamp of last 11 blocks
# and less than 2 hours in the future (if block height < 11, average all previous blocks).
# Average is the sum, int diveded by the number of timestamps
if prev_header_block is not None:
last_timestamps: List[uint64] = []
curr = prev_header_block.header
while len(last_timestamps) < constants["NUMBER_OF_TIMESTAMPS"]:
last_timestamps.append(curr.data.timestamp)
fetched = headers.get(curr.prev_header_hash, None)
if not fetched:
break
curr = fetched
if len(last_timestamps) != constants["NUMBER_OF_TIMESTAMPS"]:
# For blocks 1 to 10, average timestamps of all previous blocks
assert curr.height == 0
prev_time: uint64 = uint64(int(sum(last_timestamps) // len(last_timestamps)))
if block_header.data.timestamp < prev_time:
return (Err.TIMESTAMP_TOO_FAR_IN_PAST, None)
if block_header.data.timestamp > time.time() + constants["MAX_FUTURE_TIME"]:
return (Err.TIMESTAMP_TOO_FAR_IN_FUTURE, None)
# 7. Extension data must be valid, if any is present
# Compute challenge of parent
challenge_hash: bytes32
if prev_header_block is not None:
challenge: Challenge = prev_header_block.challenge
challenge_hash = challenge.get_hash()
# 8. Check challenge hash of prev is the same as in pos
if challenge_hash != proof_of_space.challenge_hash:
return (Err.INVALID_POSPACE_CHALLENGE, None)
# 10. The proof of space must be valid on the challenge
if pos_quality_string is None:
pos_quality_string = proof_of_space.verify_and_get_quality_string()
if not pos_quality_string:
return (Err.INVALID_POSPACE, None)
if prev_header_block is not None:
# 11. If not genesis, the height on the previous block must be one less than on this block
if block_header.height != prev_header_block.height + 1:
return (Err.INVALID_HEIGHT, None)
else:
# 12. If genesis, the height must be 0
if block_header.height != 0:
return (Err.INVALID_HEIGHT, None)
# 13. The coinbase reward must match the block schedule
coinbase_reward = calculate_block_reward(block_header.height)
if coinbase_reward != block_header.data.coinbase.amount:
return (Err.INVALID_COINBASE_AMOUNT, None)
# 13b. The coinbase parent id must be the height
if block_header.data.coinbase.parent_coin_info != block_header.height.to_bytes(
32, "big"
):
return (Err.INVALID_COINBASE_PARENT, None)
# 13c. The fees coin parent id must be hash(hash(height))
if block_header.data.fees_coin.parent_coin_info != std_hash(
std_hash(uint32(block_header.height))
):
return (Err.INVALID_FEES_COIN_PARENT, None)
difficulty: uint64
if prev_header_block is not None:
difficulty = get_next_difficulty(
constants, headers, height_to_hash, prev_header_block.header
)
min_iters = get_next_min_iters(
constants, headers, height_to_hash, prev_header_block
)
else:
difficulty = uint64(constants["DIFFICULTY_STARTING"])
min_iters = uint64(constants["MIN_ITERS_STARTING"])
number_of_iters: uint64 = calculate_iterations_quality(
pos_quality_string, proof_of_space.size, difficulty, min_iters,
)
assert count_significant_bits(difficulty) <= constants["SIGNIFICANT_BITS"]
assert count_significant_bits(min_iters) <= constants["SIGNIFICANT_BITS"]
if prev_header_block is not None:
# 17. If not genesis, the total weight must be the parent weight + difficulty
if block_header.weight != prev_header_block.weight + difficulty:
return (Err.INVALID_WEIGHT, None)
# 18. If not genesis, the total iters must be parent iters + number_iters
if (
block_header.data.total_iters
!= prev_header_block.header.data.total_iters + number_of_iters
):
return (Err.INVALID_TOTAL_ITERS, None)
else:
# 19. If genesis, the total weight must be starting difficulty
if block_header.weight != difficulty:
return (Err.INVALID_WEIGHT, None)
# 20. If genesis, the total iters must be number iters
if block_header.data.total_iters != number_of_iters:
return (Err.INVALID_TOTAL_ITERS, None)
return (None, number_of_iters)
async def validate_finished_block_header(
constants: Dict,
headers: Dict[bytes32, Header],
height_to_hash: Dict[uint32, bytes32],
block: HeaderBlock,
prev_header_block: Optional[HeaderBlock],
genesis: bool,
pre_validated: bool = False,
pos_quality_string: bytes32 = None,
) -> Optional[Err]:
"""
Block validation algorithm. Returns None iff the candidate block is valid,
and extends something in the blockchain.
Does NOT validate transactions and fees.
"""
if not genesis:
if prev_header_block is None:
return Err.DOES_NOT_EXTEND
else:
assert prev_header_block is None
# 0. Validate unfinished block (check the rest of the conditions)
err, number_of_iters = await validate_unfinished_block_header(
constants,
headers,
height_to_hash,
block.header,
block.proof_of_space,
prev_header_block,
pre_validated,
pos_quality_string,
)
if err is not None:
return err
assert number_of_iters is not None
if block.proof_of_time is None:
return Err.BLOCK_IS_NOT_FINISHED
# 1. The number of iterations (based on quality, pos, difficulty, ips) must be the same as in the PoT
if number_of_iters != block.proof_of_time.number_of_iterations:
return Err.INVALID_NUM_ITERATIONS
# 2. the PoT must be valid, on a discriminant of size 1024, and the challenge_hash
if not pre_validated:
if not block.proof_of_time.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
return Err.INVALID_POT
# 3. If not genesis, the challenge_hash in the proof of time must match the challenge on the previous block
if not genesis:
assert prev_header_block is not None
prev_challenge: Optional[Challenge] = prev_header_block.challenge
assert prev_challenge is not None
if block.proof_of_time.challenge_hash != prev_challenge.get_hash():
return Err.INVALID_POT_CHALLENGE
else:
# 9. If genesis, the challenge hash in the proof of time must be the same as in the proof of space
assert block.proof_of_time is not None
challenge_hash = block.proof_of_time.challenge_hash
if challenge_hash != block.proof_of_space.challenge_hash:
return Err.INVALID_POSPACE_CHALLENGE
return None
def pre_validate_finished_block_header(constants: Dict, data: bytes):
"""
Validates all parts of block that don't need to be serially checked
"""
block = FullBlock.from_bytes(data)
if not block.proof_of_time:
return False, None
# 4. Check PoT
if not block.proof_of_time.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
return False, None
# 9. Check harvester signature of header data is valid based on harvester key
if not block.header.harvester_signature.verify(
[blspy.Util.hash256(block.header.data.get_hash())],
[block.proof_of_space.plot_pubkey],
):
return False, None
# 10. Check proof of space based on challenge
pos_quality_string = block.proof_of_space.verify_and_get_quality_string()
if not pos_quality_string:
return False, None
return True, bytes(pos_quality_string)

View File

@ -1,44 +1,40 @@
import asyncio
import collections
import logging
import multiprocessing
import time
from enum import Enum
from typing import Dict, List, Optional, Tuple, Set
import asyncio
import multiprocessing
import concurrent
import blspy
from typing import Dict, List, Optional, Set, Tuple
from chiabip158 import PyBIP158
from clvm.casts import int_from_bytes
from src.consensus.block_rewards import calculate_block_reward, calculate_base_fee
from src.consensus.constants import constants as consensus_constants
from src.consensus.pot_iterations import (
calculate_min_iters_from_iterations,
calculate_iterations_quality,
from src.consensus.block_rewards import calculate_base_fee
from src.full_node.block_header_validation import (
validate_unfinished_block_header,
validate_finished_block_header,
pre_validate_finished_block_header,
)
from src.full_node.block_store import BlockStore
from src.types.condition_opcodes import ConditionOpcode
from src.types.condition_var_pair import ConditionVarPair
from src.types.full_block import FullBlock, additions_for_npc
from src.full_node.coin_store import CoinStore
from src.full_node.difficulty_adjustment import get_next_difficulty, get_next_min_iters
from src.types.challenge import Challenge
from src.types.coin import Coin, hash_coin_list
from src.types.coin_record import CoinRecord
from src.types.header_block import HeaderBlock
from src.types.condition_opcodes import ConditionOpcode
from src.types.condition_var_pair import ConditionVarPair
from src.types.full_block import FullBlock, additions_for_npc
from src.types.header import Header
from src.types.header_block import HeaderBlock
from src.types.sized_bytes import bytes32
from src.full_node.coin_store import CoinStore
from src.util.errors import Err, ConsensusError
from src.util.cost_calculator import calculate_cost_of_program
from src.util.merkle_set import MerkleSet
from src.util.blockchain_check_conditions import blockchain_check_conditions_dict
from src.util.condition_tools import hash_key_pairs_for_conditions_dict
from src.util.ints import uint32, uint64
from src.types.challenge import Challenge
from src.util.cost_calculator import calculate_cost_of_program
from src.util.errors import ConsensusError, Err
from src.util.hash import std_hash
from src.util.significant_bits import (
truncate_to_significant_bits,
count_significant_bits,
)
from src.util.ints import uint32, uint64
from src.util.merkle_set import MerkleSet
log = logging.getLogger(__name__)
@ -70,8 +66,6 @@ class Blockchain:
height_to_hash: Dict[uint32, bytes32]
# All headers (but not orphans) from genesis to the tip are guaranteed to be in headers
headers: Dict[bytes32, Header]
# Process pool to verify blocks
pool: concurrent.futures.ProcessPoolExecutor
# Genesis block
genesis: FullBlock
# Unspent Store
@ -80,6 +74,11 @@ class Blockchain:
block_store: BlockStore
# Coinbase freeze period
coinbase_freeze: uint32
# Used to verify blocks in parallel
pool: concurrent.futures.ProcessPoolExecutor
# Whether blockchain is shut down or not
_shut_down: bool
# Lock to prevent simultaneous reads and writes
lock: asyncio.Lock
@ -95,6 +94,10 @@ class Blockchain:
"""
self = Blockchain()
self.lock = asyncio.Lock() # External lock handled by full node
cpu_count = multiprocessing.cpu_count()
self.pool = concurrent.futures.ProcessPoolExecutor(
max_workers=max(cpu_count - 1, 1)
)
self.constants = consensus_constants.copy()
for key, value in override_constants.items():
self.constants[key] = value
@ -103,11 +106,16 @@ class Blockchain:
self.headers = {}
self.coin_store = coin_store
self.block_store = block_store
self._shut_down = False
self.genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
self.coinbase_freeze = self.constants["COINBASE_FREEZE_PERIOD"]
await self._load_chain_from_store()
return self
def shut_down(self):
self._shut_down = True
self.pool.shutdown(wait=True)
async def _load_chain_from_store(self,) -> None:
"""
Initializes the state of the Blockchain class from the database. Sets the LCA, tips,
@ -190,7 +198,7 @@ class Blockchain:
def get_challenge(self, block: FullBlock) -> Optional[Challenge]:
if block.proof_of_time is None:
return None
if block.header_hash not in self.headers:
if block.prev_header_hash not in self.headers and block.height > 0:
return None
prev_challenge_hash = block.proof_of_space.challenge_hash
@ -199,7 +207,9 @@ class Blockchain:
if (block.height + 1) % self.constants["DIFFICULTY_EPOCH"] == self.constants[
"DIFFICULTY_DELAY"
]:
new_difficulty = self.get_next_difficulty(block.header_hash)
new_difficulty = get_next_difficulty(
self.constants, self.headers, self.height_to_hash, block.header
)
else:
new_difficulty = None
return Challenge(
@ -212,7 +222,7 @@ class Blockchain:
def get_header_block(self, block: FullBlock) -> Optional[HeaderBlock]:
challenge: Optional[Challenge] = self.get_challenge(block)
if not challenge or not block.proof_of_time:
if challenge is None or block.proof_of_time is None:
return None
return HeaderBlock(
block.proof_of_space, block.proof_of_time, challenge, block.header
@ -261,239 +271,6 @@ class Blockchain:
else:
raise ValueError("Invalid genesis block")
def get_next_difficulty(self, header_hash: bytes32) -> uint64:
"""
Returns the difficulty of the next block that extends onto header_hash.
Used to calculate the number of iterations. When changing this, also change the implementation
in wallet_state_manager.py.
"""
block: Header = self.headers[header_hash]
next_height: uint32 = uint32(block.height + 1)
if next_height < self.constants["DIFFICULTY_EPOCH"]:
# We are in the first epoch
return uint64(self.constants["DIFFICULTY_STARTING"])
# Epochs are diffined as intervals of DIFFICULTY_EPOCH blocks, inclusive and indexed at 0.
# For example, [0-2047], [2048-4095], etc. The difficulty changes DIFFICULTY_DELAY into the
# epoch, as opposed to the first block (as in Bitcoin).
elif (
next_height % self.constants["DIFFICULTY_EPOCH"]
!= self.constants["DIFFICULTY_DELAY"]
):
# Not at a point where difficulty would change
prev_block: Header = self.headers[block.prev_header_hash]
assert prev_block is not None
if prev_block is None:
raise Exception("Previous block is invalid.")
return uint64(block.weight - prev_block.weight)
# old diff curr diff new diff
# ----------|-----|----------------------|-----|-----...
# h1 h2 h3 i-1
# Height1 is the last block 2 epochs ago, so we can include the time to mine 1st block in previous epoch
height1 = uint32(
next_height
- self.constants["DIFFICULTY_EPOCH"]
- self.constants["DIFFICULTY_DELAY"]
- 1
)
# Height2 is the DIFFICULTY DELAYth block in the previous epoch
height2 = uint32(next_height - self.constants["DIFFICULTY_EPOCH"] - 1)
# Height3 is the last block in the previous epoch
height3 = uint32(next_height - self.constants["DIFFICULTY_DELAY"] - 1)
# h1 to h2 timestamps are mined on previous difficulty, while and h2 to h3 timestamps are mined on the
# current difficulty
block1, block2, block3 = None, None, None
if block not in self.get_current_tips() or height3 not in self.height_to_hash:
# This means we are either on a fork, or on one of the chains, but after the LCA,
# so we manually backtrack.
curr: Optional[Header] = block
assert curr is not None
while (
curr.height not in self.height_to_hash
or self.height_to_hash[curr.height] != curr.header_hash
):
if curr.height == height1:
block1 = curr
elif curr.height == height2:
block2 = curr
elif curr.height == height3:
block3 = curr
curr = self.headers.get(curr.prev_header_hash, None)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if not block1 and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = self.headers[self.height_to_hash[height1]]
if not block2:
block2 = self.headers[self.height_to_hash[height2]]
if not block3:
block3 = self.headers[self.height_to_hash[height3]]
assert block2 is not None and block3 is not None
# Current difficulty parameter (diff of block h = i - 1)
Tc = self.get_next_difficulty(block.prev_header_hash)
# Previous difficulty parameter (diff of block h = i - 2048 - 1)
Tp = self.get_next_difficulty(block2.prev_header_hash)
if block1:
timestamp1 = block1.data.timestamp # i - 512 - 1
else:
# In the case of height == -1, there is no timestamp here, so assume the genesis block
# took constants["BLOCK_TIME_TARGET"] seconds to mine.
genesis = self.headers[self.height_to_hash[uint32(0)]]
timestamp1 = genesis.data.timestamp - self.constants["BLOCK_TIME_TARGET"]
timestamp2 = block2.data.timestamp # i - 2048 + 512 - 1
timestamp3 = block3.data.timestamp # i - 512 - 1
# Numerator fits in 128 bits, so big int is not necessary
# We multiply by the denominators here, so we only have one fraction in the end (avoiding floating point)
term1 = (
self.constants["DIFFICULTY_DELAY"]
* Tp
* (timestamp3 - timestamp2)
* self.constants["BLOCK_TIME_TARGET"]
)
term2 = (
(self.constants["DIFFICULTY_WARP_FACTOR"] - 1)
* (self.constants["DIFFICULTY_EPOCH"] - self.constants["DIFFICULTY_DELAY"])
* Tc
* (timestamp2 - timestamp1)
* self.constants["BLOCK_TIME_TARGET"]
)
# Round down after the division
new_difficulty_precise: uint64 = uint64(
(term1 + term2)
// (
self.constants["DIFFICULTY_WARP_FACTOR"]
* (timestamp3 - timestamp2)
* (timestamp2 - timestamp1)
)
)
# Take only DIFFICULTY_SIGNIFICANT_BITS significant bits
new_difficulty = uint64(
truncate_to_significant_bits(
new_difficulty_precise, self.constants["SIGNIFICANT_BITS"]
)
)
assert (
count_significant_bits(new_difficulty) <= self.constants["SIGNIFICANT_BITS"]
)
# Only change by a max factor, to prevent attacks, as in greenpaper, and must be at least 1
max_diff = uint64(
truncate_to_significant_bits(
self.constants["DIFFICULTY_FACTOR"] * Tc,
self.constants["SIGNIFICANT_BITS"],
)
)
min_diff = uint64(
truncate_to_significant_bits(
Tc // self.constants["DIFFICULTY_FACTOR"],
self.constants["SIGNIFICANT_BITS"],
)
)
if new_difficulty >= Tc:
return min(new_difficulty, max_diff)
else:
return max([uint64(1), new_difficulty, min_diff])
def get_next_min_iters(self, block: FullBlock) -> uint64:
"""
Returns the VDF speed in iterations per seconds, to be used for the next block. This depends on
the number of iterations of the last epoch, and changes at the same block as the difficulty.
"""
next_height: uint32 = uint32(block.height + 1)
if next_height < self.constants["DIFFICULTY_EPOCH"]:
# First epoch has a hardcoded vdf speed
return self.constants["MIN_ITERS_STARTING"]
prev_block_header: Header = self.headers[block.prev_header_hash]
proof_of_space = block.proof_of_space
difficulty = self.get_next_difficulty(prev_block_header.header_hash)
iterations = uint64(
block.header.data.total_iters - prev_block_header.data.total_iters
)
prev_min_iters = calculate_min_iters_from_iterations(
proof_of_space, difficulty, iterations
)
if (
next_height % self.constants["DIFFICULTY_EPOCH"]
!= self.constants["DIFFICULTY_DELAY"]
):
# Not at a point where ips would change, so return the previous ips
# TODO: cache this for efficiency
return prev_min_iters
# min iters (along with difficulty) will change in this block, so we need to calculate the new one.
# The calculation is (iters_2 - iters_1) // epoch size
# 1 and 2 correspond to height_1 and height_2, being the last block of the second to last, and last
# block of the last epochs. Basically, it's total iterations per block on average.
# Height1 is the last block 2 epochs ago, so we can include the iterations taken for mining first block in epoch
height1 = uint32(
next_height
- self.constants["DIFFICULTY_EPOCH"]
- self.constants["DIFFICULTY_DELAY"]
- 1
)
# Height2 is the last block in the previous epoch
height2 = uint32(next_height - self.constants["DIFFICULTY_DELAY"] - 1)
block1: Optional[Header] = None
block2: Optional[Header] = None
if block not in self.get_current_tips() or height2 not in self.height_to_hash:
# This means we are either on a fork, or on one of the chains, but after the LCA,
# so we manually backtrack.
curr: Optional[Header] = block.header
assert curr is not None
while (
curr.height not in self.height_to_hash
or self.height_to_hash[curr.height] != curr.header_hash
):
if curr.height == height1:
block1 = curr
elif curr.height == height2:
block2 = curr
curr = self.headers.get(curr.prev_header_hash, None)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if block1 is None and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = self.headers.get(self.height_to_hash[height1], None)
if block2 is None:
block2 = self.headers.get(self.height_to_hash[height2], None)
assert block2 is not None
if block1 is not None:
iters1 = block1.data.total_iters
else:
# In the case of height == -1, iters = 0
iters1 = uint64(0)
iters2 = block2.data.total_iters
min_iters_precise = uint64(
(iters2 - iters1)
// (
self.constants["DIFFICULTY_EPOCH"]
* self.constants["MIN_ITERS_PROPORTION"]
)
)
min_iters = uint64(
truncate_to_significant_bits(
min_iters_precise, self.constants["SIGNIFICANT_BITS"]
)
)
assert count_significant_bits(min_iters) <= self.constants["SIGNIFICANT_BITS"]
return min_iters
async def receive_block(
self,
block: FullBlock,
@ -515,10 +292,33 @@ class Blockchain:
if block.prev_header_hash not in self.headers and not genesis:
return ReceiveBlockResult.DISCONNECTED_BLOCK, None, None
error_code: Optional[Err] = await self.validate_block(
block, genesis, pre_validated, pos_quality_string
prev_header_block: Optional[HeaderBlock] = None
if not genesis:
prev_full_block = await self.block_store.get_block(block.prev_header_hash)
assert prev_full_block is not None
prev_header_block = self.get_header_block(prev_full_block)
assert prev_header_block is not None
curr_header_block = self.get_header_block(block)
assert curr_header_block is not None
# Validate block header
error_code: Optional[Err] = await validate_finished_block_header(
self.constants,
self.headers,
self.height_to_hash,
curr_header_block,
prev_header_block,
genesis,
pre_validated,
pos_quality_string,
)
if error_code is not None:
return ReceiveBlockResult.INVALID_BLOCK, None, error_code
# Validate block body
error_code = await self.validate_block_body(block)
if error_code is not None:
return ReceiveBlockResult.INVALID_BLOCK, None, error_code
@ -533,312 +333,6 @@ class Blockchain:
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None
async def validate_unfinished_block(
self,
block: FullBlock,
prev_full_block: Optional[FullBlock],
pre_validated: bool = True,
pos_quality_string: bytes32 = None,
) -> Tuple[Optional[Err], Optional[uint64]]:
"""
Block validation algorithm. Returns the number of VDF iterations that this block's
proof of time must have, if the candidate block is fully valid (except for proof of
time). The same as validate_block, but without proof of time and challenge validation.
If the block is invalid, an error code is returned.
"""
if not pre_validated:
# 1. The hash of the proof of space must match header_data.proof_of_space_hash
if block.proof_of_space.get_hash() != block.header.data.proof_of_space_hash:
return (Err.INVALID_POSPACE_HASH, None)
# 2. The coinbase signature must be valid, according the the pool public key
pair = block.header.data.coinbase_signature.PkMessagePair(
block.proof_of_space.pool_pubkey, block.header.data.coinbase.name(),
)
if not block.header.data.coinbase_signature.validate([pair]):
return (Err.INVALID_COINBASE_SIGNATURE, None)
# 3. Check harvester signature of header data is valid based on harvester key
if not block.header.harvester_signature.verify(
[blspy.Util.hash256(block.header.data.get_hash())],
[block.proof_of_space.plot_pubkey],
):
return (Err.INVALID_HARVESTER_SIGNATURE, None)
# 4. If not genesis, the previous block must exist
if prev_full_block is not None and block.prev_header_hash not in self.headers:
return (Err.DOES_NOT_EXTEND, None)
# 5. If not genesis, the timestamp must be >= the average timestamp of last 11 blocks
# and less than 2 hours in the future (if block height < 11, average all previous blocks).
# Average is the sum, int diveded by the number of timestamps
if prev_full_block is not None:
last_timestamps: List[uint64] = []
curr = prev_full_block.header
while len(last_timestamps) < self.constants["NUMBER_OF_TIMESTAMPS"]:
last_timestamps.append(curr.data.timestamp)
fetched = self.headers.get(curr.prev_header_hash, None)
if not fetched:
break
curr = fetched
if len(last_timestamps) != self.constants["NUMBER_OF_TIMESTAMPS"]:
# For blocks 1 to 10, average timestamps of all previous blocks
assert curr.height == 0
prev_time: uint64 = uint64(
int(sum(last_timestamps) // len(last_timestamps))
)
if block.header.data.timestamp < prev_time:
return (Err.TIMESTAMP_TOO_FAR_IN_PAST, None)
if (
block.header.data.timestamp
> time.time() + self.constants["MAX_FUTURE_TIME"]
):
return (Err.TIMESTAMP_TOO_FAR_IN_FUTURE, None)
# 6. The compact block filter must be correct, according to the body (BIP158)
if block.header.data.filter_hash != bytes32([0] * 32):
if block.transactions_filter is None:
return (Err.INVALID_TRANSACTIONS_FILTER_HASH, None)
if std_hash(block.transactions_filter) != block.header.data.filter_hash:
return (Err.INVALID_TRANSACTIONS_FILTER_HASH, None)
elif block.transactions_filter is not None:
return (Err.INVALID_TRANSACTIONS_FILTER_HASH, None)
# 7. Extension data must be valid, if any is present
# Compute challenge of parent
challenge_hash: bytes32
if prev_full_block is not None:
challenge: Optional[Challenge] = self.get_challenge(prev_full_block)
assert challenge is not None
challenge_hash = challenge.get_hash()
# 8. Check challenge hash of prev is the same as in pos
if challenge_hash != block.proof_of_space.challenge_hash:
return (Err.INVALID_POSPACE_CHALLENGE, None)
else:
# 9. If genesis, the challenge hash in the proof of time must be the same as in the proof of space
assert block.proof_of_time is not None
challenge_hash = block.proof_of_time.challenge_hash
if challenge_hash != block.proof_of_space.challenge_hash:
return (Err.INVALID_POSPACE_CHALLENGE, None)
# 10. The proof of space must be valid on the challenge
if pos_quality_string is None:
pos_quality_string = block.proof_of_space.verify_and_get_quality_string()
if not pos_quality_string:
return (Err.INVALID_POSPACE, None)
if prev_full_block is not None:
# 11. If not genesis, the height on the previous block must be one less than on this block
if block.height != prev_full_block.height + 1:
return (Err.INVALID_HEIGHT, None)
else:
# 12. If genesis, the height must be 0
if block.height != 0:
return (Err.INVALID_HEIGHT, None)
# 13. The coinbase reward must match the block schedule
coinbase_reward = calculate_block_reward(block.height)
if coinbase_reward != block.header.data.coinbase.amount:
return (Err.INVALID_COINBASE_AMOUNT, None)
# 13b. The coinbase parent id must be the height
if block.header.data.coinbase.parent_coin_info != block.height.to_bytes(
32, "big"
):
return (Err.INVALID_COINBASE_PARENT, None)
# 13c. The fees coin parent id must be hash(hash(height))
fee_base = calculate_base_fee(block.height)
if block.header.data.fees_coin.parent_coin_info != std_hash(
std_hash(uint32(block.height))
):
return (Err.INVALID_FEES_COIN_PARENT, None)
# target reward_fee = 1/8 coinbase reward + tx fees
if block.transactions_generator is not None:
# 14. Make sure transactions generator hash is valid (or all 0 if not present)
if (
block.transactions_generator.get_tree_hash()
!= block.header.data.generator_hash
):
return (Err.INVALID_TRANSACTIONS_GENERATOR_HASH, None)
# 15. If not genesis, the transactions must be valid and fee must be valid
# Verifies that fee_base + TX fees = fee_coin.amount
err = await self._validate_transactions(block, fee_base)
if err is not None:
return (err, None)
else:
# Make sure transactions generator hash is valid (or all 0 if not present)
if block.header.data.generator_hash != bytes32(bytes([0] * 32)):
return (Err.INVALID_TRANSACTIONS_GENERATOR_HASH, None)
# 16. If genesis, the fee must be the base fee, agg_sig must be None, and merkle roots must be valid
if fee_base != block.header.data.fees_coin.amount:
return (Err.INVALID_BLOCK_FEE_AMOUNT, None)
root_error = self._validate_merkle_root(block)
if root_error:
return (root_error, None)
if block.header.data.aggregated_signature is not None:
log.error("1")
return (Err.BAD_AGGREGATE_SIGNATURE, None)
difficulty: uint64
if prev_full_block is not None:
difficulty = self.get_next_difficulty(prev_full_block.header_hash)
min_iters = self.get_next_min_iters(prev_full_block)
else:
difficulty = uint64(self.constants["DIFFICULTY_STARTING"])
min_iters = uint64(self.constants["MIN_ITERS_STARTING"])
number_of_iters: uint64 = calculate_iterations_quality(
pos_quality_string, block.proof_of_space.size, difficulty, min_iters,
)
assert count_significant_bits(difficulty) <= self.constants["SIGNIFICANT_BITS"]
assert count_significant_bits(min_iters) <= self.constants["SIGNIFICANT_BITS"]
if prev_full_block is not None:
# 17. If not genesis, the total weight must be the parent weight + difficulty
if block.weight != prev_full_block.weight + difficulty:
return (Err.INVALID_WEIGHT, None)
# 18. If not genesis, the total iters must be parent iters + number_iters
if (
block.header.data.total_iters
!= prev_full_block.header.data.total_iters + number_of_iters
):
return (Err.INVALID_TOTAL_ITERS, None)
else:
# 19. If genesis, the total weight must be starting difficulty
if block.weight != difficulty:
return (Err.INVALID_WEIGHT, None)
# 20. If genesis, the total iters must be number iters
if block.header.data.total_iters != number_of_iters:
return (Err.INVALID_TOTAL_ITERS, None)
return (None, number_of_iters)
async def validate_block(
self,
block: FullBlock,
genesis: bool = False,
pre_validated: bool = False,
pos_quality_string: bytes32 = None,
) -> Optional[Err]:
"""
Block validation algorithm. Returns true iff the candidate block is fully valid,
and extends something in the blockchain.
"""
prev_full_block: Optional[FullBlock]
if not genesis:
prev_full_block = await self.block_store.get_block(block.prev_header_hash)
if prev_full_block is None:
return Err.DOES_NOT_EXTEND
else:
prev_full_block = None
# 0. Validate unfinished block (check the rest of the conditions)
err, number_of_iters = await self.validate_unfinished_block(
block, prev_full_block, pre_validated, pos_quality_string
)
if err is not None:
return err
assert number_of_iters is not None
if block.proof_of_time is None:
return Err.BLOCK_IS_NOT_FINISHED
# 1. The number of iterations (based on quality, pos, difficulty, ips) must be the same as in the PoT
if number_of_iters != block.proof_of_time.number_of_iterations:
return Err.INVALID_NUM_ITERATIONS
# 2. the PoT must be valid, on a discriminant of size 1024, and the challenge_hash
if not pre_validated:
if not block.proof_of_time.is_valid(
self.constants["DISCRIMINANT_SIZE_BITS"]
):
return Err.INVALID_POT
# 3. If not genesis, the challenge_hash in the proof of time must match the challenge on the previous block
if not genesis:
assert prev_full_block is not None
prev_challenge: Optional[Challenge] = self.get_challenge(prev_full_block)
assert prev_challenge is not None
if block.proof_of_time.challenge_hash != prev_challenge.get_hash():
return Err.INVALID_POT_CHALLENGE
return None
async def pre_validate_blocks(
self, blocks: List[FullBlock]
) -> List[Tuple[bool, Optional[bytes32]]]:
results = []
for block in blocks:
val, pos = self.pre_validate_block_multi(bytes(block))
if pos is not None:
pos = bytes32(pos)
results.append((val, pos))
return results
async def pre_validate_blocks_multiprocessing(
self, blocks: List[FullBlock]
) -> List[Tuple[bool, Optional[bytes32]]]:
futures = []
cpu_count = multiprocessing.cpu_count()
# Pool of workers to validate blocks concurrently
pool = concurrent.futures.ProcessPoolExecutor(max_workers=max(cpu_count - 1, 1))
for block in blocks:
futures.append(
asyncio.get_running_loop().run_in_executor(
pool, self.pre_validate_block_multi, bytes(block)
)
)
results = await asyncio.gather(*futures)
for i, (val, pos) in enumerate(results):
if pos is not None:
pos = bytes32(pos)
results[i] = val, pos
pool.shutdown(wait=True)
return results
def pre_validate_block_multi(self, data) -> Tuple[bool, Optional[bytes]]:
"""
Validates all parts of FullBlock that don't need to be serially checked
"""
block = FullBlock.from_bytes(data)
if not block.proof_of_time:
return False, None
# 4. Check PoT
if not block.proof_of_time.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]):
return False, None
# 9. Check harvester signature of header data is valid based on harvester key
if not block.header.harvester_signature.verify(
[blspy.Util.hash256(block.header.data.get_hash())],
[block.proof_of_space.plot_pubkey],
):
return False, None
# 10. Check proof of space based on challenge
pos_quality_string = block.proof_of_space.verify_and_get_quality_string()
if not pos_quality_string:
return False, None
return True, bytes(pos_quality_string)
async def _reconsider_heads(
self, block: Header, genesis: bool, sync_mode: bool
) -> Tuple[bool, Optional[Header]]:
@ -990,6 +484,101 @@ class Blockchain:
await self.coin_store.add_lcas(blocks)
def get_next_difficulty(self, header_hash: bytes32) -> uint64:
return get_next_difficulty(
self.constants, self.headers, self.height_to_hash, header_hash
)
def get_next_min_iters(self, header_hash: bytes32) -> uint64:
return get_next_min_iters(
self.constants, self.headers, self.height_to_hash, header_hash
)
async def pre_validate_blocks_multiprocessing(
self, blocks: List[FullBlock]
) -> List[Tuple[bool, Optional[bytes32]]]:
futures = []
# Pool of workers to validate blocks concurrently
for block in blocks:
if self._shut_down:
return [(False, None) for _ in range(len(blocks))]
futures.append(
asyncio.get_running_loop().run_in_executor(
self.pool,
pre_validate_finished_block_header,
self.constants,
bytes(block),
)
)
results = await asyncio.gather(*futures)
for i, (val, pos) in enumerate(results):
if pos is not None:
pos = bytes32(pos)
results[i] = val, pos
return results
async def validate_unfinished_block(
self, block: FullBlock, prev_full_block: FullBlock
) -> Tuple[Optional[Err], Optional[uint64]]:
prev_hb = self.get_header_block(prev_full_block)
assert prev_hb is not None
return await validate_unfinished_block_header(
self.constants,
self.headers,
self.height_to_hash,
block.header,
block.proof_of_space,
prev_hb,
False,
)
async def validate_block_body(self, block: FullBlock) -> Optional[Err]:
"""
Validates the transactions and body of the block. Returns None if everything
validates correctly, or an Err if something does not validate.
"""
# 6. The compact block filter must be correct, according to the body (BIP158)
if block.header.data.filter_hash != bytes32([0] * 32):
if block.transactions_filter is None:
return Err.INVALID_TRANSACTIONS_FILTER_HASH
if std_hash(block.transactions_filter) != block.header.data.filter_hash:
return Err.INVALID_TRANSACTIONS_FILTER_HASH
elif block.transactions_filter is not None:
return Err.INVALID_TRANSACTIONS_FILTER_HASH
fee_base = calculate_base_fee(block.height)
# target reward_fee = 1/8 coinbase reward + tx fees
if block.transactions_generator is not None:
# 14. Make sure transactions generator hash is valid (or all 0 if not present)
if (
block.transactions_generator.get_tree_hash()
!= block.header.data.generator_hash
):
return Err.INVALID_TRANSACTIONS_GENERATOR_HASH
# 15. If not genesis, the transactions must be valid and fee must be valid
# Verifies that fee_base + TX fees = fee_coin.amount
err = await self._validate_transactions(block, fee_base)
if err is not None:
return err
else:
# Make sure transactions generator hash is valid (or all 0 if not present)
if block.header.data.generator_hash != bytes32(bytes([0] * 32)):
return Err.INVALID_TRANSACTIONS_GENERATOR_HASH
# 16. If genesis, the fee must be the base fee, agg_sig must be None, and merkle roots must be valid
if fee_base != block.header.data.fees_coin.amount:
return Err.INVALID_BLOCK_FEE_AMOUNT
root_error = self._validate_merkle_root(block)
if root_error:
return root_error
if block.header.data.aggregated_signature is not None:
log.error("1")
return Err.BAD_AGGREGATE_SIGNATURE
return None
def _validate_merkle_root(
self,
block: FullBlock,
@ -1241,6 +830,7 @@ class Blockchain:
)
# Verify aggregated signature
# TODO: move this to pre_validate_blocks_multiprocessing so we can sync faster
if not block.header.data.aggregated_signature:
return Err.BAD_AGGREGATE_SIGNATURE
if not block.header.data.aggregated_signature.validate(hash_key_pairs):

View File

@ -0,0 +1,240 @@
from typing import Dict, Optional, Union
from src.consensus.pot_iterations import calculate_min_iters_from_iterations
from src.types.full_block import FullBlock
from src.types.header import Header
from src.types.header_block import HeaderBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64
from src.util.significant_bits import (
count_significant_bits,
truncate_to_significant_bits,
)
def get_next_difficulty(
constants: Dict,
headers: Dict[bytes32, Header],
height_to_hash: Dict[uint32, bytes32],
block: Header,
) -> uint64:
"""
Returns the difficulty of the next block that extends onto block.
Used to calculate the number of iterations. When changing this, also change the implementation
in wallet_state_manager.py.
"""
next_height: uint32 = uint32(block.height + 1)
if next_height < constants["DIFFICULTY_EPOCH"]:
# We are in the first epoch
return uint64(constants["DIFFICULTY_STARTING"])
# Epochs are diffined as intervals of DIFFICULTY_EPOCH blocks, inclusive and indexed at 0.
# For example, [0-2047], [2048-4095], etc. The difficulty changes DIFFICULTY_DELAY into the
# epoch, as opposed to the first block (as in Bitcoin).
elif next_height % constants["DIFFICULTY_EPOCH"] != constants["DIFFICULTY_DELAY"]:
# Not at a point where difficulty would change
prev_block: Header = headers[block.prev_header_hash]
return uint64(block.weight - prev_block.weight)
# old diff curr diff new diff
# ----------|-----|----------------------|-----|-----...
# h1 h2 h3 i-1
# Height1 is the last block 2 epochs ago, so we can include the time to mine 1st block in previous epoch
height1 = uint32(
next_height - constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"] - 1
)
# Height2 is the DIFFICULTY DELAYth block in the previous epoch
height2 = uint32(next_height - constants["DIFFICULTY_EPOCH"] - 1)
# Height3 is the last block in the previous epoch
height3 = uint32(next_height - constants["DIFFICULTY_DELAY"] - 1)
# h1 to h2 timestamps are mined on previous difficulty, while and h2 to h3 timestamps are mined on the
# current difficulty
block1, block2, block3 = None, None, None
# We need to backtrack until we merge with the LCA chain, so we can use the height_to_hash dict.
# This is important if we are on a fork, or beyond the LCA.
curr: Optional[Header] = block
assert curr is not None
while (
curr.height not in height_to_hash
or height_to_hash[curr.height] != curr.header_hash
):
if curr.height == height1:
block1 = curr
elif curr.height == height2:
block2 = curr
elif curr.height == height3:
block3 = curr
curr = headers.get(curr.prev_header_hash, None)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if not block1 and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = headers[height_to_hash[height1]]
if not block2:
block2 = headers[height_to_hash[height2]]
if not block3:
block3 = headers[height_to_hash[height3]]
assert block2 is not None and block3 is not None
# Current difficulty parameter (diff of block h = i - 1)
Tc = get_next_difficulty(
constants, headers, height_to_hash, headers[block.prev_header_hash]
)
# Previous difficulty parameter (diff of block h = i - 2048 - 1)
Tp = get_next_difficulty(
constants, headers, height_to_hash, headers[block2.prev_header_hash]
)
if block1:
timestamp1 = block1.data.timestamp # i - 512 - 1
else:
# In the case of height == -1, there is no timestamp here, so assume the genesis block
# took constants["BLOCK_TIME_TARGET"] seconds to mine.
genesis = headers[height_to_hash[uint32(0)]]
timestamp1 = genesis.data.timestamp - constants["BLOCK_TIME_TARGET"]
timestamp2 = block2.data.timestamp # i - 2048 + 512 - 1
timestamp3 = block3.data.timestamp # i - 512 - 1
# Numerator fits in 128 bits, so big int is not necessary
# We multiply by the denominators here, so we only have one fraction in the end (avoiding floating point)
term1 = (
constants["DIFFICULTY_DELAY"]
* Tp
* (timestamp3 - timestamp2)
* constants["BLOCK_TIME_TARGET"]
)
term2 = (
(constants["DIFFICULTY_WARP_FACTOR"] - 1)
* (constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"])
* Tc
* (timestamp2 - timestamp1)
* constants["BLOCK_TIME_TARGET"]
)
# Round down after the division
new_difficulty_precise: uint64 = uint64(
(term1 + term2)
// (
constants["DIFFICULTY_WARP_FACTOR"]
* (timestamp3 - timestamp2)
* (timestamp2 - timestamp1)
)
)
# Take only DIFFICULTY_SIGNIFICANT_BITS significant bits
new_difficulty = uint64(
truncate_to_significant_bits(
new_difficulty_precise, constants["SIGNIFICANT_BITS"]
)
)
assert count_significant_bits(new_difficulty) <= constants["SIGNIFICANT_BITS"]
# Only change by a max factor, to prevent attacks, as in greenpaper, and must be at least 1
max_diff = uint64(
truncate_to_significant_bits(
constants["DIFFICULTY_FACTOR"] * Tc, constants["SIGNIFICANT_BITS"],
)
)
min_diff = uint64(
truncate_to_significant_bits(
Tc // constants["DIFFICULTY_FACTOR"], constants["SIGNIFICANT_BITS"],
)
)
if new_difficulty >= Tc:
return min(new_difficulty, max_diff)
else:
return max([uint64(1), new_difficulty, min_diff])
def get_next_min_iters(
constants: Dict,
headers: Dict[bytes32, Header],
height_to_hash: Dict[uint32, bytes32],
block: Union[FullBlock, HeaderBlock],
) -> uint64:
"""
Returns the VDF speed in iterations per seconds, to be used for the next block. This depends on
the number of iterations of the last epoch, and changes at the same block as the difficulty.
"""
next_height: uint32 = uint32(block.height + 1)
if next_height < constants["DIFFICULTY_EPOCH"]:
# First epoch has a hardcoded vdf speed
return constants["MIN_ITERS_STARTING"]
prev_block_header: Header = headers[block.prev_header_hash]
proof_of_space = block.proof_of_space
difficulty = get_next_difficulty(
constants, headers, height_to_hash, prev_block_header
)
iterations = uint64(
block.header.data.total_iters - prev_block_header.data.total_iters
)
prev_min_iters = calculate_min_iters_from_iterations(
proof_of_space, difficulty, iterations
)
if next_height % constants["DIFFICULTY_EPOCH"] != constants["DIFFICULTY_DELAY"]:
# Not at a point where ips would change, so return the previous ips
# TODO: cache this for efficiency
return prev_min_iters
# min iters (along with difficulty) will change in this block, so we need to calculate the new one.
# The calculation is (iters_2 - iters_1) // epoch size
# 1 and 2 correspond to height_1 and height_2, being the last block of the second to last, and last
# block of the last epochs. Basically, it's total iterations per block on average.
# Height1 is the last block 2 epochs ago, so we can include the iterations taken for mining first block in epoch
height1 = uint32(
next_height - constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"] - 1
)
# Height2 is the last block in the previous epoch
height2 = uint32(next_height - constants["DIFFICULTY_DELAY"] - 1)
block1: Optional[Header] = None
block2: Optional[Header] = None
# We need to backtrack until we merge with the LCA chain, so we can use the height_to_hash dict.
# This is important if we are on a fork, or beyond the LCA.
curr: Optional[Header] = block.header
assert curr is not None
while (
curr.height not in height_to_hash
or height_to_hash[curr.height] != curr.header_hash
):
if curr.height == height1:
block1 = curr
elif curr.height == height2:
block2 = curr
curr = headers.get(curr.prev_header_hash, None)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if block1 is None and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = headers.get(height_to_hash[height1], None)
if block2 is None:
block2 = headers.get(height_to_hash[height2], None)
assert block2 is not None
if block1 is not None:
iters1 = block1.data.total_iters
else:
# In the case of height == -1, iters = 0
iters1 = uint64(0)
iters2 = block2.data.total_iters
min_iters_precise = uint64(
(iters2 - iters1)
// (constants["DIFFICULTY_EPOCH"] * constants["MIN_ITERS_PROPORTION"])
)
min_iters = uint64(
truncate_to_significant_bits(min_iters_precise, constants["SIGNIFICANT_BITS"])
)
assert count_significant_bits(min_iters) <= constants["SIGNIFICANT_BITS"]
return min_iters

View File

@ -1,20 +1,27 @@
import asyncio
import concurrent
import logging
import traceback
import time
from asyncio import Event
from pathlib import Path
from typing import AsyncGenerator, List, Optional, Tuple, Dict, Type
import aiosqlite
from typing import AsyncGenerator, Dict, List, Optional, Tuple, Type
import aiosqlite
from chiabip158 import PyBIP158
from chiapos import Verifier
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
from src.consensus.block_rewards import calculate_base_fee
from src.consensus.constants import constants as consensus_constants
from src.consensus.block_rewards import calculate_base_fee
from src.consensus.pot_iterations import calculate_iterations
from src.consensus.weight_verifier import verify_weight
from src.full_node.block_store import BlockStore
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
from src.full_node.coin_store import CoinStore
from src.full_node.full_node_store import FullNodeStore
from src.full_node.mempool_manager import MempoolManager
from src.full_node.sync_blocks_processor import SyncBlocksProcessor
from src.full_node.sync_peers_handler import SyncPeersHandler
from src.full_node.sync_store import SyncStore
from src.protocols import (
farmer_protocol,
full_node_protocol,
@ -22,34 +29,29 @@ from src.protocols import (
wallet_protocol,
)
from src.protocols.wallet_protocol import GeneratorResponse
from src.types.mempool_item import MempoolItem
from src.util.merkle_set import MerkleSet
from src.util.bundle_tools import best_solution_program
from src.full_node.mempool_manager import MempoolManager
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.server.server import ChiaServer
from src.types.challenge import Challenge
from src.types.full_block import FullBlock
from src.types.coin import Coin, hash_coin_list
from src.types.BLSSignature import BLSSignature
from src.util.cost_calculator import calculate_cost_of_program
from src.util.hash import std_hash
from src.types.spend_bundle import SpendBundle
from src.types.program import Program
from src.types.challenge import Challenge
from src.types.coin import Coin, hash_coin_list
from src.types.full_block import FullBlock
from src.types.header import Header, HeaderData
from src.types.header_block import HeaderBlock
from src.types.mempool_inclusion_status import MempoolInclusionStatus
from src.types.mempool_item import MempoolItem
from src.types.peer_info import PeerInfo
from src.types.program import Program
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.full_node.coin_store import CoinStore
from src.types.spend_bundle import SpendBundle
from src.util.api_decorators import api_request
from src.util.bundle_tools import best_solution_program
from src.util.cost_calculator import calculate_cost_of_program
from src.util.errors import ConsensusError, Err
from src.util.hash import std_hash
from src.util.ints import uint32, uint64, uint128
from src.util.errors import Err, ConsensusError
from src.types.mempool_inclusion_status import MempoolInclusionStatus
from src.util.merkle_set import MerkleSet
from src.util.path import mkdir, path_from_root
from src.full_node.block_store import BlockStore
from src.full_node.full_node_store import FullNodeStore
from src.full_node.sync_store import SyncStore
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
@ -61,6 +63,7 @@ class FullNode:
coin_store: CoinStore
mempool_manager: MempoolManager
connection: aiosqlite.Connection
sync_peers_handler: Optional[SyncPeersHandler]
blockchain: Blockchain
config: Dict
server: Optional[ChiaServer]
@ -82,6 +85,7 @@ class FullNode:
self.server = None
self._shut_down = False # Set to true to close all infinite loops
self.constants = consensus_constants.copy()
self.sync_peers_handler = None
for key, value in override_constants.items():
self.constants[key] = value
if name:
@ -96,7 +100,7 @@ class FullNode:
self.connection = await aiosqlite.connect(db_path)
self.block_store = await BlockStore.create(self.connection)
self.full_node_store = await FullNodeStore.create(self.connection)
self.sync_store = await SyncStore.create(self.connection)
self.sync_store = await SyncStore.create()
self.coin_store = await CoinStore.create(self.connection)
self.log.info("Initializing blockchain from disk")
@ -134,7 +138,7 @@ class FullNode:
challenge_hash = challenge.get_hash()
if tip.height > 0:
difficulty: uint64 = self.blockchain.get_next_difficulty(
tip.prev_header_hash
self.blockchain.headers[tip.prev_header_hash]
)
else:
difficulty = uint64(tip.weight)
@ -274,8 +278,11 @@ class FullNode:
self.introducer_task = asyncio.create_task(introducer_client())
async def _shutdown(self):
def _close(self):
self._shut_down = True
self.blockchain.shut_down()
async def _await_closed(self):
await self.connection.close()
async def _sync(self) -> OutboundMessageGenerator:
@ -283,17 +290,18 @@ class FullNode:
Performs a full sync of the blockchain.
- Check which are the heaviest tips
- Request headers for the heaviest
- Find the fork point to see where to start downloading headers
- Verify the weight of the tip, using the headers
- Find the fork point to see where to start downloading blocks
- Blacklist peers that provide invalid blocks
- Sync blockchain up to heads (request blocks in batches)
- Download all blocks
- Disconnect peers that provide invalid blocks or don't have the blocks
"""
self.log.info("Starting to perform sync with peers.")
self.log.info("Waiting to receive tips from peers.")
self.sync_peers_handler = None
self.sync_store.set_waiting_for_tips(True)
# TODO: better way to tell that we have finished receiving tips
# TODO: fix DOS issue. Attacker can request syncing to an invalid blockchain
await asyncio.sleep(5)
await asyncio.sleep(2)
highest_weight: uint128 = uint128(0)
tip_block: FullBlock
tip_height = 0
@ -307,6 +315,9 @@ class FullNode:
Tuple[bytes32, FullBlock]
] = self.sync_store.get_potential_tips_tuples()
self.log.info(f"Have collected {len(potential_tips)} potential tips")
if self._shut_down:
return
for header_hash, potential_tip_block in potential_tips:
if potential_tip_block.proof_of_time is None:
raise ValueError(
@ -327,17 +338,14 @@ class FullNode:
f"Tip block {tip_block.header_hash} tip height {tip_block.height}"
)
for height in range(0, tip_block.height + 1):
self.sync_store.set_potential_headers_received(uint32(height), Event())
self.sync_store.set_potential_blocks_received(uint32(height), Event())
self.sync_store.set_potential_hashes_received(Event())
self.sync_store.set_potential_hashes_received(Event())
timeout = 200
sleep_interval = 10
total_time_slept = 0
# TODO: verify weight here once we have the correct protocol messages (interative flyclient)
while True:
if total_time_slept > timeout:
if total_time_slept > 30:
raise TimeoutError("Took too long to fetch header hashes.")
if self._shut_down:
return
@ -361,259 +369,82 @@ class FullNode:
# Finding the fork point allows us to only download headers and blocks from the fork point
header_hashes = self.sync_store.get_potential_hashes()
fork_point_height: uint32 = self.blockchain.find_fork_point_alternate_chain(
header_hashes
)
async with self.blockchain.lock:
# Lock blockchain so we can copy over the headers without any reorgs
fork_point_height: uint32 = self.blockchain.find_fork_point_alternate_chain(
header_hashes
)
fork_point_hash: bytes32 = header_hashes[fork_point_height]
self.log.info(f"Fork point: {fork_point_hash} at height {fork_point_height}")
# Now, we download all of the headers in order to verify the weight, in batches
headers: List[HeaderBlock] = []
assert self.server is not None
peers = [
con.node_id
for con in self.server.global_connections.get_connections()
if (con.node_id is not None and con.connection_type == NodeType.FULL_NODE)
]
# Download headers in batches. We download a few batches ahead in case there are delays or peers
# that don't have the headers that we need.
last_request_time: float = 0
highest_height_requested: uint32 = uint32(0)
request_made: bool = False
for height_checkpoint in range(
fork_point_height + 1, tip_height + 1, self.config["max_headers_to_send"]
):
end_height = min(
height_checkpoint + self.config["max_headers_to_send"], tip_height + 1
)
total_time_slept = 0
while True:
if self._shut_down:
return
if total_time_slept > timeout:
raise TimeoutError("Took too long to fetch blocks")
# Request batches that we don't have yet
for batch in range(0, self.config["num_sync_batches"]):
batch_start = (
height_checkpoint + batch * self.config["max_headers_to_send"]
)
batch_end = min(
batch_start + self.config["max_headers_to_send"], tip_height + 1
)
if batch_start > tip_height:
# We have asked for all blocks
break
blocks_missing = any(
[
not (
self.sync_store.get_potential_headers_received(
uint32(h)
)
).is_set()
for h in range(batch_start, batch_end)
]
)
if (
time.time() - last_request_time > sleep_interval
and blocks_missing
) or (batch_end - 1) > highest_height_requested:
# If we are missing header blocks in this batch, and we haven't made a request in a while,
# Make a request for this batch. Also, if we have never requested this batch, make
# the request
if batch_end - 1 > highest_height_requested:
highest_height_requested = batch_end - 1
request_made = True
request_hb = full_node_protocol.RequestHeaderBlock(
batch_start, header_hashes[batch_start],
)
self.log.info(f"Requesting header block {batch_start}.")
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_header_block", request_hb),
Delivery.RANDOM,
)
if request_made:
# Reset the timer for requests, so we don't overload other peers with requests
last_request_time = time.time()
request_made = False
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(
self.sync_store.get_potential_headers_received(uint32(height))
).wait()
for height in range(height_checkpoint, end_height)
]
future = asyncio.gather(*awaitables, return_exceptions=True)
try:
await asyncio.wait_for(future, timeout=sleep_interval)
break
except concurrent.futures.TimeoutError:
try:
await future
except asyncio.CancelledError:
pass
total_time_slept += sleep_interval
self.log.info(f"Did not receive desired header blocks")
for h in range(fork_point_height + 1, tip_height + 1):
header = self.sync_store.get_potential_header(uint32(h))
assert header is not None
headers.append(header)
self.log.info(f"Downloaded headers up to tip height: {tip_height}")
if not verify_weight(
tip_block.header, headers, self.blockchain.headers[fork_point_hash],
):
raise ConsensusError(Err.INVALID_WEIGHT, [tip_block.header])
self.log.info(
f"Validated weight of headers. Downloaded {len(headers)} headers, tip height {tip_height}"
self.sync_peers_handler = SyncPeersHandler(
self.sync_store, peers, fork_point_height, self.blockchain
)
assert tip_height == fork_point_height + len(headers)
self.sync_store.clear_potential_headers()
headers.clear()
# Download blocks in batches, and verify them as they come in. We download a few batches ahead,
# in case there are delays.
last_request_time = 0
highest_height_requested = uint32(0)
request_made = False
# Start processing blocks that we have received (no block yet)
block_processor = SyncBlocksProcessor(
self.sync_store, fork_point_height, uint32(tip_height), self.blockchain,
)
block_processor_task = asyncio.create_task(block_processor.process())
for height_checkpoint in range(
fork_point_height + 1, tip_height + 1, self.config["max_blocks_to_send"]
):
end_height = min(
height_checkpoint + self.config["max_blocks_to_send"], tip_height + 1
while not self.sync_peers_handler.done():
# Periodically checks for done, timeouts, shutdowns, new peers or disconnected peers.
if self._shut_down:
block_processor.shut_down()
break
if block_processor_task.done():
break
async for msg in self.sync_peers_handler.monitor_timeouts():
yield msg # Disconnects from peers that are not responding
cur_peers = [
con.node_id
for con in self.server.global_connections.get_connections()
if (
con.node_id is not None
and con.connection_type == NodeType.FULL_NODE
)
]
for node_id in cur_peers:
if node_id not in peers:
self.sync_peers_handler.new_node_connected(node_id)
for node_id in peers:
if node_id not in cur_peers:
# Disconnected peer, removes requests that are being sent to it
self.sync_peers_handler.node_disconnected(node_id)
peers = cur_peers
async for msg in self.sync_peers_handler._add_to_request_sets():
yield msg # Send more requests if we can
await asyncio.sleep(2)
# Awaits for all blocks to be processed, a timeout to happen, or the node to shutdown
await block_processor_task
block_processor_task.result() # If there was a timeout, this will raise TimeoutError
if self._shut_down:
return
current_tips = self.blockchain.get_current_tips()
assert max([h.height for h in current_tips]) == tip_height
self.full_node_store.set_proof_of_time_estimate_ips(
self.blockchain.get_next_min_iters(tip_block)
// (
self.constants["BLOCK_TIME_TARGET"]
/ self.constants["MIN_ITERS_PROPORTION"]
)
)
total_time_slept = 0
while True:
if self._shut_down:
return
if total_time_slept > timeout:
raise TimeoutError("Took too long to fetch blocks")
# Request batches that we don't have yet
for batch in range(0, self.config["num_sync_batches"]):
batch_start = (
height_checkpoint + batch * self.config["max_blocks_to_send"]
)
batch_end = min(
batch_start + self.config["max_blocks_to_send"], tip_height + 1
)
if batch_start > tip_height:
# We have asked for all blocks
break
blocks_missing = any(
[
not (
self.sync_store.get_potential_blocks_received(uint32(h))
).is_set()
for h in range(batch_start, batch_end)
]
)
if (
time.time() - last_request_time > sleep_interval
and blocks_missing
) or (batch_end - 1) > highest_height_requested:
# If we are missing blocks in this batch, and we haven't made a request in a while,
# Make a request for this batch. Also, if we have never requested this batch, make
# the request
self.log.info(f"Requesting sync block {batch_start}")
if batch_end - 1 > highest_height_requested:
highest_height_requested = batch_end - 1
request_made = True
request_sync = full_node_protocol.RequestBlock(
batch_start, header_hashes[batch_start],
)
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_block", request_sync),
Delivery.RANDOM,
)
if request_made:
# Reset the timer for requests, so we don't overload other peers with requests
last_request_time = time.time()
request_made = False
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(
self.sync_store.get_potential_blocks_received(uint32(height))
).wait()
for height in range(height_checkpoint, end_height)
]
future = asyncio.gather(*awaitables, return_exceptions=True)
try:
await asyncio.wait_for(future, timeout=sleep_interval)
break
except concurrent.futures.TimeoutError:
try:
await future
except asyncio.CancelledError:
pass
total_time_slept += sleep_interval
self.log.info("Did not receive desired blocks")
# Verifies this batch, which we are guaranteed to have (since we broke from the above loop)
blocks = []
for height in range(height_checkpoint, end_height):
b: Optional[FullBlock] = await self.sync_store.get_potential_block(
uint32(height)
)
assert b is not None
blocks.append(b)
validation_start_time = time.time()
prevalidate_results = await self.blockchain.pre_validate_blocks(blocks)
index = 0
for height in range(height_checkpoint, end_height):
if self._shut_down:
return
block: Optional[FullBlock] = await self.sync_store.get_potential_block(
uint32(height)
)
assert block is not None
# The block gets permanantly added to the blockchain
validated, pos = prevalidate_results[index]
index += 1
async with self.blockchain.lock:
(
result,
header_block,
error_code,
) = await self.blockchain.receive_block(
block, validated, pos, sync_mode=True
)
if (
result == ReceiveBlockResult.INVALID_BLOCK
or result == ReceiveBlockResult.DISCONNECTED_BLOCK
):
if error_code is not None:
raise ConsensusError(error_code, block.header_hash)
raise RuntimeError(f"Invalid block {block.header_hash}")
assert (
max([h.height for h in self.blockchain.get_current_tips()])
>= height
)
self.full_node_store.set_proof_of_time_estimate_ips(
self.blockchain.get_next_min_iters(block)
// (
self.constants["BLOCK_TIME_TARGET"]
/ self.constants["MIN_ITERS_PROPORTION"]
)
)
self.log.info(
f"Took {time.time() - validation_start_time} seconds to validate and add blocks "
f"{height_checkpoint} to {end_height}."
)
assert max([h.height for h in self.blockchain.get_current_tips()]) == tip_height
self.log.info(
f"Finished sync up to height {tip_height}. Total time: "
f"{round((time.time() - sync_start_time)/60, 2)} minutes."
@ -625,7 +456,7 @@ class FullNode:
blocks that we have finalized recently.
"""
potential_fut_blocks = (self.sync_store.get_potential_future_blocks()).copy()
self.full_node_store.set_sync_mode(False)
self.sync_store.set_sync_mode(False)
async with self.blockchain.lock:
await self.sync_store.clear_sync_info()
@ -688,7 +519,7 @@ class FullNode:
Requests a full transaction if we haven't seen it previously, and if the fees are enough.
"""
# Ignore if syncing
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
return
# Ignore if already seen
if self.mempool_manager.seen(transaction.transaction_id):
@ -710,7 +541,7 @@ class FullNode:
) -> OutboundMessageGenerator:
""" Peer has requested a full transaction from us. """
# Ignore if syncing
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
return
spend_bundle = self.mempool_manager.get_spendbundle(request.transaction_id)
if spend_bundle is None:
@ -740,7 +571,7 @@ class FullNode:
If tx is added to mempool, send tx_id to others. (new_transaction)
"""
# Ignore if syncing
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
return
async with self.blockchain.lock:
cost, status, error = await self.mempool_manager.add_spendbundle(
@ -1286,23 +1117,17 @@ class FullNode:
Receive header blocks from a peer.
"""
self.log.info(f"Received header block {request.header_block.height}.")
self.sync_store.add_potential_header(request.header_block)
(
self.sync_store.get_potential_headers_received(request.header_block.height)
).set()
for _ in []: # Yields nothing
yield _
if self.sync_peers_handler is not None:
async for req in self.sync_peers_handler.new_block(request.header_block):
yield req
@api_request
async def reject_header_block_request(
self, request: full_node_protocol.RejectHeaderBlockRequest
) -> OutboundMessageGenerator:
self.log.warning(f"Reject header block request, {request}")
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
yield OutboundMessage(NodeType.FULL_NODE, Message("", None), Delivery.CLOSE)
for _ in []:
yield _
@api_request
async def request_header_hash(
@ -1391,7 +1216,7 @@ class FullNode:
encoded_filter = bytes(bip158.GetEncoded())
proof_of_space_hash: bytes32 = request.proof_of_space.get_hash()
difficulty = self.blockchain.get_next_difficulty(target_tip.header_hash)
difficulty = self.blockchain.get_next_difficulty(target_tip)
assert target_tip_block is not None
vdf_min_iters: uint64 = self.blockchain.get_next_min_iters(target_tip_block)
@ -1544,7 +1369,7 @@ class FullNode:
unfinished_block_obj.transactions_filter,
)
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
self.sync_store.add_potential_future_block(new_full_block)
else:
async for msg in self.respond_block(
@ -1581,7 +1406,7 @@ class FullNode:
"""
Receive a full block from a peer full node (or ourselves).
"""
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
# This is a tip sent to us by another peer
if self.sync_store.get_waiting_for_tips():
# Add the block to our potential tips list
@ -1589,16 +1414,9 @@ class FullNode:
return
# This is a block we asked for during sync
await self.sync_store.add_potential_block(respond_block.block)
if (
self.full_node_store.get_sync_mode()
and respond_block.block.height
in self.sync_store.potential_blocks_received
):
# If we are still in sync mode, set it
self.sync_store.get_potential_blocks_received(
respond_block.block.height
).set()
if self.sync_peers_handler is not None:
async for req in self.sync_peers_handler.new_block(respond_block.block):
yield req
return
# Adds the block to seen, and check if it's seen before (which means header is in memory)
@ -1606,16 +1424,12 @@ class FullNode:
if self.blockchain.contains_block(header_hash):
return
prevalidate_block = await self.blockchain.pre_validate_blocks(
[respond_block.block]
)
val, pos = prevalidate_block[0]
prev_lca = self.blockchain.lca_block
async with self.blockchain.lock:
# Tries to add the block to the blockchain
added, replaced, error_code = await self.blockchain.receive_block(
respond_block.block, val, pos, sync_mode=False
respond_block.block, False, None, sync_mode=False
)
if added == ReceiveBlockResult.ADDED_TO_HEAD:
await self.mempool_manager.new_tips(
@ -1632,7 +1446,9 @@ class FullNode:
raise ConsensusError(error_code, header_hash)
elif added == ReceiveBlockResult.DISCONNECTED_BLOCK:
self.log.warning(f"Disconnected block {header_hash}")
self.log.info(
f"Disconnected block {header_hash} at height {respond_block.block.height}"
)
tip_height = min(
[head.height for head in self.blockchain.get_current_tips()]
)
@ -1642,11 +1458,11 @@ class FullNode:
> tip_height + self.config["sync_blocks_behind_threshold"]
):
async with self.blockchain.lock:
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
return
await self.sync_store.clear_sync_info()
self.sync_store.add_potential_tip(respond_block.block)
self.full_node_store.set_sync_mode(True)
self.sync_store.set_sync_mode(True)
self.log.info(
f"We are too far behind this block. Our height is {tip_height} and block is at "
f"{respond_block.block.height}"
@ -1658,7 +1474,8 @@ class FullNode:
except asyncio.CancelledError:
self.log.error("Syncing failed, CancelledError")
except BaseException as e:
self.log.error(f"Error {type(e)}{e} with syncing")
tb = traceback.format_exc()
self.log.error(f"Error with syncing: {type(e)}{tb}")
finally:
async for ret_msg in self._finish_sync():
yield ret_msg
@ -1685,7 +1502,7 @@ class FullNode:
)
difficulty = self.blockchain.get_next_difficulty(
respond_block.block.prev_header_hash
self.blockchain.headers[respond_block.block.prev_header_hash]
)
next_vdf_min_iters = self.blockchain.get_next_min_iters(respond_block.block)
next_vdf_ips = next_vdf_min_iters // (
@ -1800,7 +1617,7 @@ class FullNode:
self, reject: full_node_protocol.RejectBlockRequest
) -> OutboundMessageGenerator:
self.log.warning(f"Rejected block request {reject}")
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
yield OutboundMessage(NodeType.FULL_NODE, Message("", None), Delivery.CLOSE)
for _ in []:
yield _
@ -1840,10 +1657,8 @@ class FullNode:
return
self.log.info(f"Trying to connect to peers: {to_connect}")
tasks = []
for peer in to_connect:
tasks.append(asyncio.create_task(self.server.start_client(peer, None)))
await asyncio.gather(*tasks)
asyncio.create_task(self.server.start_client(peer, None))
@api_request
async def request_mempool_transactions(
@ -1869,7 +1684,7 @@ class FullNode:
self, tx: wallet_protocol.SendTransaction
) -> OutboundMessageGenerator:
# Ignore if syncing
if self.full_node_store.get_sync_mode():
if self.sync_store.get_sync_mode():
cost = None
status = MempoolInclusionStatus.FAILED
error: Optional[Err] = Err.UNKNOWN
@ -1933,7 +1748,7 @@ class FullNode:
== self.constants["DIFFICULTY_DELAY"]
):
difficulty_update = self.blockchain.get_next_difficulty(
curr.prev_header_hash
self.blockchain.headers[curr.prev_header_hash]
)
if (curr.height + 1) % self.constants["DIFFICULTY_EPOCH"] == 0:
iters_update = curr.data.total_iters

View File

@ -14,8 +14,6 @@ log = logging.getLogger(__name__)
class FullNodeStore:
db: aiosqlite.Connection
# Whether or not we are syncing
sync_mode: bool
# Current estimate of the speed of the network timelords
proof_of_time_estimate_ips: uint64
# Proof of time heights
@ -40,7 +38,6 @@ class FullNodeStore:
await self.db.commit()
self.sync_mode = False
self.proof_of_time_estimate_ips = uint64(10000)
self.proof_of_time_heights = {}
self.unfinished_blocks_leader = (
@ -81,12 +78,6 @@ class FullNodeStore:
if self.disconnected_blocks[key].height < height:
del self.disconnected_blocks[key]
def set_sync_mode(self, sync_mode: bool) -> None:
self.sync_mode = sync_mode
def get_sync_mode(self) -> bool:
return self.sync_mode
def add_candidate_block(
self,
pos_hash: bytes32,

View File

@ -0,0 +1,123 @@
import asyncio
import concurrent
import logging
import time
from typing import Optional
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
from src.full_node.sync_store import SyncStore
from src.types.full_block import FullBlock
from src.util.errors import ConsensusError
from src.util.ints import uint32
log = logging.getLogger(__name__)
class SyncBlocksProcessor:
def __init__(
self,
sync_store: SyncStore,
fork_height: uint32,
tip_height: uint32,
blockchain: Blockchain,
):
self.sync_store = sync_store
self.blockchain = blockchain
self.fork_height = fork_height
self.tip_height = tip_height
self._shut_down = False
self.BATCH_SIZE = 10
self.SLEEP_INTERVAL = 10
self.TOTAL_TIMEOUT = 200
def shut_down(self):
self._shut_down = True
async def process(self) -> None:
header_hashes = self.sync_store.get_potential_hashes()
# TODO: run this in a new process so it doesn't have to share CPU time with other things
for batch_start_height in range(
self.fork_height + 1, self.tip_height + 1, self.BATCH_SIZE
):
total_time_slept = 0
batch_end_height = min(
batch_start_height + self.BATCH_SIZE - 1, self.tip_height
)
for height in range(batch_start_height, batch_end_height + 1):
# If we have already added this block to the chain, skip it
if header_hashes[height] in self.blockchain.headers:
batch_start_height = height + 1
while True:
if self._shut_down:
return
if total_time_slept > self.TOTAL_TIMEOUT:
raise TimeoutError("Took too long to fetch blocks")
awaitables = [
(self.sync_store.potential_blocks_received[uint32(height)]).wait()
for height in range(batch_start_height, batch_end_height + 1)
]
future = asyncio.gather(*awaitables, return_exceptions=True)
try:
await asyncio.wait_for(future, timeout=self.SLEEP_INTERVAL)
break
except concurrent.futures.TimeoutError:
try:
await future
except asyncio.CancelledError:
pass
total_time_slept += self.SLEEP_INTERVAL
log.info(
f"Did not receive desired blocks ({batch_start_height}, {batch_end_height})"
)
# Verifies this batch, which we are guaranteed to have (since we broke from the above loop)
blocks = []
for height in range(batch_start_height, batch_end_height + 1):
b: Optional[FullBlock] = self.sync_store.potential_blocks[
uint32(height)
]
assert b is not None
blocks.append(b)
validation_start_time = time.time()
prevalidate_results = await self.blockchain.pre_validate_blocks_multiprocessing(
blocks
)
if self._shut_down:
return
for index, block in enumerate(blocks):
assert block is not None
# The block gets permanantly added to the blockchain
validated, pos = prevalidate_results[index]
async with self.blockchain.lock:
(
result,
header_block,
error_code,
) = await self.blockchain.receive_block(
block, validated, pos, sync_mode=True
)
if (
result == ReceiveBlockResult.INVALID_BLOCK
or result == ReceiveBlockResult.DISCONNECTED_BLOCK
):
if error_code is not None:
raise ConsensusError(error_code, block.header_hash)
raise RuntimeError(f"Invalid block {block.header_hash}")
assert (
max([h.height for h in self.blockchain.get_current_tips()])
>= block.height
)
del self.sync_store.potential_blocks[block.height]
log.info(
f"Took {time.time() - validation_start_time} seconds to validate and add blocks "
f"{batch_start_height} to {batch_end_height + 1}."
)

View File

@ -0,0 +1,243 @@
import asyncio
import logging
import time
from typing import Any, AsyncGenerator, Dict, List, Union
from src.full_node.blockchain import Blockchain
from src.full_node.sync_store import SyncStore
from src.protocols import full_node_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.types.full_block import FullBlock
from src.types.header_block import HeaderBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64
log = logging.getLogger(__name__)
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
class SyncPeersHandler:
"""
Handles the sync process by downloading blocks from all connected peers. Requests for blocks
are sent to alternating peers, with a total max outgoing count for each peer, and a total
download limit beyond what has already been processed into the blockchain.
This works both for downloading HeaderBlocks, and downloading FullBlocks.
Successfully downloaded blocks are saved to the SyncStore, which then are collected by the
BlockProcessor and added to the chain.
"""
# Node id -> (block_hash -> time). For each node, the blocks that have been requested,
# and the time the request was sent.
current_outbound_sets: Dict[bytes32, Dict[bytes32, uint64]]
sync_store: SyncStore
fully_validated_up_to: uint32
potential_blocks_received: Dict[uint32, asyncio.Event]
potential_blocks: Dict[uint32, Any]
def __init__(
self,
sync_store: SyncStore,
peers: List[bytes32],
fork_height: uint32,
blockchain: Blockchain,
):
self.sync_store = sync_store
# Set of outbound requests for every full_node peer, and time sent
self.current_outbound_sets = {}
self.blockchain = blockchain
self.header_hashes = self.sync_store.get_potential_hashes()
self.fully_validated_up_to = fork_height
# Only request this height greater than our current validation
self.MAX_GAP = 100
# Only request this many simultaneous blocks per peer
self.MAX_REQUESTS_PER_PEER = 10
# If a response for a block request is not received by this timeout, the connection
# is closed.
self.BLOCK_RESPONSE_TIMEOUT = 60
for node_id in peers:
self.current_outbound_sets[node_id] = {}
self.potential_blocks_received = self.sync_store.potential_blocks_received
self.potential_blocks = self.sync_store.potential_blocks
# No blocks received yet
for height in range(self.fully_validated_up_to + 1, len(self.header_hashes)):
self.potential_blocks_received[uint32(height)] = asyncio.Event()
def done(self) -> bool:
"""
Returns True iff all required blocks have been downloaded.
"""
for height in range(self.fully_validated_up_to + 1, len(self.header_hashes)):
if not self.potential_blocks_received[uint32(height)].is_set():
# Some blocks have not been received yet
return False
# We have received all blocks
return True
async def monitor_timeouts(self) -> OutboundMessageGenerator:
"""
If any of our requests have timed out, disconnects from the node that should
have responded.
"""
current_time = time.time()
remove_node_ids = []
for node_id, outbound_set in self.current_outbound_sets.items():
for _, time_requested in outbound_set.items():
if current_time - time_requested > self.BLOCK_RESPONSE_TIMEOUT:
remove_node_ids.append(node_id)
for rnid in remove_node_ids:
if rnid in self.current_outbound_sets:
log.warning(
f"Timeout receiving block, closing connection with node {rnid}"
)
self.current_outbound_sets.pop(rnid, None)
yield OutboundMessage(
NodeType.FULL_NODE, Message("", None), Delivery.CLOSE, rnid
)
async def _add_to_request_sets(self) -> OutboundMessageGenerator:
"""
Refreshes the pointers of how far we validated and how far we downloaded. Then goes through
all peers and sends requests to peers for the blocks we have not requested yet, or have
requested to a peer that did not respond in time or disconnected.
"""
if not self.sync_store.get_sync_mode():
return
# fork fully validated MAX_GAP target
# $$$$$X$$$$$$$$$$$$$$$X================----==---=--====---=--X------->
# $
# $
# $$$$$$$$$$$$$$$$$$$$$$$$>
# prev tip
# Refresh the fully_validated_up_to pointer
target_height = len(self.header_hashes) - 1
for height in range(self.fully_validated_up_to + 1, target_height + 1):
if self.header_hashes[height] in self.blockchain.headers:
self.fully_validated_up_to = uint32(height)
else:
break
# Number of request slots
free_slots = 0
for node_id, request_set in self.current_outbound_sets.items():
free_slots += self.MAX_REQUESTS_PER_PEER - len(request_set)
to_send: List[uint32] = []
# Finds a block height
for height in range(
self.fully_validated_up_to + 1,
min(self.fully_validated_up_to + self.MAX_GAP + 1, target_height + 1),
):
if len(to_send) == free_slots:
# No more slots to send to any peers
break
header_hash = self.header_hashes[uint32(height)]
if header_hash in self.blockchain.headers:
# Avoids downloading blocks and headers that we already have
continue
if self.potential_blocks_received[uint32(height)].is_set():
continue
already_requested = False
# If we have asked for this block to some peer, we don't want to ask for it again yet.
for node_id_2, request_set_2 in self.current_outbound_sets.items():
if self.header_hashes[height] in request_set_2:
already_requested = True
break
if already_requested:
continue
to_send.append(uint32(height))
# Sort by the peers that have the least outgoing messages
outbound_sets_list = list(self.current_outbound_sets.items())
outbound_sets_list.sort(key=lambda x: len(x[1]))
index = 0
to_yield: List[Any] = []
for height in to_send:
# Find a the next peer with an empty slot. There must be an empty slot: to_send
# includes up to free_slots things, and current_outbound sets cannot change since there is
# no await from when free_slots is computed (and thus no context switch).
while (
len(outbound_sets_list[index % len(outbound_sets_list)][1])
== self.MAX_REQUESTS_PER_PEER
):
index += 1
# Add to peer request
node_id, request_set = outbound_sets_list[index % len(outbound_sets_list)]
request_set[self.header_hashes[height]] = uint64(int(time.time()))
to_yield.append(
full_node_protocol.RequestBlock(height, self.header_hashes[height])
)
for request in to_yield:
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_block", request),
Delivery.SPECIFIC,
node_id,
)
async def new_block(
self, block: Union[FullBlock, HeaderBlock]
) -> OutboundMessageGenerator:
"""
A new block was received from a peer.
"""
header_hash: bytes32 = block.header_hash
if not isinstance(block, FullBlock):
return
if (
block.height >= len(self.header_hashes)
or self.header_hashes[block.height] != header_hash
):
# This block is wrong, so ignore
log.info(
f"Received header hash that is not in sync path {header_hash} at height {block.height}"
)
return
# save block to DB
self.potential_blocks[block.height] = block
if not self.sync_store.get_sync_mode():
return
assert block.height in self.potential_blocks_received
self.potential_blocks_received[block.height].set()
# remove block from request set
for node_id, request_set in self.current_outbound_sets.items():
request_set.pop(header_hash, None)
# add to request sets
async for msg in self._add_to_request_sets():
yield msg
async def reject_block(
self, header_hash: bytes32, node_id: bytes32
) -> OutboundMessageGenerator:
"""
A rejection was received from a peer, so we remove this peer and close the connection,
since we assume this peer cannot help us sync up. All blocks are removed from the
request set.
"""
self.current_outbound_sets.pop(node_id, None)
yield OutboundMessage(NodeType.FULL_NODE, Message("", None), Delivery.CLOSE)
def new_node_connected(self, node_id: bytes32):
"""
A new node has connected to us.
"""
self.current_outbound_sets[node_id] = {}
def node_disconnected(self, node_id: bytes32):
"""
A connection with a node has been closed.
"""
self.current_outbound_sets.pop(node_id, None)

View File

@ -1,10 +1,8 @@
import asyncio
import logging
import aiosqlite
from typing import Dict, List, Optional, Tuple
from src.types.full_block import FullBlock
from src.types.header_block import HeaderBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32
@ -12,65 +10,42 @@ log = logging.getLogger(__name__)
class SyncStore:
db: aiosqlite.Connection
# Whether or not we are syncing
sync_mode: bool
# Whether we are waiting for tips (at the start of sync) or already syncing
waiting_for_tips: bool
# Potential new tips that we have received from others.
potential_tips: Dict[bytes32, FullBlock]
# List of all header hashes up to the tip, download up front
potential_hashes: List[bytes32]
# Header blocks received from other peers during sync
potential_headers: Dict[uint32, HeaderBlock]
# Blocks received from other peers during sync
potential_blocks: Dict[uint32, FullBlock]
# Event to signal when header hashes are received
potential_hashes_received: Optional[asyncio.Event]
# Event to signal when headers are received at each height
potential_headers_received: Dict[uint32, asyncio.Event]
# Event to signal when blocks are received at each height
potential_blocks_received: Dict[uint32, asyncio.Event]
# Blocks that we have finalized during sync, queue them up for adding after sync is done
potential_future_blocks: List[FullBlock]
@classmethod
async def create(cls, connection):
async def create(cls):
self = cls()
# All full blocks which have been added to the blockchain. Header_hash -> block
self.db = connection
# Blocks received from other peers during sync, cleared after sync
await self.db.execute(
"CREATE TABLE IF NOT EXISTS potential_blocks(height bigint PRIMARY KEY, block blob)"
)
await self.db.commit()
self.sync_mode = False
self.waiting_for_tips = True
self.potential_tips = {}
self.potential_hashes = []
self.potential_headers = {}
self.potential_blocks = {}
self.potential_hashes_received = None
self.potential_headers_received = {}
self.potential_blocks_received = {}
self.potential_future_blocks = []
return self
async def add_potential_block(self, block: FullBlock) -> None:
cursor = await self.db.execute(
"INSERT OR REPLACE INTO potential_blocks VALUES(?, ?)",
(block.height, bytes(block)),
)
await cursor.close()
await self.db.commit()
def set_sync_mode(self, sync_mode: bool) -> None:
self.sync_mode = sync_mode
async def get_potential_block(self, height: uint32) -> Optional[FullBlock]:
cursor = await self.db.execute(
"SELECT * from potential_blocks WHERE height=?", (height,)
)
row = await cursor.fetchone()
await cursor.close()
if row is not None:
return FullBlock.from_bytes(row[1])
return None
def get_sync_mode(self) -> bool:
return self.sync_mode
def set_waiting_for_tips(self, waiting_for_tips: bool) -> None:
self.waiting_for_tips = waiting_for_tips
@ -80,9 +55,7 @@ class SyncStore:
async def clear_sync_info(self):
self.potential_tips.clear()
self.potential_headers.clear()
cursor = await self.db.execute("DELETE FROM potential_blocks")
await cursor.close()
self.potential_blocks.clear()
self.potential_blocks_received.clear()
self.potential_future_blocks.clear()
self.waiting_for_tips = True
@ -96,15 +69,6 @@ class SyncStore:
def get_potential_tip(self, header_hash: bytes32) -> Optional[FullBlock]:
return self.potential_tips.get(header_hash, None)
def add_potential_header(self, block: HeaderBlock) -> None:
self.potential_headers[block.height] = block
def get_potential_header(self, height: uint32) -> Optional[HeaderBlock]:
return self.potential_headers.get(height, None)
def clear_potential_headers(self) -> None:
self.potential_headers.clear()
def set_potential_hashes(self, potential_hashes: List[bytes32]) -> None:
self.potential_hashes = potential_hashes
@ -117,18 +81,6 @@ class SyncStore:
def get_potential_hashes_received(self) -> Optional[asyncio.Event]:
return self.potential_hashes_received
def set_potential_headers_received(self, height: uint32, event: asyncio.Event):
self.potential_headers_received[height] = event
def get_potential_headers_received(self, height: uint32) -> asyncio.Event:
return self.potential_headers_received[height]
def set_potential_blocks_received(self, height: uint32, event: asyncio.Event):
self.potential_blocks_received[height] = event
def get_potential_blocks_received(self, height: uint32) -> asyncio.Event:
return self.potential_blocks_received[height]
def add_potential_future_block(self, block: FullBlock):
self.potential_future_blocks.append(block)

View File

@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
def load_plots(
config_file: Dict, plot_config_file: Dict, pool_pubkeys: List[PublicKey]
config_file: Dict, plot_config_file: Dict, pool_pubkeys: Optional[List[PublicKey]]
) -> Dict[Path, DiskProver]:
provers: Dict[Path, DiskProver] = {}
for partial_filename_str, plot_config in plot_config_file["plots"].items():
@ -35,7 +35,7 @@ def load_plots(
pool_pubkey = PublicKey.from_bytes(bytes.fromhex(plot_config["pool_pk"]))
# Only use plots that correct pools associated with them
if pool_pubkey not in pool_pubkeys:
if pool_pubkeys is not None and pool_pubkey not in pool_pubkeys:
log.warning(
f"Plot {partial_filename} has a pool key that is not in the farmer's pool_pk list."
)
@ -47,7 +47,7 @@ def load_plots(
if filename.exists():
try:
provers[partial_filename_str] = DiskProver(str(filename))
except ValueError as e:
except Exception as e:
log.error(f"Failed to open file {filename}. {e}")
failed_to_open = True
break
@ -67,6 +67,7 @@ class Harvester:
provers: Dict[Path, DiskProver]
challenge_hashes: Dict[bytes32, Tuple[bytes32, Path, uint8]]
_plot_notification_task: asyncio.Task
_reconnect_task: Optional[asyncio.Task]
_is_shutdown: bool
executor: concurrent.futures.ThreadPoolExecutor
@ -82,6 +83,7 @@ class Harvester:
# From quality string to (challenge_hash, filename, index)
self.challenge_hashes = {}
self._plot_notification_task = asyncio.create_task(self._plot_notification())
self._reconnect_task = None
self._is_shutdown = False
self.server = None
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
@ -119,22 +121,26 @@ class Harvester:
async def connection_check():
while not self._is_shutdown:
if self.server is not None:
farmer_retry = True
counter = 0
while not self._is_shutdown and counter % 30 == 0:
if self.server is not None:
farmer_retry = True
for connection in self.server.global_connections.get_connections():
if connection.get_peer_info() == farmer_peer:
farmer_retry = False
for (
connection
) in self.server.global_connections.get_connections():
if connection.get_peer_info() == farmer_peer:
farmer_retry = False
if farmer_retry:
log.info(f"Reconnecting to farmer {farmer_retry}")
if not await self.server.start_client(
farmer_peer, None, auth=True
):
await asyncio.sleep(1)
await asyncio.sleep(30)
if farmer_retry:
log.info(f"Reconnecting to farmer {farmer_retry}")
if not await self.server.start_client(
farmer_peer, None, auth=True
):
await asyncio.sleep(1)
await asyncio.sleep(1)
self.reconnect_task = asyncio.create_task(connection_check())
self._reconnect_task = asyncio.create_task(connection_check())
def _shutdown(self):
self._is_shutdown = True
@ -142,6 +148,8 @@ class Harvester:
async def _await_shutdown(self):
await self._plot_notification_task
if self._reconnect_task is not None:
await self._reconnect_task
@api_request
async def harvester_handshake(

View File

@ -53,10 +53,8 @@ class RpcApiHandler:
"""
tips: List[Header] = self.full_node.blockchain.get_current_tips()
lca: Header = self.full_node.blockchain.lca_block
sync_mode: bool = self.full_node.full_node_store.get_sync_mode()
difficulty: uint64 = self.full_node.blockchain.get_next_difficulty(
lca.header_hash
)
sync_mode: bool = self.full_node.sync_store.get_sync_mode()
difficulty: uint64 = self.full_node.blockchain.get_next_difficulty(lca)
lca_block = await self.full_node.block_store.get_block(lca.header_hash)
if lca_block is None:
raise web.HTTPNotFound()
@ -69,12 +67,24 @@ class RpcApiHandler:
tip_hashes = []
for tip in tips:
tip_hashes.append(tip.header_hash)
if sync_mode and self.full_node.sync_peers_handler is not None:
sync_tip_height = len(self.full_node.sync_store.get_potential_hashes())
sync_progress_height = (
self.full_node.sync_peers_handler.fully_validated_up_to
)
else:
sync_tip_height = 0
sync_progress_height = uint32(0)
response = {
"tips": tips,
"tip_hashes": tip_hashes,
"lca": lca,
"sync_mode": sync_mode,
"sync": {
"sync_mode": sync_mode,
"sync_tip_height": sync_tip_height,
"sync_progress_height": sync_progress_height,
},
"difficulty": difficulty,
"ips": ips,
"min_iters": min_iters,
@ -236,7 +246,7 @@ class RpcApiHandler:
tip_weights = [tip.weight for tip in tips]
i = tip_weights.index(max(tip_weights))
max_tip: Header = tips[i]
if self.full_node.full_node_store.get_sync_mode():
if self.full_node.sync_store.get_sync_mode():
potential_tips = self.full_node.sync_store.get_potential_tips_tuples()
for _, pot_block in potential_tips:
if pot_block.weight > max_tip.weight:

View File

@ -51,9 +51,10 @@ class Connection:
self.bytes_read = 0
self.bytes_written = 0
self.last_message_time: float = 0
self._cached_peer_name = self.writer.get_extra_info("peername")
def get_peername(self):
return self.writer.get_extra_info("peername")
return self._cached_peer_name
def get_socket(self):
return self.writer.get_extra_info("socket")
@ -66,6 +67,9 @@ class Connection:
def get_last_message_time(self) -> float:
return self.last_message_time
def is_closing(self) -> bool:
return self.writer.is_closing()
async def send(self, message: Message):
encoded: bytes = cbor.dumps({"f": message.function, "d": message.data})
assert len(encoded) < (2 ** (LENGTH_BYTES * 8))

View File

@ -1,6 +1,7 @@
from dataclasses import dataclass
from enum import IntEnum
from typing import Any
from typing import Any, Optional
from src.types.sized_bytes import bytes32
class NodeType(IntEnum):
@ -23,6 +24,8 @@ class Delivery(IntEnum):
RANDOM = 4
# Pseudo-message to close the current connection
CLOSE = 5
# A message is sent to a speicific peer, specified in OutboundMessage
SPECIFIC = 6
@dataclass
@ -40,3 +43,6 @@ class OutboundMessage:
# Message to send
message: Message
delivery_method: Delivery
# Node id to send the request to, only applies to SPECIFIC delivery type
specific_peer_node_id: Optional[bytes32] = None

View File

@ -326,6 +326,11 @@ class ChiaServer:
# Does not ban the peer, this is just a graceful close of connection.
self.global_connections.close(connection, True)
continue
if connection.is_closing():
self.log.info(
f"Closing, so will not send {message.function} to peer {connection.get_peername()}"
)
continue
self.log.info(
f"-> {message.function} to peer {connection.get_peername()}"
)
@ -407,6 +412,10 @@ class ChiaServer:
connection.node_id = inbound_handshake.node_id
connection.peer_server_port = int(inbound_handshake.server_port)
connection.connection_type = inbound_handshake.node_type
if self._srwt_aiter.is_stopped():
raise Exception("No longer accepting handshakes, closing.")
if not self.global_connections.add(connection):
raise ProtocolError(Err.DUPLICATE_CONNECTION, [False])
@ -461,12 +470,18 @@ class ChiaServer:
self.log.warning(
f"Connection error by peer {connection.get_peername()}, closing connection."
)
except ssl.SSLError as e:
self.log.warning(
f"SSLError {e} in connection with peer {connection.get_peername()}."
)
except (
concurrent.futures._base.CancelledError,
OSError,
TimeoutError,
asyncio.TimeoutError,
) as e:
tb = traceback.format_exc()
self.log.error(tb)
self.log.error(
f"Timeout/OSError {e} in connection with peer {connection.get_peername()}, closing connection."
)
@ -566,6 +581,28 @@ class ChiaServer:
yield (peer, outbound_message.message)
else:
yield (peer, outbound_message.message)
elif outbound_message.delivery_method == Delivery.SPECIFIC:
# Send to a specific peer, by node_id, assuming the NodeType matches.
if outbound_message.specific_peer_node_id is None:
return
for peer in self.global_connections.get_connections():
if (
peer.connection_type == outbound_message.peer_type
and peer.node_id == outbound_message.specific_peer_node_id
):
yield (peer, outbound_message.message)
elif outbound_message.delivery_method == Delivery.CLOSE:
# Close the connection but don't ban the peer
yield (connection, None)
if outbound_message.specific_peer_node_id is None:
# Close the connection but don't ban the peer
if connection.connection_type == outbound_message.peer_type:
yield (connection, None)
else:
for peer in self.global_connections.get_connections():
# Close the connection with the specific peer
if (
peer.connection_type == outbound_message.peer_type
and peer.node_id == outbound_message.specific_peer_node_id
):
yield (peer, outbound_message.message)

View File

@ -69,6 +69,7 @@ async def async_main():
# Called by the UI, when node is closed, or when a signal is sent
log.info("Closing all connections, and server...")
server.close_all()
full_node._close()
server_closed = True
if config["start_rpc_server"]:
@ -90,7 +91,7 @@ async def async_main():
log.info("Closed all node servers.")
# Stops the full node and closes DBs
await full_node._shutdown()
await full_node._await_closed()
# Waits for the rpc server to close
if rpc_cleanup is not None:

View File

@ -108,6 +108,7 @@ class TestBlockStore:
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
b.shut_down()
raise
await connection.close()
@ -116,27 +117,28 @@ class TestBlockStore:
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
b.shut_down()
# @pytest.mark.asyncio
# async def test_deadlock(self):
# blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
# db_filename = Path("blockchain_test.db")
@pytest.mark.asyncio
async def test_deadlock(self):
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
db_filename = Path("blockchain_test.db")
# if db_filename.exists():
# db_filename.unlink()
if db_filename.exists():
db_filename.unlink()
# connection = await aiosqlite.connect(db_filename)
# db = await BlockStore.create(connection)
# tasks = []
connection = await aiosqlite.connect(db_filename)
db = await BlockStore.create(connection)
tasks = []
# for i in range(10000):
# rand_i = random.randint(0, 10)
# if random.random() < 0.5:
# tasks.append(asyncio.create_task(db.add_block(blocks[rand_i])))
# if random.random() < 0.5:
# tasks.append(
# asyncio.create_task(db.get_block(blocks[rand_i].header_hash))
# )
# await asyncio.gather(*tasks)
# await connection.close()
# db_filename.unlink()
for i in range(10000):
rand_i = random.randint(0, 10)
if random.random() < 0.5:
tasks.append(asyncio.create_task(db.add_block(blocks[rand_i])))
if random.random() < 0.5:
tasks.append(
asyncio.create_task(db.get_block(blocks[rand_i].header_hash))
)
await asyncio.gather(*tasks)
await connection.close()
db_filename.unlink()

View File

@ -58,12 +58,11 @@ class TestGenesisBlock:
assert len(bc1.get_current_tips()) == 1
genesis_block = bc1.get_current_tips()[0]
assert genesis_block.height == 0
assert (
bc1.get_next_difficulty(genesis_block.header_hash)
) == genesis_block.weight
assert (bc1.get_next_difficulty(genesis_block)) == genesis_block.weight
assert bc1.get_next_min_iters(bc1.genesis) > 0
await connection.close()
bc1.shut_down()
class TestBlockValidation:
@ -509,9 +508,9 @@ class TestBlockValidation:
assert result == ReceiveBlockResult.ADDED_TO_HEAD
assert error_code is None
diff_25 = b.get_next_difficulty(blocks[24].header_hash)
diff_26 = b.get_next_difficulty(blocks[25].header_hash)
diff_27 = b.get_next_difficulty(blocks[26].header_hash)
diff_25 = b.get_next_difficulty(blocks[24].header)
diff_26 = b.get_next_difficulty(blocks[25].header)
diff_27 = b.get_next_difficulty(blocks[26].header)
assert diff_26 == diff_25
assert diff_27 > diff_26
@ -524,6 +523,7 @@ class TestBlockValidation:
assert (b.get_next_min_iters(blocks[27])) == (b.get_next_min_iters(blocks[26]))
await connection.close()
b.shut_down()
class TestReorgs:
@ -558,6 +558,7 @@ class TestReorgs:
assert b.get_current_tips()[0].height == 119
await connection.close()
b.shut_down()
@pytest.mark.asyncio
async def test_reorg_from_genesis(self):
@ -602,6 +603,7 @@ class TestReorgs:
assert result == ReceiveBlockResult.ADDED_TO_HEAD
await connection.close()
b.shut_down()
@pytest.mark.asyncio
async def test_lca(self):
@ -631,6 +633,7 @@ class TestReorgs:
assert b.lca_block.header_hash == blocks[0].header_hash
await connection.close()
b.shut_down()
@pytest.mark.asyncio
async def test_find_fork_point(self):
@ -674,6 +677,7 @@ class TestReorgs:
)
assert b.lca_block.data == blocks[4].header.data
await connection.close()
b.shut_down()
@pytest.mark.asyncio
async def test_get_header_hashes(self):
@ -693,3 +697,4 @@ class TestReorgs:
assert header_hashes == [block.header_hash for block in blocks]
await connection.close()
b.shut_down()

View File

@ -182,10 +182,12 @@ class TestCoinStore:
except Exception as e:
await connection.close()
Path("blockchain_test.db").unlink()
b.shut_down()
raise e
await connection.close()
Path("blockchain_test.db").unlink()
b.shut_down()
@pytest.mark.asyncio
async def test_get_puzzle_hash(self):
@ -214,7 +216,9 @@ class TestCoinStore:
except Exception as e:
await connection.close()
Path("blockchain_test.db").unlink()
b.shut_down()
raise e
await connection.close()
Path("blockchain_test.db").unlink()
b.shut_down()

View File

@ -703,12 +703,12 @@ class TestFullNodeProtocol:
)
# In sync mode
full_node_1.full_node_store.set_sync_mode(True)
full_node_1.sync_store.set_sync_mode(True)
msgs = [
_ async for _ in full_node_1.respond_block(fnp.RespondBlock(blocks_new[-5]))
]
assert len(msgs) == 0
full_node_1.full_node_store.set_sync_mode(False)
full_node_1.sync_store.set_sync_mode(False)
# If invalid, do nothing
block_invalid = FullBlock(

View File

@ -60,11 +60,6 @@ class TestFullNodeStore:
db = await FullNodeStore.create(connection)
db_2 = await FullNodeStore.create(connection_2)
try:
# Save/get sync
for sync_mode in (False, True):
db.set_sync_mode(sync_mode)
assert sync_mode == db.get_sync_mode()
# Add/get candidate block
assert db.get_candidate_block(0) is None
partial = (

View File

@ -81,20 +81,15 @@ class TestNodeLoad:
start_unf = time.time()
for i in range(1, num_blocks):
while max([h.height for h in full_node_2.blockchain.get_current_tips()]) < (
i - 1
):
# Waits until we reach height i - 1
await asyncio.sleep(0.05)
msg = Message("respond_block", full_node_protocol.RespondBlock(blocks[i]))
server_1.push_message(
OutboundMessage(NodeType.FULL_NODE, msg, Delivery.BROADCAST)
)
while time.time() - start_unf < 100:
if (
max([h.height for h in full_node_2.blockchain.get_current_tips()])
== num_blocks - 1
):
print(
f"Time taken to process {num_blocks} is {time.time() - start_unf}"
)
return
await asyncio.sleep(0.1)
raise Exception("Took too long to process blocks")
print(f"Time taken to process {num_blocks} is {time.time() - start_unf}")
assert time.time() - start_unf < 200

View File

@ -42,69 +42,18 @@ class TestStore:
assert sqlite3.threadsafety == 1
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
blocks_alt = bt.get_consecutive_blocks(test_constants, 3, [], 9, b"1")
db_filename = Path("blockchain_test.db")
db_filename_2 = Path("blockchain_test_2.db")
db = await SyncStore.create()
db_2 = await SyncStore.create()
if db_filename.exists():
db_filename.unlink()
if db_filename_2.exists():
db_filename_2.unlink()
# Save/get sync
for sync_mode in (False, True):
db.set_sync_mode(sync_mode)
assert sync_mode == db.get_sync_mode()
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
connection = await aiosqlite.connect(db_filename)
connection_2 = await aiosqlite.connect(db_filename_2)
# clear sync info
await db.clear_sync_info()
db = await SyncStore.create(connection)
db_2 = await SyncStore.create(connection_2)
try:
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
# clear sync info
await db.clear_sync_info()
# add/get potential tip, get potential tips num
db.add_potential_tip(blocks[6])
assert blocks[6] == db.get_potential_tip(blocks[6].header_hash)
# Add potential block
await db.add_potential_block(genesis)
assert genesis == await db.get_potential_block(uint32(0))
except Exception:
await connection.close()
await connection_2.close()
db_filename.unlink()
db_filename_2.unlink()
raise
await connection.close()
await connection_2.close()
db_filename.unlink()
db_filename_2.unlink()
@pytest.mark.asyncio
async def test_deadlock(self):
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
db_filename = Path("blockchain_test.db")
if db_filename.exists():
db_filename.unlink()
connection = await aiosqlite.connect(db_filename)
db = await SyncStore.create(connection)
tasks = []
for i in range(10000):
rand_i = random.randint(0, 10)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(db.add_potential_block(blocks[rand_i]))
)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(
db.get_potential_block(blocks[rand_i].header_hash)
)
)
await asyncio.gather(*tasks)
await connection.close()
db_filename.unlink()
# add/get potential tip, get potential tips num
db.add_potential_tip(blocks[6])
assert blocks[6] == db.get_potential_tip(blocks[6].header_hash)

View File

@ -39,7 +39,7 @@ class TestRpc:
pass
def stop_node_cb():
full_node_1._shutdown()
full_node_1._close()
server_1.close_all()
rpc_cleanup = await start_rpc_server(full_node_1, stop_node_cb, test_rpc_port)
@ -48,7 +48,7 @@ class TestRpc:
client = await RpcClient.create(test_rpc_port)
state = await client.get_blockchain_state()
assert state["lca"].header_hash is not None
assert not state["sync_mode"]
assert not state["sync"]["sync_mode"]
assert len(state["tips"]) > 0
assert state["difficulty"] > 0
assert state["ips"] > 0

View File

@ -42,6 +42,15 @@ test_constants["GENESIS_BLOCK"] = bytes(
)
async def _teardown_nodes(node_aiters: List) -> None:
awaitables = [node_iter.__anext__() for node_iter in node_aiters]
for sublist_awaitable in asyncio.as_completed(awaitables):
try:
await sublist_awaitable
except StopAsyncIteration:
pass
async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={}):
# SETUP
test_constants_copy = test_constants.copy()
@ -87,8 +96,9 @@ async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={})
# TEARDOWN
server_1.close_all()
full_node_1._close()
await server_1.await_closed()
await full_node_1._shutdown()
await full_node_1._await_closed()
db_path.unlink()
@ -128,6 +138,7 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
network_id,
root_path,
config,
f"full_node_server_{port}",
)
_ = await server_1.start_server(full_node_1._on_connect)
full_node_1._set_server(server_1)
@ -136,8 +147,9 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
# TEARDOWN
server_1.close_all()
full_node_1._close()
await server_1.await_closed()
await full_node_1._shutdown()
await full_node_1._await_closed()
db_path = root_path / f"{db_name}"
if db_path.exists():
db_path.unlink()
@ -209,12 +221,13 @@ async def setup_harvester(port, dic={}):
f"harvester_server_{port}",
)
harvester.set_server(server)
yield (harvester, server)
harvester._shutdown()
server.close_all()
await harvester._await_shutdown()
harvester._shutdown()
await server.await_closed()
await harvester._await_shutdown()
async def setup_farmer(port, dic={}):
@ -283,6 +296,7 @@ async def setup_introducer(port, dic={}):
network_id,
bt.root_path,
config,
f"introducer_server_{port}",
)
_ = await server.start_server(None)
@ -321,6 +335,7 @@ async def setup_timelord(port, dic={}):
network_id,
bt.root_path,
config,
f"timelord_server_{port}",
)
coro = asyncio.start_server(
@ -351,6 +366,7 @@ async def setup_timelord(port, dic={}):
async def setup_two_nodes(dic={}):
"""
Setup and teardown of two full nodes, with blockchains and separate DBs.
"""
@ -364,11 +380,7 @@ async def setup_two_nodes(dic={}):
yield (fn1, fn2, s1, s2)
for node_iter in node_iters:
try:
await node_iter.__anext__()
except StopAsyncIteration:
pass
await _teardown_nodes(node_iters)
async def setup_node_and_wallet(dic={}):
@ -382,11 +394,7 @@ async def setup_node_and_wallet(dic={}):
yield (full_node, wallet, s1, s2)
for node_iter in node_iters:
try:
await node_iter.__anext__()
except StopAsyncIteration:
pass
await _teardown_nodes(node_iters)
async def setup_node_and_two_wallets(dic={}):
@ -402,11 +410,7 @@ async def setup_node_and_two_wallets(dic={}):
yield (full_node, wallet, wallet_2, s1, s2, s3)
for node_iter in node_iters:
try:
await node_iter.__anext__()
except StopAsyncIteration:
pass
await _teardown_nodes(node_iters)
async def setup_simulators_and_wallets(
@ -432,11 +436,7 @@ async def setup_simulators_and_wallets(
yield (simulators, wallets)
for node_iter in node_iters:
try:
await node_iter.__anext__()
except StopAsyncIteration:
pass
await _teardown_nodes(node_iters)
async def setup_full_system(dic={}):
@ -469,9 +469,4 @@ async def setup_full_system(dic={}):
yield (node1, node2)
for node_iter in node_iters:
try:
await node_iter.__anext__()
except StopAsyncIteration:
pass
await _teardown_nodes(node_iters)

View File

@ -35,4 +35,4 @@ class TestSimulation:
if max([h.height for h in node1.blockchain.get_current_tips()]) > 10:
return
await asyncio.sleep(1)
assert False
raise Exception("Failed due to timeout")