Use attributes instead of dictionary keys.

This commit is contained in:
Richard Kiss 2020-07-20 10:33:27 -07:00 committed by Gene Hoffman
parent 9f05d9d112
commit 470b0c8c76
18 changed files with 175 additions and 175 deletions

View File

@ -1,5 +1,7 @@
import dataclasses
from src.util.ints import uint64
@dataclasses.dataclass(frozen=True)
class ConsensusConstants:
@ -18,7 +20,7 @@ class ConsensusConstants:
# will be contant and required for all blocks.
MIN_ITERS_PROPORTION: int
# For the first epoch, since we have no previous blocks, we can't estimate vdf iterations per second
MIN_ITERS_STARTING: int
MIN_ITERS_STARTING: uint64
MAX_FUTURE_TIME: int # The next block can have a timestamp of at most these many seconds more
NUMBER_OF_TIMESTAMPS: int # Than the average of the last NUMBEBR_OF_TIMESTAMPS blocks
# If an unfinished block is more than these many seconds slower than the best unfinished block,

View File

@ -213,7 +213,7 @@ class Farmer:
raise RuntimeError("Did not find challenge")
computed_quality_string = response.proof.verify_and_get_quality_string(
self.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
self.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
if computed_quality_string is None:
raise RuntimeError("Invalid proof of space")

View File

@ -64,19 +64,19 @@ async def validate_unfinished_block_header(
if prev_header_block is not None:
last_timestamps: List[uint64] = []
curr = prev_header_block.header
while len(last_timestamps) < constants["NUMBER_OF_TIMESTAMPS"]:
while len(last_timestamps) < constants.NUMBER_OF_TIMESTAMPS:
last_timestamps.append(curr.data.timestamp)
fetched = headers.get(curr.prev_header_hash, None)
if not fetched:
break
curr = fetched
if len(last_timestamps) != constants["NUMBER_OF_TIMESTAMPS"]:
if len(last_timestamps) != constants.NUMBER_OF_TIMESTAMPS:
# For blocks 1 to 10, average timestamps of all previous blocks
assert curr.height == 0
prev_time: uint64 = uint64(int(sum(last_timestamps) // len(last_timestamps)))
if block_header.data.timestamp < prev_time:
return (Err.TIMESTAMP_TOO_FAR_IN_PAST, None)
if block_header.data.timestamp > time.time() + constants["MAX_FUTURE_TIME"]:
if block_header.data.timestamp > time.time() + constants.MAX_FUTURE_TIME:
return (Err.TIMESTAMP_TOO_FAR_IN_FUTURE, None)
# 7. Extension data must be valid, if any is present
@ -93,7 +93,7 @@ async def validate_unfinished_block_header(
# 10. The proof of space must be valid on the challenge
if pos_quality_string is None:
pos_quality_string = proof_of_space.verify_and_get_quality_string(
constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
if not pos_quality_string:
return (Err.INVALID_POSPACE, None)
@ -123,15 +123,15 @@ async def validate_unfinished_block_header(
constants, headers, height_to_hash, prev_header_block
)
else:
difficulty = uint64(constants["DIFFICULTY_STARTING"])
min_iters = uint64(constants["MIN_ITERS_STARTING"])
difficulty = uint64(constants.DIFFICULTY_STARTING)
min_iters = uint64(constants.MIN_ITERS_STARTING)
number_of_iters: uint64 = calculate_iterations_quality(
pos_quality_string, proof_of_space.size, difficulty, min_iters,
)
assert count_significant_bits(difficulty) <= constants["SIGNIFICANT_BITS"]
assert count_significant_bits(min_iters) <= constants["SIGNIFICANT_BITS"]
assert count_significant_bits(difficulty) <= constants.SIGNIFICANT_BITS
assert count_significant_bits(min_iters) <= constants.SIGNIFICANT_BITS
if prev_header_block is not None:
# 17. If not genesis, the total weight must be the parent weight + difficulty
@ -203,7 +203,7 @@ async def validate_finished_block_header(
# 2. the PoT must be valid, on a discriminant of size 1024, and the challenge_hash
if not pre_validated:
if not block.proof_of_time.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
if not block.proof_of_time.is_valid(constants.DISCRIMINANT_SIZE_BITS):
return Err.INVALID_POT
# 3. If not genesis, the challenge_hash in the proof of time must match the challenge on the previous block
if not genesis:
@ -234,7 +234,7 @@ def pre_validate_finished_block_header(constants: ConsensusConstants, data: byte
return False, None
# 4. Check PoT
if not block.proof_of_time.is_valid(constants["DISCRIMINANT_SIZE_BITS"]):
if not block.proof_of_time.is_valid(constants.DISCRIMINANT_SIZE_BITS):
return False, None
# 9. Check harvester signature of header data is valid based on harvester key
@ -248,7 +248,7 @@ def pre_validate_finished_block_header(constants: ConsensusConstants, data: byte
# 10. Check proof of space based on challenge
pos_quality_string = block.proof_of_space.verify_and_get_quality_string(
constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
if not pos_quality_string:

View File

@ -108,8 +108,8 @@ class Blockchain:
self.coin_store = coin_store
self.block_store = block_store
self._shut_down = False
self.genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
self.coinbase_freeze = self.constants["COINBASE_FREEZE_PERIOD"]
self.genesis = FullBlock.from_bytes(self.constants.GENESIS_BLOCK)
self.coinbase_freeze = self.constants.COINBASE_FREEZE_PERIOD
await self._load_chain_from_store()
return self
@ -221,9 +221,7 @@ class Blockchain:
prev_challenge_hash = block.proof_of_space.challenge_hash
new_difficulty: Optional[uint64]
if (block.height + 1) % self.constants["DIFFICULTY_EPOCH"] == self.constants[
"DIFFICULTY_DELAY"
]:
if (block.height + 1) % self.constants.DIFFICULTY_EPOCH == self.constants.DIFFICULTY_DELAY:
new_difficulty = get_next_difficulty(
self.constants, self.headers, self.height_to_hash, block.header
)
@ -366,7 +364,7 @@ class Blockchain:
removed: Optional[Header] = None
if len(self.tips) == 0 or block.weight > min([b.weight for b in self.tips]):
self.tips.append(block)
while len(self.tips) > self.constants["NUMBER_OF_HEADS"]:
while len(self.tips) > self.constants.NUMBER_OF_HEADS:
self.tips.sort(key=lambda b: b.weight, reverse=True)
# This will loop only once
removed = self.tips.pop()
@ -643,11 +641,11 @@ class Blockchain:
return Err.UNKNOWN
# Get List of names removed, puzzles hashes for removed coins and conditions crated
error, npc_list, cost = calculate_cost_of_program(
block.transactions_generator, self.constants["CLVM_COST_RATIO_CONSTANT"]
block.transactions_generator, self.constants.CLVM_COST_RATIO_CONSTANT
)
# 2. Check that cost <= MAX_BLOCK_COST_CLVM
if cost > self.constants["MAX_BLOCK_COST_CLVM"]:
if cost > self.constants.MAX_BLOCK_COST_CLVM:
return Err.BLOCK_COST_EXCEEDS_MAX
if error:
return error
@ -669,7 +667,7 @@ class Blockchain:
# Check additions for max coin amount
for coin in additions:
additions_dic[coin.name()] = coin
if coin.amount >= uint64.from_bytes(self.constants["MAX_COIN_AMOUNT"]):
if coin.amount >= uint64.from_bytes(self.constants.MAX_COIN_AMOUNT):
return Err.COIN_AMOUNT_EXCEEDS_MAXIMUM
# Validate addition and removal roots

View File

@ -26,14 +26,14 @@ def get_next_difficulty(
"""
next_height: uint32 = uint32(block.height + 1)
if next_height < constants["DIFFICULTY_EPOCH"]:
if next_height < constants.DIFFICULTY_EPOCH:
# We are in the first epoch
return uint64(constants["DIFFICULTY_STARTING"])
return uint64(constants.DIFFICULTY_STARTING)
# Epochs are diffined as intervals of DIFFICULTY_EPOCH blocks, inclusive and indexed at 0.
# For example, [0-2047], [2048-4095], etc. The difficulty changes DIFFICULTY_DELAY into the
# epoch, as opposed to the first block (as in Bitcoin).
elif next_height % constants["DIFFICULTY_EPOCH"] != constants["DIFFICULTY_DELAY"]:
elif next_height % constants.DIFFICULTY_EPOCH != constants.DIFFICULTY_DELAY:
# Not at a point where difficulty would change
prev_block: Header = headers[block.prev_header_hash]
return uint64(block.weight - prev_block.weight)
@ -43,12 +43,12 @@ def get_next_difficulty(
# h1 h2 h3 i-1
# Height1 is the last block 2 epochs ago, so we can include the time to mine 1st block in previous epoch
height1 = uint32(
next_height - constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"] - 1
next_height - constants.DIFFICULTY_EPOCH - constants.DIFFICULTY_DELAY - 1
)
# Height2 is the DIFFICULTY DELAYth block in the previous epoch
height2 = uint32(next_height - constants["DIFFICULTY_EPOCH"] - 1)
height2 = uint32(next_height - constants.DIFFICULTY_EPOCH - 1)
# Height3 is the last block in the previous epoch
height3 = uint32(next_height - constants["DIFFICULTY_DELAY"] - 1)
height3 = uint32(next_height - constants.DIFFICULTY_DELAY - 1)
# h1 to h2 timestamps are mined on previous difficulty, while and h2 to h3 timestamps are mined on the
# current difficulty
@ -95,33 +95,33 @@ def get_next_difficulty(
timestamp1 = int(block1.data.timestamp) # i - 512 - 1
else:
# In the case of height == -1, there is no timestamp here, so assume the genesis block
# took constants["BLOCK_TIME_TARGET"] seconds to mine.
# took constants.BLOCK_TIME_TARGET seconds to mine.
genesis = headers[height_to_hash[uint32(0)]]
timestamp1 = genesis.data.timestamp - constants["BLOCK_TIME_TARGET"]
timestamp1 = genesis.data.timestamp - constants.BLOCK_TIME_TARGET
timestamp2 = block2.data.timestamp # i - 2048 + 512 - 1
timestamp3 = block3.data.timestamp # i - 512 - 1
# Numerator fits in 128 bits, so big int is not necessary
# We multiply by the denominators here, so we only have one fraction in the end (avoiding floating point)
term1 = (
constants["DIFFICULTY_DELAY"]
constants.DIFFICULTY_DELAY
* Tp
* (timestamp3 - timestamp2)
* constants["BLOCK_TIME_TARGET"]
* constants.BLOCK_TIME_TARGET
)
term2 = (
(constants["DIFFICULTY_WARP_FACTOR"] - 1)
* (constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"])
(constants.DIFFICULTY_WARP_FACTOR - 1)
* (constants.DIFFICULTY_EPOCH - constants.DIFFICULTY_DELAY)
* Tc
* (timestamp2 - timestamp1)
* constants["BLOCK_TIME_TARGET"]
* constants.BLOCK_TIME_TARGET
)
# Round down after the division
new_difficulty_precise: uint64 = uint64(
(term1 + term2)
// (
constants["DIFFICULTY_WARP_FACTOR"]
constants.DIFFICULTY_WARP_FACTOR
* (timestamp3 - timestamp2)
* (timestamp2 - timestamp1)
)
@ -129,20 +129,20 @@ def get_next_difficulty(
# Take only DIFFICULTY_SIGNIFICANT_BITS significant bits
new_difficulty = uint64(
truncate_to_significant_bits(
new_difficulty_precise, constants["SIGNIFICANT_BITS"]
new_difficulty_precise, constants.SIGNIFICANT_BITS
)
)
assert count_significant_bits(new_difficulty) <= constants["SIGNIFICANT_BITS"]
assert count_significant_bits(new_difficulty) <= constants.SIGNIFICANT_BITS
# Only change by a max factor, to prevent attacks, as in greenpaper, and must be at least 1
max_diff = uint64(
truncate_to_significant_bits(
constants["DIFFICULTY_FACTOR"] * Tc, constants["SIGNIFICANT_BITS"],
constants.DIFFICULTY_FACTOR * Tc, constants.SIGNIFICANT_BITS,
)
)
min_diff = uint64(
truncate_to_significant_bits(
Tc // constants["DIFFICULTY_FACTOR"], constants["SIGNIFICANT_BITS"],
Tc // constants.DIFFICULTY_FACTOR, constants.SIGNIFICANT_BITS,
)
)
if new_difficulty >= Tc:
@ -162,9 +162,9 @@ def get_next_min_iters(
the number of iterations of the last epoch, and changes at the same block as the difficulty.
"""
next_height: uint32 = uint32(block.height + 1)
if next_height < constants["DIFFICULTY_EPOCH"]:
if next_height < constants.DIFFICULTY_EPOCH:
# First epoch has a hardcoded vdf speed
return constants["MIN_ITERS_STARTING"]
return constants.MIN_ITERS_STARTING
prev_block_header: Header = headers[block.prev_header_hash]
@ -179,10 +179,10 @@ def get_next_min_iters(
proof_of_space,
difficulty,
iterations,
constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
if next_height % constants["DIFFICULTY_EPOCH"] != constants["DIFFICULTY_DELAY"]:
if next_height % constants.DIFFICULTY_EPOCH != constants.DIFFICULTY_DELAY:
# Not at a point where ips would change, so return the previous ips
# TODO: cache this for efficiency
return prev_min_iters
@ -194,10 +194,10 @@ def get_next_min_iters(
# Height1 is the last block 2 epochs ago, so we can include the iterations taken for mining first block in epoch
height1 = uint32(
next_height - constants["DIFFICULTY_EPOCH"] - constants["DIFFICULTY_DELAY"] - 1
next_height - constants.DIFFICULTY_EPOCH - constants.DIFFICULTY_DELAY - 1
)
# Height2 is the last block in the previous epoch
height2 = uint32(next_height - constants["DIFFICULTY_DELAY"] - 1)
height2 = uint32(next_height - constants.DIFFICULTY_DELAY - 1)
block1: Optional[Header] = None
block2: Optional[Header] = None
@ -235,10 +235,10 @@ def get_next_min_iters(
min_iters_precise = uint64(
(iters2 - iters1)
// (constants["DIFFICULTY_EPOCH"] * constants["MIN_ITERS_PROPORTION"])
// (constants.DIFFICULTY_EPOCH * constants.MIN_ITERS_PROPORTION)
)
min_iters = uint64(
truncate_to_significant_bits(min_iters_precise, constants["SIGNIFICANT_BITS"])
truncate_to_significant_bits(min_iters_precise, constants.SIGNIFICANT_BITS)
)
assert count_significant_bits(min_iters) <= constants["SIGNIFICANT_BITS"]
assert count_significant_bits(min_iters) <= constants.SIGNIFICANT_BITS
return min_iters

View File

@ -650,7 +650,9 @@ class FullNode:
[new_proof_of_time.height]
)
if new_proof_of_time.height not in self.blockchain.height_to_hash:
self.log.error(f"Height {new_proof_of_time.height} not found in height_to_hash.")
self.log.error(
f"Height {new_proof_of_time.height} not found in height_to_hash."
)
return
header_hash = self.blockchain.height_to_hash[new_proof_of_time.height]
for block in blocks:
@ -813,7 +815,7 @@ class FullNode:
if height is None:
self.log.info("No block for compact proof of time.")
return
if not proof.is_valid(self.constants["DISCRIMINANT_SIZE_BITS"]):
if not proof.is_valid(self.constants.DISCRIMINANT_SIZE_BITS):
self.log.error("Invalid compact proof of time.")
return
@ -1063,7 +1065,7 @@ class FullNode:
)
)
if expected_time > self.constants["PROPAGATION_DELAY_THRESHOLD"]:
if expected_time > self.constants.PROPAGATION_DELAY_THRESHOLD:
self.log.info(f"Block is slow, expected {expected_time} seconds, waiting")
# If this block is slow, sleep to allow faster blocks to come out first
await asyncio.sleep(5)
@ -1080,7 +1082,7 @@ class FullNode:
(block.height, expected_time)
)
elif block.height == leader[0]:
if expected_time > leader[1] + self.constants["PROPAGATION_THRESHOLD"]:
if expected_time > leader[1] + self.constants.PROPAGATION_THRESHOLD:
# If VDF is expected to finish X seconds later than the best, don't propagate
self.log.info(
f"VDF will finish too late {expected_time} seconds, so don't propagate"
@ -1303,7 +1305,7 @@ class FullNode:
request.proof_of_space,
difficulty,
vdf_min_iters,
self.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
self.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
removal_merkle_set = MerkleSet()
@ -1835,13 +1837,13 @@ class FullNode:
difficulty_update: Optional[uint64] = None
iters_update: Optional[uint64] = None
if (
curr.height % self.constants["DIFFICULTY_EPOCH"]
== self.constants["DIFFICULTY_DELAY"]
curr.height % self.constants.DIFFICULTY_EPOCH
== self.constants.DIFFICULTY_DELAY
):
difficulty_update = self.blockchain.get_next_difficulty(
self.blockchain.headers[curr.prev_header_hash]
)
if (curr.height + 1) % self.constants["DIFFICULTY_EPOCH"] == 0:
if (curr.height + 1) % self.constants.DIFFICULTY_EPOCH == 0:
iters_update = curr.data.total_iters
hashes.append(
(proof_hashes_map[curr.header_hash], difficulty_update, iters_update)

View File

@ -49,15 +49,15 @@ class MempoolManager:
] = SortedDict() # pylint: disable=E1136
self.coin_store = coin_store
tx_per_sec = self.constants["TX_PER_SEC"]
sec_per_block = self.constants["BLOCK_TIME_TARGET"]
block_buffer_count = self.constants["MEMPOOL_BLOCK_BUFFER"]
tx_per_sec = self.constants.TX_PER_SEC
sec_per_block = self.constants.BLOCK_TIME_TARGET
block_buffer_count = self.constants.MEMPOOL_BLOCK_BUFFER
# MEMPOOL_SIZE = 60000
self.mempool_size = tx_per_sec * sec_per_block * block_buffer_count
self.potential_cache_size = 300
self.seen_cache_size = 10000
self.coinbase_freeze = self.constants["COINBASE_FREEZE_PERIOD"]
self.coinbase_freeze = self.constants.COINBASE_FREEZE_PERIOD
async def create_bundle_for_tip(self, header: Header) -> Optional[SpendBundle]:
"""
@ -69,7 +69,7 @@ class MempoolManager:
spend_bundles: List[SpendBundle] = []
for dic in mempool.sorted_spends.values():
for item in dic.values():
if item.cost + cost_sum <= self.constants["MAX_BLOCK_COST_CLVM"]:
if item.cost + cost_sum <= self.constants.MAX_BLOCK_COST_CLVM:
spend_bundles.append(item.spend_bundle)
cost_sum += item.cost
else:
@ -143,7 +143,7 @@ class MempoolManager:
# Check additions for max coin amount
for coin in additions:
if coin.amount >= uint64.from_bytes(self.constants["MAX_COIN_AMOUNT"]):
if coin.amount >= uint64.from_bytes(self.constants.MAX_COIN_AMOUNT):
return (
None,
MempoolInclusionStatus.FAILED,

View File

@ -62,7 +62,7 @@ class FullNodeRpcApi:
if lca_block is None:
return None
min_iters: uint64 = self.service.blockchain.get_next_min_iters(lca_block)
ips: uint64 = (
ips: uint64 = uint64(
min_iters
* self.service.constants.MIN_ITERS_PROPORTION
// self.service.constants.BLOCK_TIME_TARGET
@ -257,14 +257,14 @@ class FullNodeRpcApi:
older_block.proof_of_space,
older_diff,
older_block.proof_of_time.number_of_iterations,
self.service.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
self.service.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
# We do not count the min iters in the old block, since it's not included in the range
total_mi: uint64 = uint64(0)
for curr_h in range(older_block.height + 1, newer_block.height + 1):
if (
curr_h % self.service.constants["DIFFICULTY_EPOCH"]
) == self.service.constants["DIFFICULTY_DELAY"]:
curr_h % self.service.constants.DIFFICULTY_EPOCH
) == self.service.constants.DIFFICULTY_DELAY:
curr_b_header_hash = self.service.blockchain.height_to_hash.get(
uint32(int(curr_h))
)
@ -285,7 +285,7 @@ class FullNodeRpcApi:
curr_b_block.proof_of_space,
uint64(curr_diff),
curr_b_block.proof_of_time.number_of_iterations,
self.service.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
self.service.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
if curr_mi is None:
raise web.HTTPBadRequest()
@ -330,7 +330,7 @@ class FullNodeRpcApi:
tips_adjustment_constant = 0.65
network_space_constant = 2 ** 32 # 2^32
eligible_plots_filter_mult = (
2 ** self.service.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
2 ** self.service.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
network_space_bytes_estimate = (
weight_div_iters

View File

@ -19,7 +19,7 @@ def service_kwargs_for_timelord(root_path):
PeerInfo(config["full_node_peer"]["host"], config["full_node_peer"]["port"])
]
api = Timelord(config, constants["DISCRIMINANT_SIZE_BITS"])
api = Timelord(config, constants.DISCRIMINANT_SIZE_BITS)
async def start_callback():
await api._start()

View File

@ -197,54 +197,54 @@ class BlockTools:
if transaction_data_at_height is None:
transaction_data_at_height = {}
if seconds_per_block is None:
seconds_per_block = test_constants["BLOCK_TIME_TARGET"]
seconds_per_block = test_constants.BLOCK_TIME_TARGET
if len(block_list) == 0:
block_list.append(FullBlock.from_bytes(test_constants["GENESIS_BLOCK"]))
prev_difficulty = test_constants["DIFFICULTY_STARTING"]
block_list.append(FullBlock.from_bytes(test_constants.GENESIS_BLOCK))
prev_difficulty = test_constants.DIFFICULTY_STARTING
curr_difficulty = prev_difficulty
curr_min_iters = test_constants["MIN_ITERS_STARTING"]
curr_min_iters = test_constants.MIN_ITERS_STARTING
elif len(block_list) < (
test_constants["DIFFICULTY_EPOCH"] + test_constants["DIFFICULTY_DELAY"]
test_constants.DIFFICULTY_EPOCH + test_constants.DIFFICULTY_DELAY
):
# First epoch (+delay), so just get first difficulty
prev_difficulty = block_list[0].weight
curr_difficulty = block_list[0].weight
assert test_constants["DIFFICULTY_STARTING"] == prev_difficulty
curr_min_iters = test_constants["MIN_ITERS_STARTING"]
assert test_constants.DIFFICULTY_STARTING == prev_difficulty
curr_min_iters = test_constants.MIN_ITERS_STARTING
else:
curr_difficulty = block_list[-1].weight - block_list[-2].weight
prev_difficulty = (
block_list[-1 - test_constants["DIFFICULTY_EPOCH"]].weight
- block_list[-2 - test_constants["DIFFICULTY_EPOCH"]].weight
block_list[-1 - test_constants.DIFFICULTY_EPOCH].weight
- block_list[-2 - test_constants.DIFFICULTY_EPOCH].weight
)
assert block_list[-1].proof_of_time is not None
curr_min_iters = calculate_min_iters_from_iterations(
block_list[-1].proof_of_space,
curr_difficulty,
block_list[-1].proof_of_time.number_of_iterations,
test_constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
test_constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
starting_height = block_list[-1].height + 1
timestamp = block_list[-1].header.data.timestamp
for next_height in range(starting_height, starting_height + num_blocks):
if (
next_height > test_constants["DIFFICULTY_EPOCH"]
and next_height % test_constants["DIFFICULTY_EPOCH"]
== test_constants["DIFFICULTY_DELAY"]
next_height > test_constants.DIFFICULTY_EPOCH
and next_height % test_constants.DIFFICULTY_EPOCH
== test_constants.DIFFICULTY_DELAY
):
# Calculates new difficulty
height1 = uint64(
next_height
- (
test_constants["DIFFICULTY_EPOCH"]
+ test_constants["DIFFICULTY_DELAY"]
test_constants.DIFFICULTY_EPOCH
+ test_constants.DIFFICULTY_DELAY
)
- 1
)
height2 = uint64(next_height - (test_constants["DIFFICULTY_EPOCH"]) - 1)
height3 = uint64(next_height - (test_constants["DIFFICULTY_DELAY"]) - 1)
height2 = uint64(next_height - (test_constants.DIFFICULTY_EPOCH) - 1)
height3 = uint64(next_height - (test_constants.DIFFICULTY_DELAY) - 1)
if height1 >= 0:
block1 = block_list[height1]
iters1 = block1.header.data.total_iters
@ -253,7 +253,7 @@ class BlockTools:
block1 = block_list[0]
timestamp1 = uint64(
block1.header.data.timestamp
- test_constants["BLOCK_TIME_TARGET"]
- test_constants.BLOCK_TIME_TARGET
)
iters1 = uint64(0)
timestamp2 = block_list[height2].header.data.timestamp
@ -262,47 +262,47 @@ class BlockTools:
block3 = block_list[height3]
iters3 = block3.header.data.total_iters
term1 = (
test_constants["DIFFICULTY_DELAY"]
test_constants.DIFFICULTY_DELAY
* prev_difficulty
* (timestamp3 - timestamp2)
* test_constants["BLOCK_TIME_TARGET"]
* test_constants.BLOCK_TIME_TARGET
)
term2 = (
(test_constants["DIFFICULTY_WARP_FACTOR"] - 1)
(test_constants.DIFFICULTY_WARP_FACTOR - 1)
* (
test_constants["DIFFICULTY_EPOCH"]
- test_constants["DIFFICULTY_DELAY"]
test_constants.DIFFICULTY_EPOCH
- test_constants.DIFFICULTY_DELAY
)
* curr_difficulty
* (timestamp2 - timestamp1)
* test_constants["BLOCK_TIME_TARGET"]
* test_constants.BLOCK_TIME_TARGET
)
# Round down after the division
new_difficulty_precise: uint64 = uint64(
(term1 + term2)
// (
test_constants["DIFFICULTY_WARP_FACTOR"]
test_constants.DIFFICULTY_WARP_FACTOR
* (timestamp3 - timestamp2)
* (timestamp2 - timestamp1)
)
)
new_difficulty = uint64(
truncate_to_significant_bits(
new_difficulty_precise, test_constants["SIGNIFICANT_BITS"]
new_difficulty_precise, test_constants.SIGNIFICANT_BITS
)
)
max_diff = uint64(
truncate_to_significant_bits(
test_constants["DIFFICULTY_FACTOR"] * curr_difficulty,
test_constants["SIGNIFICANT_BITS"],
test_constants.DIFFICULTY_FACTOR * curr_difficulty,
test_constants.SIGNIFICANT_BITS,
)
)
min_diff = uint64(
truncate_to_significant_bits(
curr_difficulty // test_constants["DIFFICULTY_FACTOR"],
test_constants["SIGNIFICANT_BITS"],
curr_difficulty // test_constants.DIFFICULTY_FACTOR,
test_constants.SIGNIFICANT_BITS,
)
)
if new_difficulty >= curr_difficulty:
@ -313,13 +313,13 @@ class BlockTools:
min_iters_precise = uint64(
(iters3 - iters1)
// (
test_constants["DIFFICULTY_EPOCH"]
* test_constants["MIN_ITERS_PROPORTION"]
test_constants.DIFFICULTY_EPOCH
* test_constants.MIN_ITERS_PROPORTION
)
)
curr_min_iters = uint64(
truncate_to_significant_bits(
min_iters_precise, test_constants["SIGNIFICANT_BITS"]
min_iters_precise, test_constants.SIGNIFICANT_BITS
)
)
prev_difficulty = curr_difficulty
@ -333,8 +333,8 @@ class BlockTools:
transactions, aggsig = transaction_data_at_height[next_height]
update_difficulty = (
next_height % test_constants["DIFFICULTY_EPOCH"]
== test_constants["DIFFICULTY_DELAY"]
next_height % test_constants.DIFFICULTY_EPOCH
== test_constants.DIFFICULTY_DELAY
)
block_list.append(
self.create_next_block(
@ -371,8 +371,8 @@ class BlockTools:
uint64(0),
uint128(0),
uint64(int(time.time())),
uint64(test_constants["DIFFICULTY_STARTING"]),
uint64(test_constants["MIN_ITERS_STARTING"]),
uint64(test_constants.DIFFICULTY_STARTING),
uint64(test_constants.MIN_ITERS_STARTING),
seed,
True,
reward_puzzlehash,
@ -473,7 +473,7 @@ class BlockTools:
ccp = ProofOfSpace.can_create_proof(
plot_id,
challenge_hash,
test_constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
test_constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
if not ccp:
continue
@ -490,7 +490,7 @@ class BlockTools:
ccp = ProofOfSpace.can_create_proof(
plot_id,
challenge_hash,
test_constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
test_constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
if not ccp:
continue
@ -527,15 +527,15 @@ class BlockTools:
proof_of_space,
difficulty,
min_iters,
test_constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"],
test_constants.NUMBER_ZERO_BITS_CHALLENGE_SIG,
)
if self.real_plots:
print(f"Performing {number_iters} VDF iterations")
int_size = (test_constants["DISCRIMINANT_SIZE_BITS"] + 16) >> 4
int_size = (test_constants.DISCRIMINANT_SIZE_BITS + 16) >> 4
result = prove(
challenge_hash, test_constants["DISCRIMINANT_SIZE_BITS"], number_iters
challenge_hash, test_constants.DISCRIMINANT_SIZE_BITS, number_iters
)
output = ClassgroupElement(

View File

@ -361,7 +361,7 @@ class WalletNode:
self.header_hashes_error = False
self.proof_hashes = []
self.potential_header_hashes = {}
genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
genesis = FullBlock.from_bytes(self.constants.GENESIS_BLOCK)
genesis_challenge = genesis.proof_of_space.challenge_hash
request_header_hashes = wallet_protocol.RequestAllHeaderHashesAfter(
uint32(0), genesis_challenge
@ -542,7 +542,7 @@ class WalletNode:
tip_height + 1,
)
if fork_point_height == 0:
difficulty = self.constants["DIFFICULTY_STARTING"]
difficulty = self.constants.DIFFICULTY_STARTING
else:
fork_point_parent_hash = self.wallet_state_manager.block_records[
fork_point_hash

View File

@ -122,7 +122,7 @@ class WalletStateManager:
self.sync_mode = False
self.height_to_hash = {}
self.block_records = await self.wallet_store.get_lca_path()
genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
genesis = FullBlock.from_bytes(self.constants.GENESIS_BLOCK)
self.genesis = genesis
self.state_changed_callback = None
self.pending_tx_callback = None
@ -405,7 +405,7 @@ class WalletStateManager:
async def get_frozen_balance(self, wallet_id: int) -> uint64:
current_index = self.block_records[self.lca].height
coinbase_freeze_period = self.constants["COINBASE_FREEZE_PERIOD"]
coinbase_freeze_period = self.constants.COINBASE_FREEZE_PERIOD
valid_index = current_index - coinbase_freeze_period
@ -709,15 +709,13 @@ class WalletStateManager:
if header_block is not None:
if not await self.validate_header_block(block, header_block):
return ReceiveBlockResult.INVALID_BLOCK
if (block.height + 1) % self.constants[
"DIFFICULTY_EPOCH"
] == self.constants["DIFFICULTY_DELAY"]:
if (block.height + 1) % self.constants.DIFFICULTY_EPOCH == self.constants.DIFFICULTY_DELAY:
assert header_block.challenge.new_work_difficulty is not None
self.difficulty_resets_prev[
block.header_hash
] = header_block.challenge.new_work_difficulty
if (block.height + 1) % self.constants["DIFFICULTY_EPOCH"] == 0:
if (block.height + 1) % self.constants.DIFFICULTY_EPOCH == 0:
assert block.total_iters is not None
# Block is valid, so add it to the blockchain
@ -792,26 +790,26 @@ class WalletStateManager:
curr = block_record
if (
curr.height
< self.constants["DIFFICULTY_EPOCH"] + self.constants["DIFFICULTY_DELAY"]
< self.constants.DIFFICULTY_EPOCH + self.constants.DIFFICULTY_DELAY
):
return self.constants["MIN_ITERS_STARTING"]
return self.constants.MIN_ITERS_STARTING
if (
curr.height % self.constants["DIFFICULTY_EPOCH"]
< self.constants["DIFFICULTY_DELAY"]
curr.height % self.constants.DIFFICULTY_EPOCH
< self.constants.DIFFICULTY_DELAY
):
# First few blocks of epoch (using old difficulty and min_iters)
height2 = (
curr.height
- (curr.height % self.constants["DIFFICULTY_EPOCH"])
- self.constants["DIFFICULTY_EPOCH"]
- (curr.height % self.constants.DIFFICULTY_EPOCH)
- self.constants.DIFFICULTY_EPOCH
- 1
)
else:
# The rest of the blocks of epoch (using new difficulty and min iters)
height2 = (
curr.height - (curr.height % self.constants["DIFFICULTY_EPOCH"]) - 1
curr.height - (curr.height % self.constants.DIFFICULTY_EPOCH) - 1
)
height1 = height2 - self.constants["DIFFICULTY_EPOCH"]
height1 = height2 - self.constants.DIFFICULTY_EPOCH
assert height2 > 0
iters1: Optional[uint64] = uint64(0)
@ -827,14 +825,14 @@ class WalletStateManager:
min_iters_precise = uint64(
(iters2 - iters1)
// (
self.constants["DIFFICULTY_EPOCH"]
* self.constants["MIN_ITERS_PROPORTION"]
self.constants.DIFFICULTY_EPOCH
* self.constants.MIN_ITERS_PROPORTION
)
)
# Truncates to only 12 bits plus 0s. This prevents grinding attacks.
return uint64(
truncate_to_significant_bits(
min_iters_precise, self.constants["SIGNIFICANT_BITS"]
min_iters_precise, self.constants.SIGNIFICANT_BITS
)
)
@ -871,7 +869,7 @@ class WalletStateManager:
quality_str: Optional[
bytes32
] = header_block.proof_of_space.verify_and_get_quality_string(
self.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
self.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
if quality_str is None:
return False
@ -880,8 +878,8 @@ class WalletStateManager:
min_iters: uint64 = self.get_min_iters(br)
prev_block: Optional[BlockRecord]
if (
br.height % self.constants["DIFFICULTY_EPOCH"]
!= self.constants["DIFFICULTY_DELAY"]
br.height % self.constants.DIFFICULTY_EPOCH
!= self.constants.DIFFICULTY_DELAY
):
# Only allow difficulty changes once per epoch
if br.height > 1:
@ -898,7 +896,7 @@ class WalletStateManager:
assert difficulty == prev_block.weight
else:
difficulty = uint64(br.weight)
assert difficulty == self.constants["DIFFICULTY_STARTING"]
assert difficulty == self.constants.DIFFICULTY_STARTING
else:
# This is a difficulty change, so check whether it's within the allowed range.
# (But don't check whether it's the right amount).
@ -916,14 +914,14 @@ class WalletStateManager:
max_diff = uint64(
truncate_to_significant_bits(
prev_difficulty * self.constants["DIFFICULTY_FACTOR"],
self.constants["SIGNIFICANT_BITS"],
prev_difficulty * self.constants.DIFFICULTY_FACTOR,
self.constants.SIGNIFICANT_BITS,
)
)
min_diff = uint64(
truncate_to_significant_bits(
prev_difficulty // self.constants["DIFFICULTY_FACTOR"],
self.constants["SIGNIFICANT_BITS"],
prev_difficulty // self.constants.DIFFICULTY_FACTOR,
self.constants.SIGNIFICANT_BITS,
)
)
@ -942,7 +940,7 @@ class WalletStateManager:
# Check PoT
if not header_block.proof_of_time.is_valid(
self.constants["DISCRIMINANT_SIZE_BITS"]
self.constants.DISCRIMINANT_SIZE_BITS
):
return False
@ -983,7 +981,7 @@ class WalletStateManager:
# Check that block is not far in the future
if (
header_block.header.data.timestamp
> time.time() + self.constants["MAX_FUTURE_TIME"]
> time.time() + self.constants.MAX_FUTURE_TIME
):
return False
@ -1044,8 +1042,8 @@ class WalletStateManager:
):
return False
if (
height % self.constants["DIFFICULTY_EPOCH"]
== self.constants["DIFFICULTY_DELAY"]
height % self.constants.DIFFICULTY_EPOCH
== self.constants.DIFFICULTY_DELAY
):
diff_change = all_proof_hashes[height][1]
assert diff_change is not None
@ -1078,22 +1076,22 @@ class WalletStateManager:
# Get difficulty
if (
height % self.constants["DIFFICULTY_EPOCH"]
< self.constants["DIFFICULTY_DELAY"]
height % self.constants.DIFFICULTY_EPOCH
< self.constants.DIFFICULTY_DELAY
):
diff_height = (
height
- (height % self.constants["DIFFICULTY_EPOCH"])
- (height % self.constants.DIFFICULTY_EPOCH)
- (
self.constants["DIFFICULTY_EPOCH"]
- self.constants["DIFFICULTY_DELAY"]
self.constants.DIFFICULTY_EPOCH
- self.constants.DIFFICULTY_DELAY
)
)
else:
diff_height = (
height
- (height % self.constants["DIFFICULTY_EPOCH"])
+ self.constants["DIFFICULTY_DELAY"]
- (height % self.constants.DIFFICULTY_EPOCH)
+ self.constants.DIFFICULTY_DELAY
)
difficulty = all_proof_hashes[diff_height][1]
@ -1101,31 +1099,31 @@ class WalletStateManager:
# Validate pospace to get iters
quality_str = header_block.proof_of_space.verify_and_get_quality_string(
self.constants["NUMBER_ZERO_BITS_CHALLENGE_SIG"]
self.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
)
assert quality_str is not None
if (
height
< self.constants["DIFFICULTY_EPOCH"]
+ self.constants["DIFFICULTY_DELAY"]
< self.constants.DIFFICULTY_EPOCH
+ self.constants.DIFFICULTY_DELAY
):
min_iters = self.constants["MIN_ITERS_STARTING"]
min_iters = self.constants.MIN_ITERS_STARTING
else:
if (
height % self.constants["DIFFICULTY_EPOCH"]
< self.constants["DIFFICULTY_DELAY"]
height % self.constants.DIFFICULTY_EPOCH
< self.constants.DIFFICULTY_DELAY
):
height2 = (
height
- (height % self.constants["DIFFICULTY_EPOCH"])
- self.constants["DIFFICULTY_EPOCH"]
- (height % self.constants.DIFFICULTY_EPOCH)
- self.constants.DIFFICULTY_EPOCH
- 1
)
else:
height2 = height - (height % self.constants["DIFFICULTY_EPOCH"]) - 1
height2 = height - (height % self.constants.DIFFICULTY_EPOCH) - 1
height1 = height2 - self.constants["DIFFICULTY_EPOCH"]
height1 = height2 - self.constants.DIFFICULTY_EPOCH
if height1 == -1:
iters1 = uint64(0)
else:
@ -1137,8 +1135,8 @@ class WalletStateManager:
min_iters = uint64(
(iters2 - iters1)
// (
self.constants["DIFFICULTY_EPOCH"]
* self.constants["MIN_ITERS_PROPORTION"]
self.constants.DIFFICULTY_EPOCH
* self.constants.MIN_ITERS_PROPORTION
)
)
@ -1151,7 +1149,7 @@ class WalletStateManager:
return False
if not header_block.proof_of_time.is_valid(
self.constants["DISCRIMINANT_SIZE_BITS"]
self.constants.DISCRIMINANT_SIZE_BITS
):
return False
@ -1367,7 +1365,7 @@ class WalletStateManager:
current_index = self.block_records[self.lca].height
coinbase_freeze_period = self.constants["COINBASE_FREEZE_PERIOD"]
coinbase_freeze_period = self.constants.COINBASE_FREEZE_PERIOD
if current_index <= coinbase_freeze_period:
return set()

View File

@ -44,7 +44,7 @@ class TestBlockStore:
await BlockStore.create(connection_2)
db_3 = await BlockStore.create(connection_3)
try:
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
genesis = FullBlock.from_bytes(test_constants.GENESIS_BLOCK)
# Save/get block
for block in blocks:

View File

@ -550,8 +550,8 @@ class TestBlockValidation:
assert diff_6 == diff_7
assert diff_8 > diff_7
assert (diff_8 / diff_7) <= test_constants["DIFFICULTY_FACTOR"]
assert (b.get_next_min_iters(blocks[1])) == test_constants["MIN_ITERS_STARTING"]
assert (diff_8 / diff_7) <= test_constants.DIFFICULTY_FACTOR
assert (b.get_next_min_iters(blocks[1])) == test_constants.MIN_ITERS_STARTING
assert (b.get_next_min_iters(blocks[6])) == (b.get_next_min_iters(blocks[5]))
assert (b.get_next_min_iters(blocks[7])) > (b.get_next_min_iters(blocks[6]))
assert (b.get_next_min_iters(blocks[8])) == (b.get_next_min_iters(blocks[7]))

View File

@ -798,7 +798,7 @@ class TestWalletProtocol:
100, wallet_a.get_new_puzzlehash(), blocks_new[-1].get_coinbase(),
)
spend_bundle_bad = wallet_a.generate_signed_transaction(
uint64.from_bytes(constants["MAX_COIN_AMOUNT"]),
uint64.from_bytes(constants.MAX_COIN_AMOUNT),
wallet_a.get_new_puzzlehash(),
blocks_new[-1].get_coinbase(),
)
@ -866,15 +866,15 @@ class TestWalletProtocol:
assert len(hashes) >= len(blocks_list) - 2
for i in range(len(hashes)):
if (
i % test_constants["DIFFICULTY_EPOCH"]
== test_constants["DIFFICULTY_DELAY"]
i % test_constants.DIFFICULTY_EPOCH
== test_constants.DIFFICULTY_DELAY
):
assert hashes[i][1] is not None
elif i > 0:
assert hashes[i][1] is None
if (
i % test_constants["DIFFICULTY_EPOCH"]
== test_constants["DIFFICULTY_EPOCH"] - 1
i % test_constants.DIFFICULTY_EPOCH
== test_constants.DIFFICULTY_EPOCH - 1
):
assert hashes[i][2] is not None
else:

View File

@ -28,7 +28,7 @@ class TestStore:
for sync_mode in (False, True):
db.set_sync_mode(sync_mode)
assert sync_mode == db.get_sync_mode()
FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
FullBlock.from_bytes(test_constants.GENESIS_BLOCK)
# clear sync info
await db.clear_sync_info()

View File

@ -363,7 +363,7 @@ async def setup_timelord(port, full_node_port, sanitizer, consensus_constants: C
if sanitizer:
config["vdf_server"]["port"] = 7999
api = Timelord(config, consensus_constants["DISCRIMINANT_SIZE_BITS"])
api = Timelord(config, consensus_constants.DISCRIMINANT_SIZE_BITS)
started = asyncio.Event()

View File

@ -32,7 +32,7 @@ class TestCostCalculation:
assert spend_bundle is not None
program = best_solution_program(spend_bundle)
ratio = test_constants["CLVM_COST_RATIO_CONSTANT"]
ratio = test_constants.CLVM_COST_RATIO_CONSTANT
error, npc_list, clvm_cost = calculate_cost_of_program(program, ratio)