consensus, core, eth, miner, params: update istanbul engine

This commit is contained in:
mark.lin 2018-01-31 10:27:08 +08:00
parent 3719d61ee3
commit 9ac24f9aaa
38 changed files with 495 additions and 321 deletions

View File

@ -99,7 +99,7 @@ type Engine interface {
// Handler should be implemented is the consensus needs to handle and send peer's message
type Handler interface {
// NewChainHead handles a new head block comes
NewChainHead(block *types.Block) error
NewChainHead() error
// HandleMsg handles a message from peer
HandleMsg(address common.Address, data p2p.Msg) (bool, error)
@ -121,7 +121,7 @@ type Istanbul interface {
Engine
// Start starts the engine
Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error
Start(chain ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error
// Stop stops the engine
Stop() error

View File

@ -17,6 +17,7 @@
package istanbul
import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
@ -57,4 +58,16 @@ type Backend interface {
// LastProposal retrieves latest committed proposal and the address of proposer
LastProposal() (Proposal, common.Address)
// HasPropsal checks if the combination of the given hash and height matches any existing blocks
HasPropsal(hash common.Hash, number *big.Int) bool
// GetProposer returns the proposer of the given block height
GetProposer(number uint64) common.Address
// ParentValidators returns the validator set of the given proposal's parent block
ParentValidators(proposal Proposal) ValidatorSet
// HasBadBlock returns whether the block with the hash is a bad block
HasBadProposal(hash common.Hash) bool
}

View File

@ -18,6 +18,7 @@ package backend
import (
"crypto/ecdsa"
"math/big"
"sync"
"time"
@ -35,6 +36,11 @@ import (
lru "github.com/hashicorp/golang-lru"
)
const (
// fetcherID is the ID indicates the block is from Istanbul engine
fetcherID = "istanbul"
)
// New creates an Ethereum backend for Istanbul core engine.
func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
// Allocate the snapshot caches and create the engine
@ -70,14 +76,15 @@ type backend struct {
logger log.Logger
db ethdb.Database
chain consensus.ChainReader
inserter func(types.Blocks) (int, error)
currentBlock func() *types.Block
hasBadBlock func(hash common.Hash) bool
// the channels for istanbul engine notifications
commitCh chan *types.Block
proposedBlockHash common.Hash
sealMu sync.Mutex
coreStarted bool
coreMu sync.Mutex
coreMu sync.RWMutex
// Current list of candidates we are pushing
candidates map[common.Address]bool
@ -180,16 +187,11 @@ func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
if sb.proposedBlockHash == block.Hash() {
// feed block hash to Seal() and wait the Seal() result
sb.commitCh <- block
// TODO: how do we check the block is inserted correctly?
return nil
}
// if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent
if _, err := sb.inserter(types.Blocks{block}); err != nil && err != core.ErrKnownBlock {
return err
}
if sb.broadcaster != nil {
go sb.broadcaster.BroadcastBlock(block, false)
sb.broadcaster.Enqueue(fetcherID, block)
}
return nil
}
@ -208,6 +210,22 @@ func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
sb.logger.Error("Invalid proposal, %v", proposal)
return 0, errInvalidProposal
}
// check bad block
if sb.HasBadProposal(block.Hash()) {
return 0, core.ErrBlacklistedHash
}
// check block body
txnHash := types.DeriveSha(block.Transactions())
uncleHash := types.CalcUncleHash(block.Uncles())
if txnHash != block.Header().TxHash {
return 0, errMismatchTxhashes
}
if uncleHash != nilUncleHash {
return 0, errInvalidUncleHash
}
// verify the header of proposed block
err := sb.VerifyHeader(sb.chain, block.Header(), false)
// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
@ -239,6 +257,11 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
return nil
}
// HasPropsal implements istanbul.Backend.HashBlock
func (sb *backend) HasPropsal(hash common.Hash, number *big.Int) bool {
return sb.chain.GetHeader(hash, number.Uint64()) != nil
}
// GetProposer implements istanbul.Backend.GetProposer
func (sb *backend) GetProposer(number uint64) common.Address {
if h := sb.chain.GetHeaderByNumber(number); h != nil {
@ -248,6 +271,14 @@ func (sb *backend) GetProposer(number uint64) common.Address {
return common.Address{}
}
// ParentValidators implements istanbul.Backend.GetParentValidators
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
if block, ok := proposal.(*types.Block); ok {
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
}
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, number, hash, nil)
if err != nil {
@ -257,17 +288,12 @@ func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.Valid
}
func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) {
if sb.chain == nil {
sb.logger.Error("Failed to access blockchain")
return nil, common.Address{}
}
h := sb.chain.CurrentHeader()
block := sb.currentBlock()
var proposer common.Address
if h.Number.Cmp(common.Big0) > 0 {
if block.Number().Cmp(common.Big0) > 0 {
var err error
proposer, err = sb.Author(h)
proposer, err = sb.Author(block.Header())
if err != nil {
sb.logger.Error("Failed to get block proposer", "err", err)
return nil, common.Address{}
@ -275,5 +301,12 @@ func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) {
}
// Return header only block here since we don't need block body
return types.NewBlockWithHeader(h), proposer
return block, proposer
}
func (sb *backend) HasBadProposal(hash common.Hash) bool {
if sb.hasBadBlock == nil {
return false
}
return sb.hasBadBlock(hash)
}

View File

@ -29,11 +29,10 @@ import (
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
func TestSign(t *testing.T) {
b, _, _ := newBackend()
b := newBackend()
data := []byte("Here is a string....")
sig, err := b.Sign(data)
if err != nil {
@ -54,7 +53,7 @@ func TestCheckSignature(t *testing.T) {
data := []byte("Here is a string....")
hashData := crypto.Keccak256([]byte(data))
sig, _ := crypto.Sign(hashData, key)
b, _, _ := newBackend()
b := newBackend()
a := getAddress()
err := b.CheckSignature(data, a, sig)
if err != nil {
@ -68,7 +67,7 @@ func TestCheckSignature(t *testing.T) {
}
func TestCheckValidatorSignature(t *testing.T) {
_, keys, vset := newBackend()
vset, keys := newTestValidatorSet(5)
// 1. Positive test: sign with validator's key should succeed
data := []byte("dummy data")
@ -113,7 +112,7 @@ func TestCheckValidatorSignature(t *testing.T) {
}
func TestCommit(t *testing.T) {
backend, _, _ := newBackend()
backend := newBackend()
commitCh := make(chan *types.Block)
// Case: it's a proposer, so the backend.commit will receive channel result from backend.Commit function
@ -235,13 +234,9 @@ func (slice Keys) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
func newBackend() (b *backend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) {
func newBackend() (b *backend) {
_, b = newBlockChain(4)
key, _ := generatePrivateKey()
validatorSet, validatorKeys = newTestValidatorSet(5)
b = &backend{
privateKey: key,
logger: log.New("backend", "simple"),
commitCh: make(chan *types.Block, 1),
}
b.privateKey = key
return
}

View File

@ -80,6 +80,8 @@ var (
errInvalidCommittedSeals = errors.New("invalid committed seals")
// errEmptyCommittedSeals is returned if the field of committed seals is zero.
errEmptyCommittedSeals = errors.New("zero committed seals")
// errMismatchTxhashes is returned if the TxHash in header is mismatch.
errMismatchTxhashes = errors.New("mismatch transcations hashes")
)
var (
defaultDifficulty = big.NewInt(1)
@ -485,7 +487,7 @@ func (sb *backend) APIs(chain consensus.ChainReader) []rpc.API {
}
// Start implements consensus.Istanbul.Start
func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error {
func (sb *backend) Start(chain consensus.ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error {
sb.coreMu.Lock()
defer sb.coreMu.Unlock()
if sb.coreStarted {
@ -500,23 +502,10 @@ func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks
sb.commitCh = make(chan *types.Block, 1)
sb.chain = chain
sb.inserter = inserter
sb.currentBlock = currentBlock
sb.hasBadBlock = hasBadBlock
curHeader := chain.CurrentHeader()
lastSequence := new(big.Int).Set(curHeader.Number)
lastProposer := common.Address{}
// should get proposer if the block is not genesis
if lastSequence.Cmp(common.Big0) > 0 {
p, err := sb.Author(curHeader)
if err != nil {
return err
}
lastProposer = p
}
// We don't need block body so we create a header only block.
// The proposal is only for validator set calculation.
lastProposal := types.NewBlockWithHeader(curHeader)
if err := sb.core.Start(lastSequence, lastProposer, lastProposal); err != nil {
if err := sb.core.Start(); err != nil {
return err
}

View File

@ -51,7 +51,7 @@ func newBlockChain(n int) (*core.BlockChain, *backend) {
if err != nil {
panic(err)
}
b.Start(blockchain, blockchain.InsertChain)
b.Start(blockchain, blockchain.CurrentBlock, blockchain.HasBadBlock)
snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil)
if err != nil {
panic(err)
@ -138,7 +138,7 @@ func makeBlock(chain *core.BlockChain, engine *backend, parent *types.Block) *ty
func makeBlockWithoutSeal(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block {
header := makeHeader(parent, engine.config)
engine.Prepare(chain, header)
state, _, _ := chain.StateAt(parent.Root())
state, _,_ := chain.StateAt(parent.Root())
block, _ := engine.Finalize(chain, header, state, nil, nil, nil)
return block
}

View File

@ -22,7 +22,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
lru "github.com/hashicorp/golang-lru"
)
@ -93,20 +92,12 @@ func (sb *backend) SetBroadcaster(broadcaster consensus.Broadcaster) {
sb.broadcaster = broadcaster
}
func (sb *backend) NewChainHead(block *types.Block) error {
sb.coreMu.Lock()
defer sb.coreMu.Unlock()
func (sb *backend) NewChainHead() error {
sb.coreMu.RLock()
defer sb.coreMu.RUnlock()
if !sb.coreStarted {
return istanbul.ErrStoppedEngine
}
p, err := sb.Author(block.Header())
if err != nil {
sb.logger.Error("Failed to get block proposer", "err", err)
return err
}
go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{
Proposal: block,
Proposer: p,
})
go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{})
return nil
}

View File

@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
@ -50,11 +51,11 @@ type Tally struct {
type Snapshot struct {
Epoch uint64 // The number of blocks after which to checkpoint and reset the pending votes
Number uint64 `json:"number"` // Block number where the snapshot was created
Hash common.Hash `json:"hash"` // Block hash where the snapshot was created
Votes []*Vote `json:"votes"` // List of votes cast in chronological order
Tally map[common.Address]Tally `json:"tally"` // Current vote tally to avoid recalculating
ValSet istanbul.ValidatorSet `json:"validators"` // Set of authorized validators at this moment
Number uint64 // Block number where the snapshot was created
Hash common.Hash // Block hash where the snapshot was created
Votes []*Vote // List of votes cast in chronological order
Tally map[common.Address]Tally // Current vote tally to avoid recalculating
ValSet istanbul.ValidatorSet // Set of authorized validators at this moment
}
// newSnapshot create a new snapshot with the specified startup parameters. This
@ -272,3 +273,49 @@ func (s *Snapshot) validators() []common.Address {
}
return validators
}
type snapshotJSON struct {
Epoch uint64 `json:"epoch"`
Number uint64 `json:"number"`
Hash common.Hash `json:"hash"`
Votes []*Vote `json:"votes"`
Tally map[common.Address]Tally `json:"tally"`
// for validator set
Validators []common.Address `json:"validators"`
Policy istanbul.ProposerPolicy `json:"policy"`
}
func (s *Snapshot) toJSONStruct() *snapshotJSON {
return &snapshotJSON{
Epoch: s.Epoch,
Number: s.Number,
Hash: s.Hash,
Votes: s.Votes,
Tally: s.Tally,
Validators: s.validators(),
Policy: s.ValSet.Policy(),
}
}
// Unmarshal from a json byte array
func (s *Snapshot) UnmarshalJSON(b []byte) error {
var j snapshotJSON
if err := json.Unmarshal(b, &j); err != nil {
return err
}
s.Epoch = j.Epoch
s.Number = j.Number
s.Hash = j.Hash
s.Votes = j.Votes
s.Tally = j.Tally
s.ValSet = validator.NewSet(j.Validators, j.Policy)
return nil
}
// Marshal to a json byte array
func (s *Snapshot) MarshalJSON() ([]byte, error) {
j := s.toJSONStruct()
return json.Marshal(j)
}

View File

@ -20,10 +20,12 @@ import (
"bytes"
"crypto/ecdsa"
"math/big"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@ -400,3 +402,54 @@ func TestVoting(t *testing.T) {
}
}
}
func TestSaveAndLoad(t *testing.T) {
snap := &Snapshot{
Epoch: 5,
Number: 10,
Hash: common.HexToHash("1234567890"),
Votes: []*Vote{
{
Validator: common.StringToAddress("1234567891"),
Block: 15,
Address: common.StringToAddress("1234567892"),
Authorize: false,
},
},
Tally: map[common.Address]Tally{
common.StringToAddress("1234567893"): Tally{
Authorize: false,
Votes: 20,
},
},
ValSet: validator.NewSet([]common.Address{
common.StringToAddress("1234567894"),
common.StringToAddress("1234567895"),
}, istanbul.RoundRobin),
}
db, _ := ethdb.NewMemDatabase()
err := snap.store(db)
if err != nil {
t.Errorf("store snapshot failed: %v", err)
}
snap1, err := loadSnapshot(snap.Epoch, db, snap.Hash)
if err != nil {
t.Errorf("load snapshot failed: %v", err)
}
if snap.Epoch != snap1.Epoch {
t.Errorf("epoch mismatch: have %v, want %v", snap1.Epoch, snap.Epoch)
}
if snap.Hash != snap1.Hash {
t.Errorf("hash mismatch: have %v, want %v", snap1.Number, snap.Number)
}
if !reflect.DeepEqual(snap.Votes, snap.Votes) {
t.Errorf("votes mismatch: have %v, want %v", snap1.Votes, snap.Votes)
}
if !reflect.DeepEqual(snap.Tally, snap.Tally) {
t.Errorf("tally mismatch: have %v, want %v", snap1.Tally, snap.Tally)
}
if !reflect.DeepEqual(snap.ValSet, snap.ValSet) {
t.Errorf("validator set mismatch: have %v, want %v", snap1.ValSet, snap.ValSet)
}
}

View File

@ -40,6 +40,15 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
return errInvalidMessage
}
if msgCode == msgRoundChange {
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
return errFutureMessage
} else if view.Cmp(c.currentView()) < 0 {
return errOldMessage
}
return nil
}
if view.Cmp(c.currentView()) > 0 {
return errFutureMessage
}
@ -90,7 +99,7 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
// for msgRoundChange, msgPrepare and msgCommit cases
default:
var p *istanbul.Subject
err := msg.Decode(&p)
@ -127,7 +136,7 @@ func (c *core) processBacklog() {
if err == nil {
view = m.View
}
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
// for msgRoundChange, msgPrepare and msgCommit cases
default:
var sub *istanbul.Subject
err := msg.Decode(&sub)
@ -162,6 +171,10 @@ func (c *core) processBacklog() {
}
func toPriority(msgCode uint64, view *istanbul.View) float32 {
if msgCode == msgRoundChange {
// For msgRoundChange, set the message priority based on its sequence
return -float32(view.Sequence.Uint64() * 1000)
}
// FIXME: round will be reset as 0 while new sequence
// 10 * Round limits the range of message code is from 0 to 9
// 1000 * Sequence limits the range of round is from 0 to 99

View File

@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}
// invalid view format
@ -47,7 +47,7 @@ func TestCheckMessage(t *testing.T) {
}
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit}
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit, msgRoundChange}
// future sequence
v := &istanbul.View{
@ -73,7 +73,11 @@ func TestCheckMessage(t *testing.T) {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if err != errFutureMessage {
if testCode[j] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
@ -89,7 +93,11 @@ func TestCheckMessage(t *testing.T) {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if err != errFutureMessage {
if testCode[j] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
@ -101,7 +109,11 @@ func TestCheckMessage(t *testing.T) {
c.state = StateAcceptRequest
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if testCode[i] == msgPreprepare {
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if testCode[i] == msgPreprepare {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
@ -116,7 +128,11 @@ func TestCheckMessage(t *testing.T) {
c.state = StatePreprepared
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if err != nil {
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
@ -125,7 +141,11 @@ func TestCheckMessage(t *testing.T) {
c.state = StatePrepared
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if err != nil {
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
@ -134,7 +154,11 @@ func TestCheckMessage(t *testing.T) {
c.state = StateCommitted
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if err != nil {
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
@ -195,6 +219,17 @@ func TestStoreBacklog(t *testing.T) {
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
// push roundChange msg
m = &message{
Code: msgRoundChange,
Msg: subjectPayload,
}
c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem()
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
}
func TestProcessFutureBacklog(t *testing.T) {
@ -209,7 +244,7 @@ func TestProcessFutureBacklog(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
@ -276,6 +311,10 @@ func TestProcessBacklog(t *testing.T) {
Code: msgCommit,
Msg: subjectPayload,
},
&message{
Code: msgRoundChange,
Msg: subjectPayload,
},
}
for i := 0; i < len(msgs); i++ {
testProcessBacklog(t, msgs[i])
@ -297,7 +336,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()

View File

@ -39,6 +39,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
config: config,
address: backend.Address(),
state: StateAcceptRequest,
handlerWg: new(sync.WaitGroup),
logger: log.New("address", backend.Address()),
backend: backend,
backlogs: make(map[istanbul.Validator]*prque.Prque),
@ -68,8 +69,6 @@ type core struct {
timeoutSub *event.TypeMuxSubscription
futurePreprepareTimer *time.Timer
lastProposer common.Address
lastProposal istanbul.Proposal
valSet istanbul.ValidatorSet
waitingForRoundChange bool
validateFn func([]byte, []byte) (common.Address, error)
@ -77,7 +76,8 @@ type core struct {
backlogs map[istanbul.Validator]*prque.Prque
backlogsMu *sync.Mutex
current *roundState
current *roundState
handlerWg *sync.WaitGroup
roundChangeSet *roundChangeSet
roundChangeTimer *time.Timer
@ -179,35 +179,65 @@ func (c *core) commit() {
}
}
func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Proposal, lastProposer common.Address, roundChange bool) {
// startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
func (c *core) startNewRound(round *big.Int) {
var logger log.Logger
if c.current == nil {
logger = c.logger.New("old_round", -1, "old_seq", 0, "old_proposer", c.valSet.GetProposer())
logger = c.logger.New("old_round", -1, "old_seq", 0)
} else {
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
}
roundChange := false
// Try to get last proposal
if lastProposal == nil {
lastProposal, lastProposer = c.backend.LastProposal()
if lastProposal.Number().Cmp(newView.Sequence) > 0 {
newView = &istanbul.View{
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
Round: new(big.Int),
}
c.lastProposal = lastProposal
c.lastProposer = lastProposer
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
lastProposal, lastProposer := c.backend.LastProposal()
if c.current == nil {
logger.Trace("Start to the initial round")
} else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
if !c.consensusTimestamp.IsZero() {
c.consensusTimer.UpdateSince(c.consensusTimestamp)
c.consensusTimestamp = time.Time{}
}
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
} else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
if round.Cmp(common.Big0) == 0 {
// same seq and round, don't need to start new round
return
} else if round.Cmp(c.current.Round()) < 0 {
logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round())
return
}
roundChange = true
} else {
logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64())
return
}
c.valSet = c.backend.Validators(c.lastProposal)
var newView *istanbul.View
if roundChange {
newView = &istanbul.View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(round),
}
} else {
newView = &istanbul.View{
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
Round: new(big.Int),
}
c.valSet = c.backend.Validators(lastProposal)
}
// Update logger
logger = logger.New("old_proposer", c.valSet.GetProposer())
// Clear invalid ROUND CHANGE messages
c.roundChangeSet = newRoundChangeSet(c.valSet)
// New snapshot for new round
c.updateRoundState(newView, c.valSet, roundChange)
// Calculate new proposer
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() && c.current != nil {
@ -248,12 +278,12 @@ func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.Valid
// Lock only if both roundChange is true and it is locked
if roundChange && c.current != nil {
if c.current.IsHashLocked() {
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest)
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest)
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal)
}
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil)
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal)
}
}
@ -288,8 +318,12 @@ func (c *core) newRoundChangeTimer() {
c.stopTimer()
// set timeout based on the round number
t := uint64(math.Pow(2, float64(c.current.Round().Uint64()))) * c.config.RequestTimeout
timeout := time.Duration(t) * time.Millisecond
timeout := time.Duration(c.config.RequestTimeout) * time.Millisecond
round := c.current.Round().Uint64()
if round > 0 {
timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
}
c.roundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(timeoutEvent{})
})

View File

@ -16,36 +16,11 @@
package core
import (
"math/big"
"time"
import "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func (c *core) handleFinalCommitted(proposal istanbul.Proposal, proposer common.Address) error {
logger := c.logger.New("state", c.state, "number", proposal.Number(), "hash", proposal.Hash())
func (c *core) handleFinalCommitted() error {
logger := c.logger.New("state", c.state)
logger.Trace("Received a final committed proposal")
// Catch up the sequence number
if proposal.Number().Cmp(c.current.Sequence()) >= 0 {
// Remember to store the proposer since we've accpetted the proposal
diff := new(big.Int).Sub(proposal.Number(), c.current.Sequence())
c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
if !c.consensusTimestamp.IsZero() {
c.consensusTimer.UpdateSince(c.consensusTimestamp)
c.consensusTimestamp = time.Time{}
}
c.lastProposer = proposer
c.lastProposal = proposal
c.startNewRound(&istanbul.View{
Sequence: new(big.Int).Add(proposal.Number(), common.Big1),
Round: new(big.Int).Set(common.Big0),
}, proposal, proposer, false)
}
c.startNewRound(common.Big0)
return nil
}

View File

@ -17,24 +17,14 @@
package core
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
// Start implements core.Engine.Start
func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error {
// Initialize last proposer
c.lastProposer = lastProposer
c.lastProposal = lastProposal
c.valSet = c.backend.Validators(c.lastProposal)
func (c *core) Start() error {
// Start a new round from last sequence + 1
c.startNewRound(&istanbul.View{
Sequence: new(big.Int).Add(lastSequence, common.Big1),
Round: common.Big0,
}, lastProposal, lastProposer, false)
c.startNewRound(common.Big0)
// Tests will handle events itself, so we have to make subscribeEvents()
// be able to call in test.
@ -48,6 +38,9 @@ func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastPro
func (c *core) Stop() error {
c.stopTimer()
c.unsubscribeEvents()
// Make sure the handler goroutine exits
c.handlerWg.Wait()
return nil
}
@ -78,6 +71,14 @@ func (c *core) unsubscribeEvents() {
}
func (c *core) handleEvents() {
// Clear state
defer func() {
c.current = nil
c.handlerWg.Done()
}()
c.handlerWg.Add(1)
for {
select {
case event, ok := <-c.events.Chan():
@ -118,9 +119,9 @@ func (c *core) handleEvents() {
if !ok {
return
}
switch ev := event.Data.(type) {
switch event.Data.(type) {
case istanbul.FinalCommittedEvent:
c.handleFinalCommitted(ev.Proposal, ev.Proposer)
c.handleFinalCommitted()
}
}
}
@ -171,7 +172,7 @@ func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
case msgCommit:
return testBacklog(c.handleCommit(msg, src))
case msgRoundChange:
return c.handleRoundChange(msg, src)
return testBacklog(c.handleRoundChange(msg, src))
default:
logger.Error("Invalid message", "msg", msg)
}
@ -191,13 +192,10 @@ func (c *core) handleTimeoutMsg() {
}
}
lastProposal, lastProposer := c.backend.LastProposal()
if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) > 0 {
lastProposal, _ := c.backend.LastProposal()
if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())
c.startNewRound(&istanbul.View{
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
Round: new(big.Int),
}, lastProposal, lastProposer, false)
c.startNewRound(common.Big0)
} else {
c.sendNextRoundChange()
}

View File

@ -34,7 +34,7 @@ func newMessageSet(valSet istanbul.ValidatorSet) *messageSet {
Sequence: new(big.Int),
},
messagesMu: new(sync.Mutex),
messages: make(map[common.Hash]*message),
messages: make(map[common.Address]*message),
valSet: valSet,
}
}
@ -45,7 +45,7 @@ type messageSet struct {
view *istanbul.View
valSet istanbul.ValidatorSet
messagesMu *sync.Mutex
messages map[common.Hash]*message
messages map[common.Address]*message
}
func (ms *messageSet) View() *istanbul.View {
@ -80,6 +80,12 @@ func (ms *messageSet) Size() int {
return len(ms.messages)
}
func (ms *messageSet) Get(addr common.Address) *message {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
return ms.messages[addr]
}
// ----------------------------------------------------------------------------
func (ms *messageSet) verify(msg *message) error {
@ -94,7 +100,7 @@ func (ms *messageSet) verify(msg *message) error {
}
func (ms *messageSet) addVerifiedMessage(msg *message) error {
ms.messages[istanbul.RLPHash(msg)] = msg
ms.messages[msg.Address] = msg
return nil
}

View File

@ -59,7 +59,8 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
// Change to Prepared state if we've received enough PREPARE messages or it is locked
// and we are in earlier state before Prepared state.
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&
c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()

View File

@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
}
// Ensure we have the same view with the PRE-PREPARE message
// If it is old message, see if we need to broadcast COMMIT
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
if err == errOldMessage {
// Get validator set for the given proposal
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
return nil
}
}
return err
}
@ -87,8 +101,16 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
// Here is about to accept the PRE-PREPARE
if c.state == StateAcceptRequest {
// Send ROUND CHANGE if the locked proposal and the received proposal are different
if c.current.IsHashLocked() && preprepare.Proposal.Hash() != c.current.GetLockedHash() {
c.sendNextRoundChange()
if c.current.IsHashLocked() {
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
c.sendNextRoundChange()
}
} else {
// Either
// 1. the locked proposal and the received proposal match

View File

@ -273,7 +273,7 @@ func TestHandlePreprepareWithLock(t *testing.T) {
t.Errorf("error mismatch: have %v, want nil", err)
}
if test.proposal == test.lockProposal {
if c.state != StatePreprepared {
if c.state != StatePrepared {
t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared)
}
if !reflect.DeepEqual(curView, c.currentView()) {

View File

@ -60,7 +60,7 @@ func (c *core) checkRequestMsg(request *istanbul.Request) error {
func (c *core) storeRequestMsg(request *istanbul.Request) {
logger := c.logger.New("state", c.state)
logger.Trace("Store future request", "request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
c.pendingRequestsMu.Lock()
defer c.pendingRequestsMu.Unlock()

View File

@ -36,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}
// invalid request
@ -91,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
pendingRequests: prque.New(),
pendingRequestsMu: new(sync.Mutex),
}

View File

@ -48,10 +48,9 @@ func (c *core) sendRoundChange(round *big.Int) {
// Now we have the new round number and sequence number
cv = c.currentView()
rc := &roundChange{
Round: new(big.Int).Set(cv.Round),
Sequence: new(big.Int).Set(cv.Sequence),
Digest: common.Hash{},
rc := &istanbul.Subject{
View: cv,
Digest: common.Hash{},
}
payload, err := Encode(rc)
@ -70,29 +69,22 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
// Decode ROUND CHANGE message
var rc *roundChange
var rc *istanbul.Subject
if err := msg.Decode(&rc); err != nil {
logger.Error("Failed to decode ROUND CHANGE", "err", err)
return errInvalidMessage
}
if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
return err
}
cv := c.currentView()
// We never accept ROUND CHANGE message with different sequence number
if rc.Sequence.Cmp(cv.Sequence) != 0 {
logger.Warn("Inconsistent sequence number", "expected", cv.Sequence, "got", rc.Sequence)
return errInvalidMessage
}
// We never accept ROUND CHANGE message with smaller round number
if rc.Round.Cmp(cv.Round) < 0 {
logger.Warn("Old round change", "from", src, "expected", cv.Round, "got", rc.Round)
return errOldMessage
}
roundView := rc.View
// Add the ROUND CHANGE message to its message set and return how many
// messages we've got with the same round number and sequence number.
num, err := c.roundChangeSet.Add(rc.Round, msg)
num, err := c.roundChangeSet.Add(roundView.Round, msg)
if err != nil {
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
return err
@ -102,21 +94,17 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
// If our round number is smaller than the certificate's round number, we would
// try to catch up the round number.
if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
if cv.Round.Cmp(rc.Round) < 0 {
c.sendRoundChange(rc.Round)
if cv.Round.Cmp(roundView.Round) < 0 {
c.sendRoundChange(roundView.Round)
}
return nil
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(rc.Round) < 0) {
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
c.startNewRound(&istanbul.View{
Round: new(big.Int).Set(rc.Round),
Sequence: new(big.Int).Set(rc.Sequence),
}, nil, common.Address{}, true)
c.startNewRound(roundView.Round)
return nil
} else if cv.Round.Cmp(rc.Round) < 0 {
// We consider the message with larger round as future messages and not
// gossip it to other validators.
return errFutureMessage
} else if cv.Round.Cmp(roundView.Round) < 0 {
// Only gossip the message with current round to other validators.
return errIgnored
}
return nil
}

View File

@ -33,10 +33,9 @@ func TestRoundChangeSet(t *testing.T) {
Sequence: big.NewInt(1),
Round: big.NewInt(1),
}
r := &roundChange{
Round: view.Round,
Sequence: view.Sequence,
Digest: common.Hash{},
r := &istanbul.Subject{
View: view,
Digest: common.Hash{},
}
m, _ := Encode(r)

View File

@ -29,7 +29,7 @@ import (
// newRoundState creates a new roundState instance with the given view and validatorSet
// lockedHash and preprepare are for round change when lock exists,
// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer
func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request) *roundState {
func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request, hasBadProposal func(hash common.Hash) bool) *roundState {
return &roundState{
round: view.Round,
sequence: view.Sequence,
@ -39,6 +39,7 @@ func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lock
lockedHash: lockedHash,
mu: new(sync.RWMutex),
pendingRequest: pendingRequest,
hasBadProposal: hasBadProposal,
}
}
@ -52,7 +53,23 @@ type roundState struct {
lockedHash common.Hash
pendingRequest *istanbul.Request
mu *sync.RWMutex
mu *sync.RWMutex
hasBadProposal func(hash common.Hash) bool
}
func (s *roundState) GetPrepareOrCommitSize() int {
s.mu.RLock()
defer s.mu.RUnlock()
result := s.Prepares.Size() + s.Commits.Size()
// find duplicate one
for _, m := range s.Prepares.Values() {
if s.Commits.Get(m.Address) != nil {
result--
}
}
return result
}
func (s *roundState) Subject() *istanbul.Subject {
@ -138,7 +155,10 @@ func (s *roundState) IsHashLocked() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lockedHash != common.Hash{}
if common.EmptyHash(s.lockedHash) {
return false
}
return !s.hasBadProposal(s.GetLockedHash())
}
func (s *roundState) GetLockedHash() common.Hash {

View File

@ -33,6 +33,9 @@ func newTestRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet)
Prepares: newMessageSet(validatorSet),
Commits: newMessageSet(validatorSet),
mu: new(sync.RWMutex),
hasBadProposal: func(hash common.Hash) bool {
return false
},
}
}

View File

@ -100,9 +100,7 @@ func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte
})
// fake new head events
go self.events.Post(istanbul.FinalCommittedEvent{
Proposal: proposal,
})
go self.events.Post(istanbul.FinalCommittedEvent{})
return nil
}
@ -133,8 +131,29 @@ func (self *testSystemBackend) NewRequest(request istanbul.Proposal) {
})
}
func (self *testSystemBackend) HasBadProposal(hash common.Hash) bool {
return false
}
func (self *testSystemBackend) LastProposal() (istanbul.Proposal, common.Address) {
return makeBlock(1), common.Address{}
l := len(self.committedMsgs)
if l > 0 {
return self.committedMsgs[l-1].commitProposal, common.Address{}
}
return makeBlock(0), common.Address{}
}
// Only block height 5 will return true
func (self *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool {
return number.Cmp(big.NewInt(5)) == 0
}
func (self *testSystemBackend) GetProposer(number uint64) common.Address {
return common.Address{}
}
func (self *testSystemBackend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
return self.peers
}
// ==============================================
@ -187,11 +206,13 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem {
core := New(backend, config).(*core)
core.state = StateAcceptRequest
core.lastProposer = common.Address{}
core.current = newRoundState(&istanbul.View{
Round: big.NewInt(0),
Sequence: big.NewInt(1),
}, vset, common.Hash{}, nil, nil)
}, vset, common.Hash{}, nil, nil, func(hash common.Hash) bool {
return false
})
core.valSet = vset
core.logger = testLogger
core.validateFn = backend.CheckValidatorSignature
@ -223,7 +244,7 @@ func (t *testSystem) listen() {
func (t *testSystem) Run(core bool) func() {
for _, b := range t.backends {
if core {
b.engine.Start(common.Big0, common.Address{}, nil) // start Istanbul core
b.engine.Start() // start Istanbul core
}
}

View File

@ -19,15 +19,13 @@ package core
import (
"fmt"
"io"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/rlp"
)
type Engine interface {
Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error
Start() error
Stop() error
}
@ -164,34 +162,3 @@ func (m *message) String() string {
func Encode(val interface{}) ([]byte, error) {
return rlp.EncodeToBytes(val)
}
// ----------------------------------------------------------------------------
type roundChange struct {
Round *big.Int
Sequence *big.Int
Digest common.Hash
}
// EncodeRLP serializes rc into the Ethereum RLP format.
func (rc *roundChange) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{
rc.Round,
rc.Sequence,
rc.Digest,
})
}
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (rc *roundChange) DecodeRLP(s *rlp.Stream) error {
var rawRoundChange struct {
Round *big.Int
Sequence *big.Int
Digest common.Hash
}
if err := s.Decode(&rawRoundChange); err != nil {
return err
}
rc.Round, rc.Sequence, rc.Digest = rawRoundChange.Round, rawRoundChange.Sequence, rawRoundChange.Digest
return nil
}

View File

@ -172,47 +172,8 @@ func testSubjectWithSignature(t *testing.T) {
}
}
func testRoundChange(t *testing.T) {
rc := &roundChange{
Round: big.NewInt(1),
Sequence: big.NewInt(2),
Digest: common.StringToHash("1234567890"),
}
RoundChangePayload, _ := Encode(rc)
m := &message{
Code: msgRoundChange,
Msg: RoundChangePayload,
Address: common.HexToAddress("0x1234567890"),
}
msgPayload, err := m.Payload()
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
decodedMsg := new(message)
err = decodedMsg.FromPayload(msgPayload, nil)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
var decodedRC *roundChange
err = decodedMsg.Decode(&decodedRC)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// if block is encoded/decoded by rlp, we cannot to compare interface data type using reflect.DeepEqual. (like istanbul.Proposal)
// so individual comparison here.
if !reflect.DeepEqual(rc, decodedRC) {
t.Errorf("round change mismatch: have %v, want %v", decodedRC, rc)
}
}
func TestMessageEncodeDecode(t *testing.T) {
testPreprepare(t)
testSubject(t)
testSubjectWithSignature(t)
testRoundChange(t)
}

View File

@ -16,8 +16,6 @@
package istanbul
import "github.com/ethereum/go-ethereum/common"
// RequestEvent is posted to propose a proposal
type RequestEvent struct {
Proposal Proposal
@ -30,6 +28,4 @@ type MessageEvent struct {
// FinalCommittedEvent is posted when a proposal is committed
type FinalCommittedEvent struct {
Proposal Proposal
Proposer common.Address
}

View File

@ -71,6 +71,8 @@ type ValidatorSet interface {
Copy() ValidatorSet
// Get the maximum number of faulty nodes
F() int
// Get proposer policy
Policy() ProposerPolicy
}
// ----------------------------------------------------------------------------

View File

@ -41,16 +41,18 @@ func (val *defaultValidator) String() string {
// ----------------------------------------------------------------------------
type defaultSet struct {
validators istanbul.Validators
validators istanbul.Validators
policy istanbul.ProposerPolicy
proposer istanbul.Validator
validatorMu sync.RWMutex
selector istanbul.ProposalSelector
selector istanbul.ProposalSelector
}
func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) *defaultSet {
func newDefaultSet(addrs []common.Address, policy istanbul.ProposerPolicy) *defaultSet {
valSet := &defaultSet{}
valSet.policy = policy
// init validators
valSet.validators = make([]istanbul.Validator, len(addrs))
for i, addr := range addrs {
@ -62,8 +64,10 @@ func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) *
if valSet.Size() > 0 {
valSet.proposer = valSet.GetByIndex(0)
}
//set proposal selector
valSet.selector = selector
valSet.selector = roundRobinProposer
if policy == istanbul.Sticky {
valSet.selector = stickyProposer
}
return valSet
}
@ -182,14 +186,16 @@ func (valSet *defaultSet) RemoveValidator(address common.Address) bool {
}
func (valSet *defaultSet) Copy() istanbul.ValidatorSet {
valSet.validatorMu.Lock()
defer valSet.validatorMu.Unlock()
valSet.validatorMu.RLock()
defer valSet.validatorMu.RUnlock()
addresses := make([]common.Address, 0, len(valSet.validators))
for _, v := range valSet.validators {
addresses = append(addresses, v.Address())
}
return newDefaultSet(addresses, valSet.selector)
return NewSet(addresses, valSet.policy)
}
func (valSet *defaultSet) F() int { return int(math.Ceil(float64(valSet.Size())/3)) - 1 }
func (valSet *defaultSet) Policy() istanbul.ProposerPolicy { return valSet.policy }

View File

@ -78,7 +78,7 @@ func testNormalValSet(t *testing.T) {
val1 := New(addr1)
val2 := New(addr2)
valSet := newDefaultSet([]common.Address{addr1, addr2}, roundRobinProposer)
valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.RoundRobin)
if valSet == nil {
t.Errorf("the format of validator set is invalid")
t.FailNow()
@ -182,7 +182,7 @@ func testStickyProposer(t *testing.T) {
val1 := New(addr1)
val2 := New(addr2)
valSet := newDefaultSet([]common.Address{addr1, addr2}, stickyProposer)
valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.Sticky)
// test get proposer
if val := valSet.GetProposer(); !reflect.DeepEqual(val, val1) {

View File

@ -28,15 +28,7 @@ func New(addr common.Address) istanbul.Validator {
}
func NewSet(addrs []common.Address, policy istanbul.ProposerPolicy) istanbul.ValidatorSet {
switch policy {
case istanbul.RoundRobin:
return newDefaultSet(addrs, roundRobinProposer)
case istanbul.Sticky:
return newDefaultSet(addrs, stickyProposer)
}
// use round-robin policy as default proposal policy
return newDefaultSet(addrs, roundRobinProposer)
return newDefaultSet(addrs, policy)
}
func ExtractValidators(extraData []byte) []common.Address {

View File

@ -46,10 +46,10 @@ type Protocol struct {
Lengths []uint64
}
// Broadcaster defines the interface to broadcast blocks and find peer
// Broadcaster defines the interface to enqueue blocks to fetcher and find peer
type Broadcaster interface {
// BroadcastBlock broadcasts blocks to peers
BroadcastBlock(block *types.Block, propagate bool)
// Enqueue add a block into fetcher queue
Enqueue(id string, block *types.Block)
// FindPeers retrives peers by addresses
FindPeers(map[common.Address]bool) map[common.Address]Peer
}

View File

@ -1272,6 +1272,11 @@ func (bc *BlockChain) BadBlocks() ([]BadBlockArgs, error) {
return headers, nil
}
// HasBadBlock returns whether the block with the hash is a bad block
func (bc *BlockChain) HasBadBlock(hash common.Hash) bool {
return bc.badBlocks.Contains(hash)
}
// addBadBlock adds a bad block to the bad-block LRU cache
func (bc *BlockChain) addBadBlock(block *types.Block) {
bc.badBlocks.Add(block.Header().Hash(), block.Header())

View File

@ -356,8 +356,6 @@ func (s *Ethereum) StartMining(local bool) error {
return fmt.Errorf("signer missing: %v", err)
}
clique.Authorize(eb, wallet.SignHash)
} else if istanbul, ok := s.engine.(consensus.Istanbul); ok {
istanbul.Start(s.blockchain, s.blockchain.InsertChain)
}
if local {
// If local (CPU) mining is started, we can disable the transaction rejection
@ -370,12 +368,7 @@ func (s *Ethereum) StartMining(local bool) error {
return nil
}
func (s *Ethereum) StopMining() {
s.miner.Stop()
if istanbul, ok := s.engine.(consensus.Istanbul); ok {
istanbul.Stop()
}
}
func (s *Ethereum) StopMining() { s.miner.Stop() }
func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
func (s *Ethereum) Miner() *miner.Miner { return s.miner }

View File

@ -125,7 +125,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
handler.SetBroadcaster(manager)
}
// Figure out whether to allow fast sync or not
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
@ -716,6 +716,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return nil
}
func (pm *ProtocolManager) Enqueue(id string, block *types.Block) {
pm.fetcher.Enqueue(id, block)
}
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {

View File

@ -212,6 +212,9 @@ func (self *worker) start() {
defer self.mu.Unlock()
atomic.StoreInt32(&self.mining, 1)
if istanbul, ok := self.engine.(consensus.Istanbul); ok {
istanbul.Start(self.chain, self.chain.CurrentBlock, self.chain.HasBadBlock)
}
// spin up agents
for agent := range self.agents {
@ -229,6 +232,11 @@ func (self *worker) stop() {
agent.Stop()
}
}
if istanbul, ok := self.engine.(consensus.Istanbul); ok {
istanbul.Stop()
}
atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.atWork, 0)
}
@ -256,11 +264,11 @@ func (self *worker) update() {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case ev := <-self.chainHeadCh:
self.commitNewWork()
if h, ok := self.engine.(consensus.Handler); ok && ev.Block != nil {
h.NewChainHead(ev.Block)
case <-self.chainHeadCh:
if h, ok := self.engine.(consensus.Handler); ok {
h.NewChainHead()
}
self.commitNewWork()
// Handle ChainSideEvent
case ev := <-self.chainSideCh:

View File

@ -184,8 +184,8 @@ func (c *ChainConfig) String() string {
engine = c.Ethash
case c.Clique != nil:
engine = c.Clique
//case c.Istanbul != nil:
// engine = c.Istanbul
case c.Istanbul != nil:
engine = c.Istanbul
default:
engine = "unknown"
}