mirror of https://github.com/poanetwork/quorum.git
Merge pull request #270 from getamis/feature/update_istanbul
consensus, core, eth, miner, params: update istanbul engine
This commit is contained in:
commit
bbc8e726e5
|
@ -99,7 +99,7 @@ type Engine interface {
|
||||||
// Handler should be implemented is the consensus needs to handle and send peer's message
|
// Handler should be implemented is the consensus needs to handle and send peer's message
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
// NewChainHead handles a new head block comes
|
// NewChainHead handles a new head block comes
|
||||||
NewChainHead(block *types.Block) error
|
NewChainHead() error
|
||||||
|
|
||||||
// HandleMsg handles a message from peer
|
// HandleMsg handles a message from peer
|
||||||
HandleMsg(address common.Address, data p2p.Msg) (bool, error)
|
HandleMsg(address common.Address, data p2p.Msg) (bool, error)
|
||||||
|
@ -121,7 +121,7 @@ type Istanbul interface {
|
||||||
Engine
|
Engine
|
||||||
|
|
||||||
// Start starts the engine
|
// Start starts the engine
|
||||||
Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error
|
Start(chain ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error
|
||||||
|
|
||||||
// Stop stops the engine
|
// Stop stops the engine
|
||||||
Stop() error
|
Stop() error
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package istanbul
|
package istanbul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
@ -57,4 +58,16 @@ type Backend interface {
|
||||||
|
|
||||||
// LastProposal retrieves latest committed proposal and the address of proposer
|
// LastProposal retrieves latest committed proposal and the address of proposer
|
||||||
LastProposal() (Proposal, common.Address)
|
LastProposal() (Proposal, common.Address)
|
||||||
|
|
||||||
|
// HasPropsal checks if the combination of the given hash and height matches any existing blocks
|
||||||
|
HasPropsal(hash common.Hash, number *big.Int) bool
|
||||||
|
|
||||||
|
// GetProposer returns the proposer of the given block height
|
||||||
|
GetProposer(number uint64) common.Address
|
||||||
|
|
||||||
|
// ParentValidators returns the validator set of the given proposal's parent block
|
||||||
|
ParentValidators(proposal Proposal) ValidatorSet
|
||||||
|
|
||||||
|
// HasBadBlock returns whether the block with the hash is a bad block
|
||||||
|
HasBadProposal(hash common.Hash) bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package backend
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
"math/big"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -35,6 +36,11 @@ import (
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// fetcherID is the ID indicates the block is from Istanbul engine
|
||||||
|
fetcherID = "istanbul"
|
||||||
|
)
|
||||||
|
|
||||||
// New creates an Ethereum backend for Istanbul core engine.
|
// New creates an Ethereum backend for Istanbul core engine.
|
||||||
func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
|
func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
|
||||||
// Allocate the snapshot caches and create the engine
|
// Allocate the snapshot caches and create the engine
|
||||||
|
@ -70,14 +76,15 @@ type backend struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
chain consensus.ChainReader
|
chain consensus.ChainReader
|
||||||
inserter func(types.Blocks) (int, error)
|
currentBlock func() *types.Block
|
||||||
|
hasBadBlock func(hash common.Hash) bool
|
||||||
|
|
||||||
// the channels for istanbul engine notifications
|
// the channels for istanbul engine notifications
|
||||||
commitCh chan *types.Block
|
commitCh chan *types.Block
|
||||||
proposedBlockHash common.Hash
|
proposedBlockHash common.Hash
|
||||||
sealMu sync.Mutex
|
sealMu sync.Mutex
|
||||||
coreStarted bool
|
coreStarted bool
|
||||||
coreMu sync.Mutex
|
coreMu sync.RWMutex
|
||||||
|
|
||||||
// Current list of candidates we are pushing
|
// Current list of candidates we are pushing
|
||||||
candidates map[common.Address]bool
|
candidates map[common.Address]bool
|
||||||
|
@ -180,16 +187,11 @@ func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
|
||||||
if sb.proposedBlockHash == block.Hash() {
|
if sb.proposedBlockHash == block.Hash() {
|
||||||
// feed block hash to Seal() and wait the Seal() result
|
// feed block hash to Seal() and wait the Seal() result
|
||||||
sb.commitCh <- block
|
sb.commitCh <- block
|
||||||
// TODO: how do we check the block is inserted correctly?
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent
|
|
||||||
if _, err := sb.inserter(types.Blocks{block}); err != nil && err != core.ErrKnownBlock {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if sb.broadcaster != nil {
|
if sb.broadcaster != nil {
|
||||||
go sb.broadcaster.BroadcastBlock(block, false)
|
sb.broadcaster.Enqueue(fetcherID, block)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -208,6 +210,22 @@ func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
|
||||||
sb.logger.Error("Invalid proposal, %v", proposal)
|
sb.logger.Error("Invalid proposal, %v", proposal)
|
||||||
return 0, errInvalidProposal
|
return 0, errInvalidProposal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check bad block
|
||||||
|
if sb.HasBadProposal(block.Hash()) {
|
||||||
|
return 0, core.ErrBlacklistedHash
|
||||||
|
}
|
||||||
|
|
||||||
|
// check block body
|
||||||
|
txnHash := types.DeriveSha(block.Transactions())
|
||||||
|
uncleHash := types.CalcUncleHash(block.Uncles())
|
||||||
|
if txnHash != block.Header().TxHash {
|
||||||
|
return 0, errMismatchTxhashes
|
||||||
|
}
|
||||||
|
if uncleHash != nilUncleHash {
|
||||||
|
return 0, errInvalidUncleHash
|
||||||
|
}
|
||||||
|
|
||||||
// verify the header of proposed block
|
// verify the header of proposed block
|
||||||
err := sb.VerifyHeader(sb.chain, block.Header(), false)
|
err := sb.VerifyHeader(sb.chain, block.Header(), false)
|
||||||
// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
|
// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
|
||||||
|
@ -239,6 +257,11 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasPropsal implements istanbul.Backend.HashBlock
|
||||||
|
func (sb *backend) HasPropsal(hash common.Hash, number *big.Int) bool {
|
||||||
|
return sb.chain.GetHeader(hash, number.Uint64()) != nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetProposer implements istanbul.Backend.GetProposer
|
// GetProposer implements istanbul.Backend.GetProposer
|
||||||
func (sb *backend) GetProposer(number uint64) common.Address {
|
func (sb *backend) GetProposer(number uint64) common.Address {
|
||||||
if h := sb.chain.GetHeaderByNumber(number); h != nil {
|
if h := sb.chain.GetHeaderByNumber(number); h != nil {
|
||||||
|
@ -248,6 +271,14 @@ func (sb *backend) GetProposer(number uint64) common.Address {
|
||||||
return common.Address{}
|
return common.Address{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParentValidators implements istanbul.Backend.GetParentValidators
|
||||||
|
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
|
||||||
|
if block, ok := proposal.(*types.Block); ok {
|
||||||
|
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
|
||||||
|
}
|
||||||
|
return validator.NewSet(nil, sb.config.ProposerPolicy)
|
||||||
|
}
|
||||||
|
|
||||||
func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
|
func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
|
||||||
snap, err := sb.snapshot(sb.chain, number, hash, nil)
|
snap, err := sb.snapshot(sb.chain, number, hash, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -257,17 +288,12 @@ func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.Valid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) {
|
func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) {
|
||||||
if sb.chain == nil {
|
block := sb.currentBlock()
|
||||||
sb.logger.Error("Failed to access blockchain")
|
|
||||||
return nil, common.Address{}
|
|
||||||
}
|
|
||||||
|
|
||||||
h := sb.chain.CurrentHeader()
|
|
||||||
|
|
||||||
var proposer common.Address
|
var proposer common.Address
|
||||||
if h.Number.Cmp(common.Big0) > 0 {
|
if block.Number().Cmp(common.Big0) > 0 {
|
||||||
var err error
|
var err error
|
||||||
proposer, err = sb.Author(h)
|
proposer, err = sb.Author(block.Header())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sb.logger.Error("Failed to get block proposer", "err", err)
|
sb.logger.Error("Failed to get block proposer", "err", err)
|
||||||
return nil, common.Address{}
|
return nil, common.Address{}
|
||||||
|
@ -275,5 +301,12 @@ func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return header only block here since we don't need block body
|
// Return header only block here since we don't need block body
|
||||||
return types.NewBlockWithHeader(h), proposer
|
return block, proposer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *backend) HasBadProposal(hash common.Hash) bool {
|
||||||
|
if sb.hasBadBlock == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return sb.hasBadBlock(hash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,11 +29,10 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
|
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSign(t *testing.T) {
|
func TestSign(t *testing.T) {
|
||||||
b, _, _ := newBackend()
|
b := newBackend()
|
||||||
data := []byte("Here is a string....")
|
data := []byte("Here is a string....")
|
||||||
sig, err := b.Sign(data)
|
sig, err := b.Sign(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -54,7 +53,7 @@ func TestCheckSignature(t *testing.T) {
|
||||||
data := []byte("Here is a string....")
|
data := []byte("Here is a string....")
|
||||||
hashData := crypto.Keccak256([]byte(data))
|
hashData := crypto.Keccak256([]byte(data))
|
||||||
sig, _ := crypto.Sign(hashData, key)
|
sig, _ := crypto.Sign(hashData, key)
|
||||||
b, _, _ := newBackend()
|
b := newBackend()
|
||||||
a := getAddress()
|
a := getAddress()
|
||||||
err := b.CheckSignature(data, a, sig)
|
err := b.CheckSignature(data, a, sig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -68,7 +67,7 @@ func TestCheckSignature(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckValidatorSignature(t *testing.T) {
|
func TestCheckValidatorSignature(t *testing.T) {
|
||||||
_, keys, vset := newBackend()
|
vset, keys := newTestValidatorSet(5)
|
||||||
|
|
||||||
// 1. Positive test: sign with validator's key should succeed
|
// 1. Positive test: sign with validator's key should succeed
|
||||||
data := []byte("dummy data")
|
data := []byte("dummy data")
|
||||||
|
@ -113,7 +112,7 @@ func TestCheckValidatorSignature(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCommit(t *testing.T) {
|
func TestCommit(t *testing.T) {
|
||||||
backend, _, _ := newBackend()
|
backend := newBackend()
|
||||||
|
|
||||||
commitCh := make(chan *types.Block)
|
commitCh := make(chan *types.Block)
|
||||||
// Case: it's a proposer, so the backend.commit will receive channel result from backend.Commit function
|
// Case: it's a proposer, so the backend.commit will receive channel result from backend.Commit function
|
||||||
|
@ -235,13 +234,9 @@ func (slice Keys) Swap(i, j int) {
|
||||||
slice[i], slice[j] = slice[j], slice[i]
|
slice[i], slice[j] = slice[j], slice[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBackend() (b *backend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) {
|
func newBackend() (b *backend) {
|
||||||
|
_, b = newBlockChain(4)
|
||||||
key, _ := generatePrivateKey()
|
key, _ := generatePrivateKey()
|
||||||
validatorSet, validatorKeys = newTestValidatorSet(5)
|
b.privateKey = key
|
||||||
b = &backend{
|
|
||||||
privateKey: key,
|
|
||||||
logger: log.New("backend", "simple"),
|
|
||||||
commitCh: make(chan *types.Block, 1),
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,8 @@ var (
|
||||||
errInvalidCommittedSeals = errors.New("invalid committed seals")
|
errInvalidCommittedSeals = errors.New("invalid committed seals")
|
||||||
// errEmptyCommittedSeals is returned if the field of committed seals is zero.
|
// errEmptyCommittedSeals is returned if the field of committed seals is zero.
|
||||||
errEmptyCommittedSeals = errors.New("zero committed seals")
|
errEmptyCommittedSeals = errors.New("zero committed seals")
|
||||||
|
// errMismatchTxhashes is returned if the TxHash in header is mismatch.
|
||||||
|
errMismatchTxhashes = errors.New("mismatch transcations hashes")
|
||||||
)
|
)
|
||||||
var (
|
var (
|
||||||
defaultDifficulty = big.NewInt(1)
|
defaultDifficulty = big.NewInt(1)
|
||||||
|
@ -477,7 +479,7 @@ func (sb *backend) APIs(chain consensus.ChainReader) []rpc.API {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start implements consensus.Istanbul.Start
|
// Start implements consensus.Istanbul.Start
|
||||||
func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error {
|
func (sb *backend) Start(chain consensus.ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error {
|
||||||
sb.coreMu.Lock()
|
sb.coreMu.Lock()
|
||||||
defer sb.coreMu.Unlock()
|
defer sb.coreMu.Unlock()
|
||||||
if sb.coreStarted {
|
if sb.coreStarted {
|
||||||
|
@ -492,23 +494,10 @@ func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks
|
||||||
sb.commitCh = make(chan *types.Block, 1)
|
sb.commitCh = make(chan *types.Block, 1)
|
||||||
|
|
||||||
sb.chain = chain
|
sb.chain = chain
|
||||||
sb.inserter = inserter
|
sb.currentBlock = currentBlock
|
||||||
|
sb.hasBadBlock = hasBadBlock
|
||||||
|
|
||||||
curHeader := chain.CurrentHeader()
|
if err := sb.core.Start(); err != nil {
|
||||||
lastSequence := new(big.Int).Set(curHeader.Number)
|
|
||||||
lastProposer := common.Address{}
|
|
||||||
// should get proposer if the block is not genesis
|
|
||||||
if lastSequence.Cmp(common.Big0) > 0 {
|
|
||||||
p, err := sb.Author(curHeader)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
lastProposer = p
|
|
||||||
}
|
|
||||||
// We don't need block body so we create a header only block.
|
|
||||||
// The proposal is only for validator set calculation.
|
|
||||||
lastProposal := types.NewBlockWithHeader(curHeader)
|
|
||||||
if err := sb.core.Start(lastSequence, lastProposer, lastProposal); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ func newBlockChain(n int) (*core.BlockChain, *backend) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
b.Start(blockchain, blockchain.InsertChain)
|
b.Start(blockchain, blockchain.CurrentBlock, blockchain.HasBadBlock)
|
||||||
snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil)
|
snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -138,7 +138,7 @@ func makeBlock(chain *core.BlockChain, engine *backend, parent *types.Block) *ty
|
||||||
func makeBlockWithoutSeal(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block {
|
func makeBlockWithoutSeal(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block {
|
||||||
header := makeHeader(parent, engine.config)
|
header := makeHeader(parent, engine.config)
|
||||||
engine.Prepare(chain, header)
|
engine.Prepare(chain, header)
|
||||||
state, _, _ := chain.StateAt(parent.Root())
|
state, _,_ := chain.StateAt(parent.Root())
|
||||||
block, _ := engine.Finalize(chain, header, state, nil, nil, nil)
|
block, _ := engine.Finalize(chain, header, state, nil, nil, nil)
|
||||||
return block
|
return block
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus"
|
"github.com/ethereum/go-ethereum/consensus"
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
)
|
)
|
||||||
|
@ -93,20 +92,12 @@ func (sb *backend) SetBroadcaster(broadcaster consensus.Broadcaster) {
|
||||||
sb.broadcaster = broadcaster
|
sb.broadcaster = broadcaster
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *backend) NewChainHead(block *types.Block) error {
|
func (sb *backend) NewChainHead() error {
|
||||||
sb.coreMu.Lock()
|
sb.coreMu.RLock()
|
||||||
defer sb.coreMu.Unlock()
|
defer sb.coreMu.RUnlock()
|
||||||
if !sb.coreStarted {
|
if !sb.coreStarted {
|
||||||
return istanbul.ErrStoppedEngine
|
return istanbul.ErrStoppedEngine
|
||||||
}
|
}
|
||||||
p, err := sb.Author(block.Header())
|
go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{})
|
||||||
if err != nil {
|
|
||||||
sb.logger.Error("Failed to get block proposer", "err", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{
|
|
||||||
Proposal: block,
|
|
||||||
Proposer: p,
|
|
||||||
})
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||||||
|
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
)
|
)
|
||||||
|
@ -50,11 +51,11 @@ type Tally struct {
|
||||||
type Snapshot struct {
|
type Snapshot struct {
|
||||||
Epoch uint64 // The number of blocks after which to checkpoint and reset the pending votes
|
Epoch uint64 // The number of blocks after which to checkpoint and reset the pending votes
|
||||||
|
|
||||||
Number uint64 `json:"number"` // Block number where the snapshot was created
|
Number uint64 // Block number where the snapshot was created
|
||||||
Hash common.Hash `json:"hash"` // Block hash where the snapshot was created
|
Hash common.Hash // Block hash where the snapshot was created
|
||||||
Votes []*Vote `json:"votes"` // List of votes cast in chronological order
|
Votes []*Vote // List of votes cast in chronological order
|
||||||
Tally map[common.Address]Tally `json:"tally"` // Current vote tally to avoid recalculating
|
Tally map[common.Address]Tally // Current vote tally to avoid recalculating
|
||||||
ValSet istanbul.ValidatorSet `json:"validators"` // Set of authorized validators at this moment
|
ValSet istanbul.ValidatorSet // Set of authorized validators at this moment
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSnapshot create a new snapshot with the specified startup parameters. This
|
// newSnapshot create a new snapshot with the specified startup parameters. This
|
||||||
|
@ -272,3 +273,49 @@ func (s *Snapshot) validators() []common.Address {
|
||||||
}
|
}
|
||||||
return validators
|
return validators
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type snapshotJSON struct {
|
||||||
|
Epoch uint64 `json:"epoch"`
|
||||||
|
Number uint64 `json:"number"`
|
||||||
|
Hash common.Hash `json:"hash"`
|
||||||
|
Votes []*Vote `json:"votes"`
|
||||||
|
Tally map[common.Address]Tally `json:"tally"`
|
||||||
|
|
||||||
|
// for validator set
|
||||||
|
Validators []common.Address `json:"validators"`
|
||||||
|
Policy istanbul.ProposerPolicy `json:"policy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Snapshot) toJSONStruct() *snapshotJSON {
|
||||||
|
return &snapshotJSON{
|
||||||
|
Epoch: s.Epoch,
|
||||||
|
Number: s.Number,
|
||||||
|
Hash: s.Hash,
|
||||||
|
Votes: s.Votes,
|
||||||
|
Tally: s.Tally,
|
||||||
|
Validators: s.validators(),
|
||||||
|
Policy: s.ValSet.Policy(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal from a json byte array
|
||||||
|
func (s *Snapshot) UnmarshalJSON(b []byte) error {
|
||||||
|
var j snapshotJSON
|
||||||
|
if err := json.Unmarshal(b, &j); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Epoch = j.Epoch
|
||||||
|
s.Number = j.Number
|
||||||
|
s.Hash = j.Hash
|
||||||
|
s.Votes = j.Votes
|
||||||
|
s.Tally = j.Tally
|
||||||
|
s.ValSet = validator.NewSet(j.Validators, j.Policy)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Marshal to a json byte array
|
||||||
|
func (s *Snapshot) MarshalJSON() ([]byte, error) {
|
||||||
|
j := s.toJSONStruct()
|
||||||
|
return json.Marshal(j)
|
||||||
|
}
|
||||||
|
|
|
@ -20,10 +20,12 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||||||
|
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/core/vm"
|
"github.com/ethereum/go-ethereum/core/vm"
|
||||||
|
@ -400,3 +402,54 @@ func TestVoting(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSaveAndLoad(t *testing.T) {
|
||||||
|
snap := &Snapshot{
|
||||||
|
Epoch: 5,
|
||||||
|
Number: 10,
|
||||||
|
Hash: common.HexToHash("1234567890"),
|
||||||
|
Votes: []*Vote{
|
||||||
|
{
|
||||||
|
Validator: common.StringToAddress("1234567891"),
|
||||||
|
Block: 15,
|
||||||
|
Address: common.StringToAddress("1234567892"),
|
||||||
|
Authorize: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Tally: map[common.Address]Tally{
|
||||||
|
common.StringToAddress("1234567893"): Tally{
|
||||||
|
Authorize: false,
|
||||||
|
Votes: 20,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ValSet: validator.NewSet([]common.Address{
|
||||||
|
common.StringToAddress("1234567894"),
|
||||||
|
common.StringToAddress("1234567895"),
|
||||||
|
}, istanbul.RoundRobin),
|
||||||
|
}
|
||||||
|
db, _ := ethdb.NewMemDatabase()
|
||||||
|
err := snap.store(db)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("store snapshot failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
snap1, err := loadSnapshot(snap.Epoch, db, snap.Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("load snapshot failed: %v", err)
|
||||||
|
}
|
||||||
|
if snap.Epoch != snap1.Epoch {
|
||||||
|
t.Errorf("epoch mismatch: have %v, want %v", snap1.Epoch, snap.Epoch)
|
||||||
|
}
|
||||||
|
if snap.Hash != snap1.Hash {
|
||||||
|
t.Errorf("hash mismatch: have %v, want %v", snap1.Number, snap.Number)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(snap.Votes, snap.Votes) {
|
||||||
|
t.Errorf("votes mismatch: have %v, want %v", snap1.Votes, snap.Votes)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(snap.Tally, snap.Tally) {
|
||||||
|
t.Errorf("tally mismatch: have %v, want %v", snap1.Tally, snap.Tally)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(snap.ValSet, snap.ValSet) {
|
||||||
|
t.Errorf("validator set mismatch: have %v, want %v", snap1.ValSet, snap.ValSet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -40,6 +40,15 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
|
||||||
return errInvalidMessage
|
return errInvalidMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if msgCode == msgRoundChange {
|
||||||
|
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
|
||||||
|
return errFutureMessage
|
||||||
|
} else if view.Cmp(c.currentView()) < 0 {
|
||||||
|
return errOldMessage
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if view.Cmp(c.currentView()) > 0 {
|
if view.Cmp(c.currentView()) > 0 {
|
||||||
return errFutureMessage
|
return errFutureMessage
|
||||||
}
|
}
|
||||||
|
@ -90,7 +99,7 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
backlog.Push(msg, toPriority(msg.Code, p.View))
|
backlog.Push(msg, toPriority(msg.Code, p.View))
|
||||||
}
|
}
|
||||||
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
|
// for msgRoundChange, msgPrepare and msgCommit cases
|
||||||
default:
|
default:
|
||||||
var p *istanbul.Subject
|
var p *istanbul.Subject
|
||||||
err := msg.Decode(&p)
|
err := msg.Decode(&p)
|
||||||
|
@ -127,7 +136,7 @@ func (c *core) processBacklog() {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
view = m.View
|
view = m.View
|
||||||
}
|
}
|
||||||
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
|
// for msgRoundChange, msgPrepare and msgCommit cases
|
||||||
default:
|
default:
|
||||||
var sub *istanbul.Subject
|
var sub *istanbul.Subject
|
||||||
err := msg.Decode(&sub)
|
err := msg.Decode(&sub)
|
||||||
|
@ -162,6 +171,10 @@ func (c *core) processBacklog() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func toPriority(msgCode uint64, view *istanbul.View) float32 {
|
func toPriority(msgCode uint64, view *istanbul.View) float32 {
|
||||||
|
if msgCode == msgRoundChange {
|
||||||
|
// For msgRoundChange, set the message priority based on its sequence
|
||||||
|
return -float32(view.Sequence.Uint64() * 1000)
|
||||||
|
}
|
||||||
// FIXME: round will be reset as 0 while new sequence
|
// FIXME: round will be reset as 0 while new sequence
|
||||||
// 10 * Round limits the range of message code is from 0 to 9
|
// 10 * Round limits the range of message code is from 0 to 9
|
||||||
// 1000 * Sequence limits the range of round is from 0 to 99
|
// 1000 * Sequence limits the range of round is from 0 to 99
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
|
||||||
current: newRoundState(&istanbul.View{
|
current: newRoundState(&istanbul.View{
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
|
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
|
||||||
}
|
}
|
||||||
|
|
||||||
// invalid view format
|
// invalid view format
|
||||||
|
@ -47,7 +47,7 @@ func TestCheckMessage(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
|
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
|
||||||
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit}
|
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit, msgRoundChange}
|
||||||
|
|
||||||
// future sequence
|
// future sequence
|
||||||
v := &istanbul.View{
|
v := &istanbul.View{
|
||||||
|
@ -73,7 +73,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = testStates[i]
|
c.state = testStates[i]
|
||||||
for j := 0; j < len(testCode); j++ {
|
for j := 0; j < len(testCode); j++ {
|
||||||
err := c.checkMessage(testCode[j], v)
|
err := c.checkMessage(testCode[j], v)
|
||||||
if err != errFutureMessage {
|
if testCode[j] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if err != errFutureMessage {
|
||||||
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
|
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,7 +93,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = testStates[i]
|
c.state = testStates[i]
|
||||||
for j := 0; j < len(testCode); j++ {
|
for j := 0; j < len(testCode); j++ {
|
||||||
err := c.checkMessage(testCode[j], v)
|
err := c.checkMessage(testCode[j], v)
|
||||||
if err != errFutureMessage {
|
if testCode[j] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if err != errFutureMessage {
|
||||||
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
|
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -101,7 +109,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = StateAcceptRequest
|
c.state = StateAcceptRequest
|
||||||
for i := 0; i < len(testCode); i++ {
|
for i := 0; i < len(testCode); i++ {
|
||||||
err = c.checkMessage(testCode[i], v)
|
err = c.checkMessage(testCode[i], v)
|
||||||
if testCode[i] == msgPreprepare {
|
if testCode[i] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if testCode[i] == msgPreprepare {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
}
|
}
|
||||||
|
@ -116,7 +128,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = StatePreprepared
|
c.state = StatePreprepared
|
||||||
for i := 0; i < len(testCode); i++ {
|
for i := 0; i < len(testCode); i++ {
|
||||||
err = c.checkMessage(testCode[i], v)
|
err = c.checkMessage(testCode[i], v)
|
||||||
if err != nil {
|
if testCode[i] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -125,7 +141,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = StatePrepared
|
c.state = StatePrepared
|
||||||
for i := 0; i < len(testCode); i++ {
|
for i := 0; i < len(testCode); i++ {
|
||||||
err = c.checkMessage(testCode[i], v)
|
err = c.checkMessage(testCode[i], v)
|
||||||
if err != nil {
|
if testCode[i] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,7 +154,11 @@ func TestCheckMessage(t *testing.T) {
|
||||||
c.state = StateCommitted
|
c.state = StateCommitted
|
||||||
for i := 0; i < len(testCode); i++ {
|
for i := 0; i < len(testCode); i++ {
|
||||||
err = c.checkMessage(testCode[i], v)
|
err = c.checkMessage(testCode[i], v)
|
||||||
if err != nil {
|
if testCode[i] == msgRoundChange {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,6 +219,17 @@ func TestStoreBacklog(t *testing.T) {
|
||||||
if !reflect.DeepEqual(msg, m) {
|
if !reflect.DeepEqual(msg, m) {
|
||||||
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// push roundChange msg
|
||||||
|
m = &message{
|
||||||
|
Code: msgRoundChange,
|
||||||
|
Msg: subjectPayload,
|
||||||
|
}
|
||||||
|
c.storeBacklog(m, p)
|
||||||
|
msg = c.backlogs[p].PopItem()
|
||||||
|
if !reflect.DeepEqual(msg, m) {
|
||||||
|
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessFutureBacklog(t *testing.T) {
|
func TestProcessFutureBacklog(t *testing.T) {
|
||||||
|
@ -209,7 +244,7 @@ func TestProcessFutureBacklog(t *testing.T) {
|
||||||
current: newRoundState(&istanbul.View{
|
current: newRoundState(&istanbul.View{
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
|
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
|
||||||
state: StateAcceptRequest,
|
state: StateAcceptRequest,
|
||||||
}
|
}
|
||||||
c.subscribeEvents()
|
c.subscribeEvents()
|
||||||
|
@ -276,6 +311,10 @@ func TestProcessBacklog(t *testing.T) {
|
||||||
Code: msgCommit,
|
Code: msgCommit,
|
||||||
Msg: subjectPayload,
|
Msg: subjectPayload,
|
||||||
},
|
},
|
||||||
|
&message{
|
||||||
|
Code: msgRoundChange,
|
||||||
|
Msg: subjectPayload,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for i := 0; i < len(msgs); i++ {
|
for i := 0; i < len(msgs); i++ {
|
||||||
testProcessBacklog(t, msgs[i])
|
testProcessBacklog(t, msgs[i])
|
||||||
|
@ -297,7 +336,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
|
||||||
current: newRoundState(&istanbul.View{
|
current: newRoundState(&istanbul.View{
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
|
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
|
||||||
}
|
}
|
||||||
c.subscribeEvents()
|
c.subscribeEvents()
|
||||||
defer c.unsubscribeEvents()
|
defer c.unsubscribeEvents()
|
||||||
|
|
|
@ -39,6 +39,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
|
||||||
config: config,
|
config: config,
|
||||||
address: backend.Address(),
|
address: backend.Address(),
|
||||||
state: StateAcceptRequest,
|
state: StateAcceptRequest,
|
||||||
|
handlerWg: new(sync.WaitGroup),
|
||||||
logger: log.New("address", backend.Address()),
|
logger: log.New("address", backend.Address()),
|
||||||
backend: backend,
|
backend: backend,
|
||||||
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||||||
|
@ -68,8 +69,6 @@ type core struct {
|
||||||
timeoutSub *event.TypeMuxSubscription
|
timeoutSub *event.TypeMuxSubscription
|
||||||
futurePreprepareTimer *time.Timer
|
futurePreprepareTimer *time.Timer
|
||||||
|
|
||||||
lastProposer common.Address
|
|
||||||
lastProposal istanbul.Proposal
|
|
||||||
valSet istanbul.ValidatorSet
|
valSet istanbul.ValidatorSet
|
||||||
waitingForRoundChange bool
|
waitingForRoundChange bool
|
||||||
validateFn func([]byte, []byte) (common.Address, error)
|
validateFn func([]byte, []byte) (common.Address, error)
|
||||||
|
@ -77,7 +76,8 @@ type core struct {
|
||||||
backlogs map[istanbul.Validator]*prque.Prque
|
backlogs map[istanbul.Validator]*prque.Prque
|
||||||
backlogsMu *sync.Mutex
|
backlogsMu *sync.Mutex
|
||||||
|
|
||||||
current *roundState
|
current *roundState
|
||||||
|
handlerWg *sync.WaitGroup
|
||||||
|
|
||||||
roundChangeSet *roundChangeSet
|
roundChangeSet *roundChangeSet
|
||||||
roundChangeTimer *time.Timer
|
roundChangeTimer *time.Timer
|
||||||
|
@ -179,35 +179,65 @@ func (c *core) commit() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Proposal, lastProposer common.Address, roundChange bool) {
|
// startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
|
||||||
|
func (c *core) startNewRound(round *big.Int) {
|
||||||
var logger log.Logger
|
var logger log.Logger
|
||||||
if c.current == nil {
|
if c.current == nil {
|
||||||
logger = c.logger.New("old_round", -1, "old_seq", 0, "old_proposer", c.valSet.GetProposer())
|
logger = c.logger.New("old_round", -1, "old_seq", 0)
|
||||||
} else {
|
} else {
|
||||||
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
|
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
roundChange := false
|
||||||
// Try to get last proposal
|
// Try to get last proposal
|
||||||
if lastProposal == nil {
|
lastProposal, lastProposer := c.backend.LastProposal()
|
||||||
lastProposal, lastProposer = c.backend.LastProposal()
|
if c.current == nil {
|
||||||
if lastProposal.Number().Cmp(newView.Sequence) > 0 {
|
logger.Trace("Start to the initial round")
|
||||||
newView = &istanbul.View{
|
} else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
|
||||||
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
|
diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
|
||||||
Round: new(big.Int),
|
c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
|
||||||
}
|
|
||||||
c.lastProposal = lastProposal
|
if !c.consensusTimestamp.IsZero() {
|
||||||
c.lastProposer = lastProposer
|
c.consensusTimer.UpdateSince(c.consensusTimestamp)
|
||||||
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
|
c.consensusTimestamp = time.Time{}
|
||||||
}
|
}
|
||||||
|
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
|
||||||
|
} else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
|
||||||
|
if round.Cmp(common.Big0) == 0 {
|
||||||
|
// same seq and round, don't need to start new round
|
||||||
|
return
|
||||||
|
} else if round.Cmp(c.current.Round()) < 0 {
|
||||||
|
logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
roundChange = true
|
||||||
|
} else {
|
||||||
|
logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.valSet = c.backend.Validators(c.lastProposal)
|
var newView *istanbul.View
|
||||||
|
if roundChange {
|
||||||
|
newView = &istanbul.View{
|
||||||
|
Sequence: new(big.Int).Set(c.current.Sequence()),
|
||||||
|
Round: new(big.Int).Set(round),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
newView = &istanbul.View{
|
||||||
|
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
|
||||||
|
Round: new(big.Int),
|
||||||
|
}
|
||||||
|
c.valSet = c.backend.Validators(lastProposal)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update logger
|
||||||
|
logger = logger.New("old_proposer", c.valSet.GetProposer())
|
||||||
// Clear invalid ROUND CHANGE messages
|
// Clear invalid ROUND CHANGE messages
|
||||||
c.roundChangeSet = newRoundChangeSet(c.valSet)
|
c.roundChangeSet = newRoundChangeSet(c.valSet)
|
||||||
// New snapshot for new round
|
// New snapshot for new round
|
||||||
c.updateRoundState(newView, c.valSet, roundChange)
|
c.updateRoundState(newView, c.valSet, roundChange)
|
||||||
// Calculate new proposer
|
// Calculate new proposer
|
||||||
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
|
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
|
||||||
c.waitingForRoundChange = false
|
c.waitingForRoundChange = false
|
||||||
c.setState(StateAcceptRequest)
|
c.setState(StateAcceptRequest)
|
||||||
if roundChange && c.isProposer() && c.current != nil {
|
if roundChange && c.isProposer() && c.current != nil {
|
||||||
|
@ -248,12 +278,12 @@ func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.Valid
|
||||||
// Lock only if both roundChange is true and it is locked
|
// Lock only if both roundChange is true and it is locked
|
||||||
if roundChange && c.current != nil {
|
if roundChange && c.current != nil {
|
||||||
if c.current.IsHashLocked() {
|
if c.current.IsHashLocked() {
|
||||||
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest)
|
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
|
||||||
} else {
|
} else {
|
||||||
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest)
|
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil)
|
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,8 +318,12 @@ func (c *core) newRoundChangeTimer() {
|
||||||
c.stopTimer()
|
c.stopTimer()
|
||||||
|
|
||||||
// set timeout based on the round number
|
// set timeout based on the round number
|
||||||
t := uint64(math.Pow(2, float64(c.current.Round().Uint64()))) * c.config.RequestTimeout
|
timeout := time.Duration(c.config.RequestTimeout) * time.Millisecond
|
||||||
timeout := time.Duration(t) * time.Millisecond
|
round := c.current.Round().Uint64()
|
||||||
|
if round > 0 {
|
||||||
|
timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
c.roundChangeTimer = time.AfterFunc(timeout, func() {
|
c.roundChangeTimer = time.AfterFunc(timeout, func() {
|
||||||
c.sendEvent(timeoutEvent{})
|
c.sendEvent(timeoutEvent{})
|
||||||
})
|
})
|
||||||
|
|
|
@ -16,36 +16,11 @@
|
||||||
|
|
||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import "github.com/ethereum/go-ethereum/common"
|
||||||
"math/big"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
func (c *core) handleFinalCommitted() error {
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
logger := c.logger.New("state", c.state)
|
||||||
)
|
|
||||||
|
|
||||||
func (c *core) handleFinalCommitted(proposal istanbul.Proposal, proposer common.Address) error {
|
|
||||||
logger := c.logger.New("state", c.state, "number", proposal.Number(), "hash", proposal.Hash())
|
|
||||||
logger.Trace("Received a final committed proposal")
|
logger.Trace("Received a final committed proposal")
|
||||||
|
c.startNewRound(common.Big0)
|
||||||
// Catch up the sequence number
|
|
||||||
if proposal.Number().Cmp(c.current.Sequence()) >= 0 {
|
|
||||||
// Remember to store the proposer since we've accpetted the proposal
|
|
||||||
diff := new(big.Int).Sub(proposal.Number(), c.current.Sequence())
|
|
||||||
c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
|
|
||||||
|
|
||||||
if !c.consensusTimestamp.IsZero() {
|
|
||||||
c.consensusTimer.UpdateSince(c.consensusTimestamp)
|
|
||||||
c.consensusTimestamp = time.Time{}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.lastProposer = proposer
|
|
||||||
c.lastProposal = proposal
|
|
||||||
c.startNewRound(&istanbul.View{
|
|
||||||
Sequence: new(big.Int).Add(proposal.Number(), common.Big1),
|
|
||||||
Round: new(big.Int).Set(common.Big0),
|
|
||||||
}, proposal, proposer, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,24 +17,14 @@
|
||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/big"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Start implements core.Engine.Start
|
// Start implements core.Engine.Start
|
||||||
func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error {
|
func (c *core) Start() error {
|
||||||
// Initialize last proposer
|
|
||||||
c.lastProposer = lastProposer
|
|
||||||
c.lastProposal = lastProposal
|
|
||||||
c.valSet = c.backend.Validators(c.lastProposal)
|
|
||||||
|
|
||||||
// Start a new round from last sequence + 1
|
// Start a new round from last sequence + 1
|
||||||
c.startNewRound(&istanbul.View{
|
c.startNewRound(common.Big0)
|
||||||
Sequence: new(big.Int).Add(lastSequence, common.Big1),
|
|
||||||
Round: common.Big0,
|
|
||||||
}, lastProposal, lastProposer, false)
|
|
||||||
|
|
||||||
// Tests will handle events itself, so we have to make subscribeEvents()
|
// Tests will handle events itself, so we have to make subscribeEvents()
|
||||||
// be able to call in test.
|
// be able to call in test.
|
||||||
|
@ -48,6 +38,9 @@ func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastPro
|
||||||
func (c *core) Stop() error {
|
func (c *core) Stop() error {
|
||||||
c.stopTimer()
|
c.stopTimer()
|
||||||
c.unsubscribeEvents()
|
c.unsubscribeEvents()
|
||||||
|
|
||||||
|
// Make sure the handler goroutine exits
|
||||||
|
c.handlerWg.Wait()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +71,14 @@ func (c *core) unsubscribeEvents() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *core) handleEvents() {
|
func (c *core) handleEvents() {
|
||||||
|
// Clear state
|
||||||
|
defer func() {
|
||||||
|
c.current = nil
|
||||||
|
c.handlerWg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.handlerWg.Add(1)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event, ok := <-c.events.Chan():
|
case event, ok := <-c.events.Chan():
|
||||||
|
@ -118,9 +119,9 @@ func (c *core) handleEvents() {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch ev := event.Data.(type) {
|
switch event.Data.(type) {
|
||||||
case istanbul.FinalCommittedEvent:
|
case istanbul.FinalCommittedEvent:
|
||||||
c.handleFinalCommitted(ev.Proposal, ev.Proposer)
|
c.handleFinalCommitted()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,7 +172,7 @@ func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
|
||||||
case msgCommit:
|
case msgCommit:
|
||||||
return testBacklog(c.handleCommit(msg, src))
|
return testBacklog(c.handleCommit(msg, src))
|
||||||
case msgRoundChange:
|
case msgRoundChange:
|
||||||
return c.handleRoundChange(msg, src)
|
return testBacklog(c.handleRoundChange(msg, src))
|
||||||
default:
|
default:
|
||||||
logger.Error("Invalid message", "msg", msg)
|
logger.Error("Invalid message", "msg", msg)
|
||||||
}
|
}
|
||||||
|
@ -191,13 +192,10 @@ func (c *core) handleTimeoutMsg() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastProposal, lastProposer := c.backend.LastProposal()
|
lastProposal, _ := c.backend.LastProposal()
|
||||||
if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) > 0 {
|
if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
|
||||||
c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())
|
c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())
|
||||||
c.startNewRound(&istanbul.View{
|
c.startNewRound(common.Big0)
|
||||||
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
|
|
||||||
Round: new(big.Int),
|
|
||||||
}, lastProposal, lastProposer, false)
|
|
||||||
} else {
|
} else {
|
||||||
c.sendNextRoundChange()
|
c.sendNextRoundChange()
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func newMessageSet(valSet istanbul.ValidatorSet) *messageSet {
|
||||||
Sequence: new(big.Int),
|
Sequence: new(big.Int),
|
||||||
},
|
},
|
||||||
messagesMu: new(sync.Mutex),
|
messagesMu: new(sync.Mutex),
|
||||||
messages: make(map[common.Hash]*message),
|
messages: make(map[common.Address]*message),
|
||||||
valSet: valSet,
|
valSet: valSet,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ type messageSet struct {
|
||||||
view *istanbul.View
|
view *istanbul.View
|
||||||
valSet istanbul.ValidatorSet
|
valSet istanbul.ValidatorSet
|
||||||
messagesMu *sync.Mutex
|
messagesMu *sync.Mutex
|
||||||
messages map[common.Hash]*message
|
messages map[common.Address]*message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *messageSet) View() *istanbul.View {
|
func (ms *messageSet) View() *istanbul.View {
|
||||||
|
@ -80,6 +80,12 @@ func (ms *messageSet) Size() int {
|
||||||
return len(ms.messages)
|
return len(ms.messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ms *messageSet) Get(addr common.Address) *message {
|
||||||
|
ms.messagesMu.Lock()
|
||||||
|
defer ms.messagesMu.Unlock()
|
||||||
|
return ms.messages[addr]
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
func (ms *messageSet) verify(msg *message) error {
|
func (ms *messageSet) verify(msg *message) error {
|
||||||
|
@ -94,7 +100,7 @@ func (ms *messageSet) verify(msg *message) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *messageSet) addVerifiedMessage(msg *message) error {
|
func (ms *messageSet) addVerifiedMessage(msg *message) error {
|
||||||
ms.messages[istanbul.RLPHash(msg)] = msg
|
ms.messages[msg.Address] = msg
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,8 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
|
||||||
|
|
||||||
// Change to Prepared state if we've received enough PREPARE messages or it is locked
|
// Change to Prepared state if we've received enough PREPARE messages or it is locked
|
||||||
// and we are in earlier state before Prepared state.
|
// and we are in earlier state before Prepared state.
|
||||||
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
|
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&
|
||||||
|
c.state.Cmp(StatePrepared) < 0 {
|
||||||
c.current.LockHash()
|
c.current.LockHash()
|
||||||
c.setState(StatePrepared)
|
c.setState(StatePrepared)
|
||||||
c.sendCommit()
|
c.sendCommit()
|
||||||
|
|
|
@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure we have the same view with the PRE-PREPARE message
|
// Ensure we have the same view with the PRE-PREPARE message
|
||||||
|
// If it is old message, see if we need to broadcast COMMIT
|
||||||
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
|
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
|
||||||
|
if err == errOldMessage {
|
||||||
|
// Get validator set for the given proposal
|
||||||
|
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
|
||||||
|
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
|
||||||
|
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
|
||||||
|
// Broadcast COMMIT if it is an existing block
|
||||||
|
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
|
||||||
|
// 2. The given block must exist
|
||||||
|
if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
|
||||||
|
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,8 +101,16 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
|
||||||
// Here is about to accept the PRE-PREPARE
|
// Here is about to accept the PRE-PREPARE
|
||||||
if c.state == StateAcceptRequest {
|
if c.state == StateAcceptRequest {
|
||||||
// Send ROUND CHANGE if the locked proposal and the received proposal are different
|
// Send ROUND CHANGE if the locked proposal and the received proposal are different
|
||||||
if c.current.IsHashLocked() && preprepare.Proposal.Hash() != c.current.GetLockedHash() {
|
if c.current.IsHashLocked() {
|
||||||
c.sendNextRoundChange()
|
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
|
||||||
|
// Broadcast COMMIT and enters Prepared state directly
|
||||||
|
c.acceptPreprepare(preprepare)
|
||||||
|
c.setState(StatePrepared)
|
||||||
|
c.sendCommit()
|
||||||
|
} else {
|
||||||
|
// Send round change
|
||||||
|
c.sendNextRoundChange()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Either
|
// Either
|
||||||
// 1. the locked proposal and the received proposal match
|
// 1. the locked proposal and the received proposal match
|
||||||
|
|
|
@ -273,7 +273,7 @@ func TestHandlePreprepareWithLock(t *testing.T) {
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
t.Errorf("error mismatch: have %v, want nil", err)
|
||||||
}
|
}
|
||||||
if test.proposal == test.lockProposal {
|
if test.proposal == test.lockProposal {
|
||||||
if c.state != StatePreprepared {
|
if c.state != StatePrepared {
|
||||||
t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared)
|
t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(curView, c.currentView()) {
|
if !reflect.DeepEqual(curView, c.currentView()) {
|
||||||
|
|
|
@ -60,7 +60,7 @@ func (c *core) checkRequestMsg(request *istanbul.Request) error {
|
||||||
func (c *core) storeRequestMsg(request *istanbul.Request) {
|
func (c *core) storeRequestMsg(request *istanbul.Request) {
|
||||||
logger := c.logger.New("state", c.state)
|
logger := c.logger.New("state", c.state)
|
||||||
|
|
||||||
logger.Trace("Store future request", "request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
|
logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
|
||||||
|
|
||||||
c.pendingRequestsMu.Lock()
|
c.pendingRequestsMu.Lock()
|
||||||
defer c.pendingRequestsMu.Unlock()
|
defer c.pendingRequestsMu.Unlock()
|
||||||
|
|
|
@ -36,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) {
|
||||||
current: newRoundState(&istanbul.View{
|
current: newRoundState(&istanbul.View{
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
|
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
|
||||||
}
|
}
|
||||||
|
|
||||||
// invalid request
|
// invalid request
|
||||||
|
@ -91,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) {
|
||||||
current: newRoundState(&istanbul.View{
|
current: newRoundState(&istanbul.View{
|
||||||
Sequence: big.NewInt(0),
|
Sequence: big.NewInt(0),
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
|
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
|
||||||
pendingRequests: prque.New(),
|
pendingRequests: prque.New(),
|
||||||
pendingRequestsMu: new(sync.Mutex),
|
pendingRequestsMu: new(sync.Mutex),
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,10 +48,9 @@ func (c *core) sendRoundChange(round *big.Int) {
|
||||||
|
|
||||||
// Now we have the new round number and sequence number
|
// Now we have the new round number and sequence number
|
||||||
cv = c.currentView()
|
cv = c.currentView()
|
||||||
rc := &roundChange{
|
rc := &istanbul.Subject{
|
||||||
Round: new(big.Int).Set(cv.Round),
|
View: cv,
|
||||||
Sequence: new(big.Int).Set(cv.Sequence),
|
Digest: common.Hash{},
|
||||||
Digest: common.Hash{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
payload, err := Encode(rc)
|
payload, err := Encode(rc)
|
||||||
|
@ -70,29 +69,22 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
|
||||||
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
|
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
|
||||||
|
|
||||||
// Decode ROUND CHANGE message
|
// Decode ROUND CHANGE message
|
||||||
var rc *roundChange
|
var rc *istanbul.Subject
|
||||||
if err := msg.Decode(&rc); err != nil {
|
if err := msg.Decode(&rc); err != nil {
|
||||||
logger.Error("Failed to decode ROUND CHANGE", "err", err)
|
logger.Error("Failed to decode ROUND CHANGE", "err", err)
|
||||||
return errInvalidMessage
|
return errInvalidMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
cv := c.currentView()
|
cv := c.currentView()
|
||||||
|
roundView := rc.View
|
||||||
// We never accept ROUND CHANGE message with different sequence number
|
|
||||||
if rc.Sequence.Cmp(cv.Sequence) != 0 {
|
|
||||||
logger.Warn("Inconsistent sequence number", "expected", cv.Sequence, "got", rc.Sequence)
|
|
||||||
return errInvalidMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// We never accept ROUND CHANGE message with smaller round number
|
|
||||||
if rc.Round.Cmp(cv.Round) < 0 {
|
|
||||||
logger.Warn("Old round change", "from", src, "expected", cv.Round, "got", rc.Round)
|
|
||||||
return errOldMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the ROUND CHANGE message to its message set and return how many
|
// Add the ROUND CHANGE message to its message set and return how many
|
||||||
// messages we've got with the same round number and sequence number.
|
// messages we've got with the same round number and sequence number.
|
||||||
num, err := c.roundChangeSet.Add(rc.Round, msg)
|
num, err := c.roundChangeSet.Add(roundView.Round, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
|
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
|
||||||
return err
|
return err
|
||||||
|
@ -102,21 +94,17 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
|
||||||
// If our round number is smaller than the certificate's round number, we would
|
// If our round number is smaller than the certificate's round number, we would
|
||||||
// try to catch up the round number.
|
// try to catch up the round number.
|
||||||
if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
|
if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
|
||||||
if cv.Round.Cmp(rc.Round) < 0 {
|
if cv.Round.Cmp(roundView.Round) < 0 {
|
||||||
c.sendRoundChange(rc.Round)
|
c.sendRoundChange(roundView.Round)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(rc.Round) < 0) {
|
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
|
||||||
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
|
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
|
||||||
c.startNewRound(&istanbul.View{
|
c.startNewRound(roundView.Round)
|
||||||
Round: new(big.Int).Set(rc.Round),
|
|
||||||
Sequence: new(big.Int).Set(rc.Sequence),
|
|
||||||
}, nil, common.Address{}, true)
|
|
||||||
return nil
|
return nil
|
||||||
} else if cv.Round.Cmp(rc.Round) < 0 {
|
} else if cv.Round.Cmp(roundView.Round) < 0 {
|
||||||
// We consider the message with larger round as future messages and not
|
// Only gossip the message with current round to other validators.
|
||||||
// gossip it to other validators.
|
return errIgnored
|
||||||
return errFutureMessage
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,10 +33,9 @@ func TestRoundChangeSet(t *testing.T) {
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
Round: big.NewInt(1),
|
Round: big.NewInt(1),
|
||||||
}
|
}
|
||||||
r := &roundChange{
|
r := &istanbul.Subject{
|
||||||
Round: view.Round,
|
View: view,
|
||||||
Sequence: view.Sequence,
|
Digest: common.Hash{},
|
||||||
Digest: common.Hash{},
|
|
||||||
}
|
}
|
||||||
m, _ := Encode(r)
|
m, _ := Encode(r)
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ import (
|
||||||
// newRoundState creates a new roundState instance with the given view and validatorSet
|
// newRoundState creates a new roundState instance with the given view and validatorSet
|
||||||
// lockedHash and preprepare are for round change when lock exists,
|
// lockedHash and preprepare are for round change when lock exists,
|
||||||
// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer
|
// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer
|
||||||
func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request) *roundState {
|
func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request, hasBadProposal func(hash common.Hash) bool) *roundState {
|
||||||
return &roundState{
|
return &roundState{
|
||||||
round: view.Round,
|
round: view.Round,
|
||||||
sequence: view.Sequence,
|
sequence: view.Sequence,
|
||||||
|
@ -39,6 +39,7 @@ func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lock
|
||||||
lockedHash: lockedHash,
|
lockedHash: lockedHash,
|
||||||
mu: new(sync.RWMutex),
|
mu: new(sync.RWMutex),
|
||||||
pendingRequest: pendingRequest,
|
pendingRequest: pendingRequest,
|
||||||
|
hasBadProposal: hasBadProposal,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +53,23 @@ type roundState struct {
|
||||||
lockedHash common.Hash
|
lockedHash common.Hash
|
||||||
pendingRequest *istanbul.Request
|
pendingRequest *istanbul.Request
|
||||||
|
|
||||||
mu *sync.RWMutex
|
mu *sync.RWMutex
|
||||||
|
hasBadProposal func(hash common.Hash) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *roundState) GetPrepareOrCommitSize() int {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
result := s.Prepares.Size() + s.Commits.Size()
|
||||||
|
|
||||||
|
// find duplicate one
|
||||||
|
for _, m := range s.Prepares.Values() {
|
||||||
|
if s.Commits.Get(m.Address) != nil {
|
||||||
|
result--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roundState) Subject() *istanbul.Subject {
|
func (s *roundState) Subject() *istanbul.Subject {
|
||||||
|
@ -138,7 +155,10 @@ func (s *roundState) IsHashLocked() bool {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
return s.lockedHash != common.Hash{}
|
if common.EmptyHash(s.lockedHash) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !s.hasBadProposal(s.GetLockedHash())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roundState) GetLockedHash() common.Hash {
|
func (s *roundState) GetLockedHash() common.Hash {
|
||||||
|
|
|
@ -33,6 +33,9 @@ func newTestRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet)
|
||||||
Prepares: newMessageSet(validatorSet),
|
Prepares: newMessageSet(validatorSet),
|
||||||
Commits: newMessageSet(validatorSet),
|
Commits: newMessageSet(validatorSet),
|
||||||
mu: new(sync.RWMutex),
|
mu: new(sync.RWMutex),
|
||||||
|
hasBadProposal: func(hash common.Hash) bool {
|
||||||
|
return false
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -100,9 +100,7 @@ func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte
|
||||||
})
|
})
|
||||||
|
|
||||||
// fake new head events
|
// fake new head events
|
||||||
go self.events.Post(istanbul.FinalCommittedEvent{
|
go self.events.Post(istanbul.FinalCommittedEvent{})
|
||||||
Proposal: proposal,
|
|
||||||
})
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,8 +131,29 @@ func (self *testSystemBackend) NewRequest(request istanbul.Proposal) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *testSystemBackend) HasBadProposal(hash common.Hash) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (self *testSystemBackend) LastProposal() (istanbul.Proposal, common.Address) {
|
func (self *testSystemBackend) LastProposal() (istanbul.Proposal, common.Address) {
|
||||||
return makeBlock(1), common.Address{}
|
l := len(self.committedMsgs)
|
||||||
|
if l > 0 {
|
||||||
|
return self.committedMsgs[l-1].commitProposal, common.Address{}
|
||||||
|
}
|
||||||
|
return makeBlock(0), common.Address{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only block height 5 will return true
|
||||||
|
func (self *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool {
|
||||||
|
return number.Cmp(big.NewInt(5)) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *testSystemBackend) GetProposer(number uint64) common.Address {
|
||||||
|
return common.Address{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *testSystemBackend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
|
||||||
|
return self.peers
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==============================================
|
// ==============================================
|
||||||
|
@ -187,11 +206,13 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem {
|
||||||
|
|
||||||
core := New(backend, config).(*core)
|
core := New(backend, config).(*core)
|
||||||
core.state = StateAcceptRequest
|
core.state = StateAcceptRequest
|
||||||
core.lastProposer = common.Address{}
|
|
||||||
core.current = newRoundState(&istanbul.View{
|
core.current = newRoundState(&istanbul.View{
|
||||||
Round: big.NewInt(0),
|
Round: big.NewInt(0),
|
||||||
Sequence: big.NewInt(1),
|
Sequence: big.NewInt(1),
|
||||||
}, vset, common.Hash{}, nil, nil)
|
}, vset, common.Hash{}, nil, nil, func(hash common.Hash) bool {
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
core.valSet = vset
|
||||||
core.logger = testLogger
|
core.logger = testLogger
|
||||||
core.validateFn = backend.CheckValidatorSignature
|
core.validateFn = backend.CheckValidatorSignature
|
||||||
|
|
||||||
|
@ -223,7 +244,7 @@ func (t *testSystem) listen() {
|
||||||
func (t *testSystem) Run(core bool) func() {
|
func (t *testSystem) Run(core bool) func() {
|
||||||
for _, b := range t.backends {
|
for _, b := range t.backends {
|
||||||
if core {
|
if core {
|
||||||
b.engine.Start(common.Big0, common.Address{}, nil) // start Istanbul core
|
b.engine.Start() // start Istanbul core
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,15 +19,13 @@ package core
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/big"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Engine interface {
|
type Engine interface {
|
||||||
Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error
|
Start() error
|
||||||
Stop() error
|
Stop() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,34 +162,3 @@ func (m *message) String() string {
|
||||||
func Encode(val interface{}) ([]byte, error) {
|
func Encode(val interface{}) ([]byte, error) {
|
||||||
return rlp.EncodeToBytes(val)
|
return rlp.EncodeToBytes(val)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
type roundChange struct {
|
|
||||||
Round *big.Int
|
|
||||||
Sequence *big.Int
|
|
||||||
Digest common.Hash
|
|
||||||
}
|
|
||||||
|
|
||||||
// EncodeRLP serializes rc into the Ethereum RLP format.
|
|
||||||
func (rc *roundChange) EncodeRLP(w io.Writer) error {
|
|
||||||
return rlp.Encode(w, []interface{}{
|
|
||||||
rc.Round,
|
|
||||||
rc.Sequence,
|
|
||||||
rc.Digest,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
|
|
||||||
func (rc *roundChange) DecodeRLP(s *rlp.Stream) error {
|
|
||||||
var rawRoundChange struct {
|
|
||||||
Round *big.Int
|
|
||||||
Sequence *big.Int
|
|
||||||
Digest common.Hash
|
|
||||||
}
|
|
||||||
if err := s.Decode(&rawRoundChange); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rc.Round, rc.Sequence, rc.Digest = rawRoundChange.Round, rawRoundChange.Sequence, rawRoundChange.Digest
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -172,47 +172,8 @@ func testSubjectWithSignature(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRoundChange(t *testing.T) {
|
|
||||||
rc := &roundChange{
|
|
||||||
Round: big.NewInt(1),
|
|
||||||
Sequence: big.NewInt(2),
|
|
||||||
Digest: common.StringToHash("1234567890"),
|
|
||||||
}
|
|
||||||
RoundChangePayload, _ := Encode(rc)
|
|
||||||
|
|
||||||
m := &message{
|
|
||||||
Code: msgRoundChange,
|
|
||||||
Msg: RoundChangePayload,
|
|
||||||
Address: common.HexToAddress("0x1234567890"),
|
|
||||||
}
|
|
||||||
|
|
||||||
msgPayload, err := m.Payload()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
decodedMsg := new(message)
|
|
||||||
err = decodedMsg.FromPayload(msgPayload, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var decodedRC *roundChange
|
|
||||||
err = decodedMsg.Decode(&decodedRC)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("error mismatch: have %v, want nil", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// if block is encoded/decoded by rlp, we cannot to compare interface data type using reflect.DeepEqual. (like istanbul.Proposal)
|
|
||||||
// so individual comparison here.
|
|
||||||
if !reflect.DeepEqual(rc, decodedRC) {
|
|
||||||
t.Errorf("round change mismatch: have %v, want %v", decodedRC, rc)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMessageEncodeDecode(t *testing.T) {
|
func TestMessageEncodeDecode(t *testing.T) {
|
||||||
testPreprepare(t)
|
testPreprepare(t)
|
||||||
testSubject(t)
|
testSubject(t)
|
||||||
testSubjectWithSignature(t)
|
testSubjectWithSignature(t)
|
||||||
testRoundChange(t)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,6 @@
|
||||||
|
|
||||||
package istanbul
|
package istanbul
|
||||||
|
|
||||||
import "github.com/ethereum/go-ethereum/common"
|
|
||||||
|
|
||||||
// RequestEvent is posted to propose a proposal
|
// RequestEvent is posted to propose a proposal
|
||||||
type RequestEvent struct {
|
type RequestEvent struct {
|
||||||
Proposal Proposal
|
Proposal Proposal
|
||||||
|
@ -30,6 +28,4 @@ type MessageEvent struct {
|
||||||
|
|
||||||
// FinalCommittedEvent is posted when a proposal is committed
|
// FinalCommittedEvent is posted when a proposal is committed
|
||||||
type FinalCommittedEvent struct {
|
type FinalCommittedEvent struct {
|
||||||
Proposal Proposal
|
|
||||||
Proposer common.Address
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,8 @@ type ValidatorSet interface {
|
||||||
Copy() ValidatorSet
|
Copy() ValidatorSet
|
||||||
// Get the maximum number of faulty nodes
|
// Get the maximum number of faulty nodes
|
||||||
F() int
|
F() int
|
||||||
|
// Get proposer policy
|
||||||
|
Policy() ProposerPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
|
@ -41,16 +41,18 @@ func (val *defaultValidator) String() string {
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
type defaultSet struct {
|
type defaultSet struct {
|
||||||
validators istanbul.Validators
|
validators istanbul.Validators
|
||||||
|
policy istanbul.ProposerPolicy
|
||||||
|
|
||||||
proposer istanbul.Validator
|
proposer istanbul.Validator
|
||||||
validatorMu sync.RWMutex
|
validatorMu sync.RWMutex
|
||||||
|
selector istanbul.ProposalSelector
|
||||||
selector istanbul.ProposalSelector
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) *defaultSet {
|
func newDefaultSet(addrs []common.Address, policy istanbul.ProposerPolicy) *defaultSet {
|
||||||
valSet := &defaultSet{}
|
valSet := &defaultSet{}
|
||||||
|
|
||||||
|
valSet.policy = policy
|
||||||
// init validators
|
// init validators
|
||||||
valSet.validators = make([]istanbul.Validator, len(addrs))
|
valSet.validators = make([]istanbul.Validator, len(addrs))
|
||||||
for i, addr := range addrs {
|
for i, addr := range addrs {
|
||||||
|
@ -62,8 +64,10 @@ func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) *
|
||||||
if valSet.Size() > 0 {
|
if valSet.Size() > 0 {
|
||||||
valSet.proposer = valSet.GetByIndex(0)
|
valSet.proposer = valSet.GetByIndex(0)
|
||||||
}
|
}
|
||||||
//set proposal selector
|
valSet.selector = roundRobinProposer
|
||||||
valSet.selector = selector
|
if policy == istanbul.Sticky {
|
||||||
|
valSet.selector = stickyProposer
|
||||||
|
}
|
||||||
|
|
||||||
return valSet
|
return valSet
|
||||||
}
|
}
|
||||||
|
@ -182,14 +186,16 @@ func (valSet *defaultSet) RemoveValidator(address common.Address) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (valSet *defaultSet) Copy() istanbul.ValidatorSet {
|
func (valSet *defaultSet) Copy() istanbul.ValidatorSet {
|
||||||
valSet.validatorMu.Lock()
|
valSet.validatorMu.RLock()
|
||||||
defer valSet.validatorMu.Unlock()
|
defer valSet.validatorMu.RUnlock()
|
||||||
|
|
||||||
addresses := make([]common.Address, 0, len(valSet.validators))
|
addresses := make([]common.Address, 0, len(valSet.validators))
|
||||||
for _, v := range valSet.validators {
|
for _, v := range valSet.validators {
|
||||||
addresses = append(addresses, v.Address())
|
addresses = append(addresses, v.Address())
|
||||||
}
|
}
|
||||||
return newDefaultSet(addresses, valSet.selector)
|
return NewSet(addresses, valSet.policy)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (valSet *defaultSet) F() int { return int(math.Ceil(float64(valSet.Size())/3)) - 1 }
|
func (valSet *defaultSet) F() int { return int(math.Ceil(float64(valSet.Size())/3)) - 1 }
|
||||||
|
|
||||||
|
func (valSet *defaultSet) Policy() istanbul.ProposerPolicy { return valSet.policy }
|
||||||
|
|
|
@ -78,7 +78,7 @@ func testNormalValSet(t *testing.T) {
|
||||||
val1 := New(addr1)
|
val1 := New(addr1)
|
||||||
val2 := New(addr2)
|
val2 := New(addr2)
|
||||||
|
|
||||||
valSet := newDefaultSet([]common.Address{addr1, addr2}, roundRobinProposer)
|
valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.RoundRobin)
|
||||||
if valSet == nil {
|
if valSet == nil {
|
||||||
t.Errorf("the format of validator set is invalid")
|
t.Errorf("the format of validator set is invalid")
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
|
@ -182,7 +182,7 @@ func testStickyProposer(t *testing.T) {
|
||||||
val1 := New(addr1)
|
val1 := New(addr1)
|
||||||
val2 := New(addr2)
|
val2 := New(addr2)
|
||||||
|
|
||||||
valSet := newDefaultSet([]common.Address{addr1, addr2}, stickyProposer)
|
valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.Sticky)
|
||||||
|
|
||||||
// test get proposer
|
// test get proposer
|
||||||
if val := valSet.GetProposer(); !reflect.DeepEqual(val, val1) {
|
if val := valSet.GetProposer(); !reflect.DeepEqual(val, val1) {
|
||||||
|
|
|
@ -28,15 +28,7 @@ func New(addr common.Address) istanbul.Validator {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSet(addrs []common.Address, policy istanbul.ProposerPolicy) istanbul.ValidatorSet {
|
func NewSet(addrs []common.Address, policy istanbul.ProposerPolicy) istanbul.ValidatorSet {
|
||||||
switch policy {
|
return newDefaultSet(addrs, policy)
|
||||||
case istanbul.RoundRobin:
|
|
||||||
return newDefaultSet(addrs, roundRobinProposer)
|
|
||||||
case istanbul.Sticky:
|
|
||||||
return newDefaultSet(addrs, stickyProposer)
|
|
||||||
}
|
|
||||||
|
|
||||||
// use round-robin policy as default proposal policy
|
|
||||||
return newDefaultSet(addrs, roundRobinProposer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExtractValidators(extraData []byte) []common.Address {
|
func ExtractValidators(extraData []byte) []common.Address {
|
||||||
|
|
|
@ -46,10 +46,10 @@ type Protocol struct {
|
||||||
Lengths []uint64
|
Lengths []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcaster defines the interface to broadcast blocks and find peer
|
// Broadcaster defines the interface to enqueue blocks to fetcher and find peer
|
||||||
type Broadcaster interface {
|
type Broadcaster interface {
|
||||||
// BroadcastBlock broadcasts blocks to peers
|
// Enqueue add a block into fetcher queue
|
||||||
BroadcastBlock(block *types.Block, propagate bool)
|
Enqueue(id string, block *types.Block)
|
||||||
// FindPeers retrives peers by addresses
|
// FindPeers retrives peers by addresses
|
||||||
FindPeers(map[common.Address]bool) map[common.Address]Peer
|
FindPeers(map[common.Address]bool) map[common.Address]Peer
|
||||||
}
|
}
|
||||||
|
|
|
@ -1297,6 +1297,11 @@ func (bc *BlockChain) BadBlocks() ([]BadBlockArgs, error) {
|
||||||
return headers, nil
|
return headers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBadBlock returns whether the block with the hash is a bad block
|
||||||
|
func (bc *BlockChain) HasBadBlock(hash common.Hash) bool {
|
||||||
|
return bc.badBlocks.Contains(hash)
|
||||||
|
}
|
||||||
|
|
||||||
// addBadBlock adds a bad block to the bad-block LRU cache
|
// addBadBlock adds a bad block to the bad-block LRU cache
|
||||||
func (bc *BlockChain) addBadBlock(block *types.Block) {
|
func (bc *BlockChain) addBadBlock(block *types.Block) {
|
||||||
bc.badBlocks.Add(block.Header().Hash(), block.Header())
|
bc.badBlocks.Add(block.Header().Hash(), block.Header())
|
||||||
|
|
|
@ -356,8 +356,6 @@ func (s *Ethereum) StartMining(local bool) error {
|
||||||
return fmt.Errorf("signer missing: %v", err)
|
return fmt.Errorf("signer missing: %v", err)
|
||||||
}
|
}
|
||||||
clique.Authorize(eb, wallet.SignHash)
|
clique.Authorize(eb, wallet.SignHash)
|
||||||
} else if istanbul, ok := s.engine.(consensus.Istanbul); ok {
|
|
||||||
istanbul.Start(s.blockchain, s.blockchain.InsertChain)
|
|
||||||
}
|
}
|
||||||
if local {
|
if local {
|
||||||
// If local (CPU) mining is started, we can disable the transaction rejection
|
// If local (CPU) mining is started, we can disable the transaction rejection
|
||||||
|
@ -370,12 +368,7 @@ func (s *Ethereum) StartMining(local bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Ethereum) StopMining() {
|
func (s *Ethereum) StopMining() { s.miner.Stop() }
|
||||||
s.miner.Stop()
|
|
||||||
if istanbul, ok := s.engine.(consensus.Istanbul); ok {
|
|
||||||
istanbul.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
|
func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
|
||||||
func (s *Ethereum) Miner() *miner.Miner { return s.miner }
|
func (s *Ethereum) Miner() *miner.Miner { return s.miner }
|
||||||
|
|
||||||
|
|
|
@ -125,7 +125,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
|
||||||
handler.SetBroadcaster(manager)
|
handler.SetBroadcaster(manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Figure out whether to allow fast sync or not
|
// Figure out whether to allow fast sync or not
|
||||||
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
|
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
|
||||||
log.Warn("Blockchain not empty, fast sync disabled")
|
log.Warn("Blockchain not empty, fast sync disabled")
|
||||||
mode = downloader.FullSync
|
mode = downloader.FullSync
|
||||||
|
@ -716,6 +716,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pm *ProtocolManager) Enqueue(id string, block *types.Block) {
|
||||||
|
pm.fetcher.Enqueue(id, block)
|
||||||
|
}
|
||||||
|
|
||||||
// BroadcastBlock will either propagate a block to a subset of it's peers, or
|
// BroadcastBlock will either propagate a block to a subset of it's peers, or
|
||||||
// will only announce it's availability (depending what's requested).
|
// will only announce it's availability (depending what's requested).
|
||||||
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
|
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
|
||||||
|
|
|
@ -212,6 +212,9 @@ func (self *worker) start() {
|
||||||
defer self.mu.Unlock()
|
defer self.mu.Unlock()
|
||||||
|
|
||||||
atomic.StoreInt32(&self.mining, 1)
|
atomic.StoreInt32(&self.mining, 1)
|
||||||
|
if istanbul, ok := self.engine.(consensus.Istanbul); ok {
|
||||||
|
istanbul.Start(self.chain, self.chain.CurrentBlock, self.chain.HasBadBlock)
|
||||||
|
}
|
||||||
|
|
||||||
// spin up agents
|
// spin up agents
|
||||||
for agent := range self.agents {
|
for agent := range self.agents {
|
||||||
|
@ -229,6 +232,11 @@ func (self *worker) stop() {
|
||||||
agent.Stop()
|
agent.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if istanbul, ok := self.engine.(consensus.Istanbul); ok {
|
||||||
|
istanbul.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
atomic.StoreInt32(&self.mining, 0)
|
atomic.StoreInt32(&self.mining, 0)
|
||||||
atomic.StoreInt32(&self.atWork, 0)
|
atomic.StoreInt32(&self.atWork, 0)
|
||||||
}
|
}
|
||||||
|
@ -256,11 +264,11 @@ func (self *worker) update() {
|
||||||
// A real event arrived, process interesting content
|
// A real event arrived, process interesting content
|
||||||
select {
|
select {
|
||||||
// Handle ChainHeadEvent
|
// Handle ChainHeadEvent
|
||||||
case ev := <-self.chainHeadCh:
|
case <-self.chainHeadCh:
|
||||||
self.commitNewWork()
|
if h, ok := self.engine.(consensus.Handler); ok {
|
||||||
if h, ok := self.engine.(consensus.Handler); ok && ev.Block != nil {
|
h.NewChainHead()
|
||||||
h.NewChainHead(ev.Block)
|
|
||||||
}
|
}
|
||||||
|
self.commitNewWork()
|
||||||
|
|
||||||
// Handle ChainSideEvent
|
// Handle ChainSideEvent
|
||||||
case ev := <-self.chainSideCh:
|
case ev := <-self.chainSideCh:
|
||||||
|
|
|
@ -184,8 +184,8 @@ func (c *ChainConfig) String() string {
|
||||||
engine = c.Ethash
|
engine = c.Ethash
|
||||||
case c.Clique != nil:
|
case c.Clique != nil:
|
||||||
engine = c.Clique
|
engine = c.Clique
|
||||||
//case c.Istanbul != nil:
|
case c.Istanbul != nil:
|
||||||
// engine = c.Istanbul
|
engine = c.Istanbul
|
||||||
default:
|
default:
|
||||||
engine = "unknown"
|
engine = "unknown"
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue