From 9ac24f9aaafb67a7760bdd3dabed3cc5030152d7 Mon Sep 17 00:00:00 2001 From: "mark.lin" Date: Wed, 31 Jan 2018 10:27:08 +0800 Subject: [PATCH] consensus, core, eth, miner, params: update istanbul engine --- consensus/consensus.go | 4 +- consensus/istanbul/backend.go | 13 ++++ consensus/istanbul/backend/backend.go | 67 +++++++++++----- consensus/istanbul/backend/backend_test.go | 19 ++--- consensus/istanbul/backend/engine.go | 23 ++---- consensus/istanbul/backend/engine_test.go | 4 +- consensus/istanbul/backend/handler.go | 17 +---- consensus/istanbul/backend/snapshot.go | 57 ++++++++++++-- consensus/istanbul/backend/snapshot_test.go | 53 +++++++++++++ consensus/istanbul/core/backlog.go | 17 ++++- consensus/istanbul/core/backlog_test.go | 59 ++++++++++++--- consensus/istanbul/core/core.go | 80 ++++++++++++++------ consensus/istanbul/core/final_committed.go | 33 +------- consensus/istanbul/core/handler.go | 40 +++++----- consensus/istanbul/core/message_set.go | 12 ++- consensus/istanbul/core/prepare.go | 3 +- consensus/istanbul/core/preprepare.go | 26 ++++++- consensus/istanbul/core/preprepare_test.go | 2 +- consensus/istanbul/core/request.go | 2 +- consensus/istanbul/core/request_test.go | 4 +- consensus/istanbul/core/roundchange.go | 46 +++++------ consensus/istanbul/core/roundchange_test.go | 7 +- consensus/istanbul/core/roundstate.go | 26 ++++++- consensus/istanbul/core/roundstate_test.go | 3 + consensus/istanbul/core/testbackend_test.go | 35 +++++++-- consensus/istanbul/core/types.go | 35 +-------- consensus/istanbul/core/types_test.go | 39 ---------- consensus/istanbul/events.go | 4 - consensus/istanbul/validator.go | 2 + consensus/istanbul/validator/default.go | 24 +++--- consensus/istanbul/validator/default_test.go | 4 +- consensus/istanbul/validator/validator.go | 10 +-- consensus/protocol.go | 6 +- core/blockchain.go | 5 ++ eth/backend.go | 9 +-- eth/handler.go | 6 +- miner/worker.go | 16 +++- params/config.go | 4 +- 38 files changed, 495 insertions(+), 321 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index ae4622de5..c0a6b6bb7 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -99,7 +99,7 @@ type Engine interface { // Handler should be implemented is the consensus needs to handle and send peer's message type Handler interface { // NewChainHead handles a new head block comes - NewChainHead(block *types.Block) error + NewChainHead() error // HandleMsg handles a message from peer HandleMsg(address common.Address, data p2p.Msg) (bool, error) @@ -121,7 +121,7 @@ type Istanbul interface { Engine // Start starts the engine - Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error + Start(chain ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error // Stop stops the engine Stop() error diff --git a/consensus/istanbul/backend.go b/consensus/istanbul/backend.go index 2f24a65b3..46039438e 100644 --- a/consensus/istanbul/backend.go +++ b/consensus/istanbul/backend.go @@ -17,6 +17,7 @@ package istanbul import ( + "math/big" "time" "github.com/ethereum/go-ethereum/common" @@ -57,4 +58,16 @@ type Backend interface { // LastProposal retrieves latest committed proposal and the address of proposer LastProposal() (Proposal, common.Address) + + // HasPropsal checks if the combination of the given hash and height matches any existing blocks + HasPropsal(hash common.Hash, number *big.Int) bool + + // GetProposer returns the proposer of the given block height + GetProposer(number uint64) common.Address + + // ParentValidators returns the validator set of the given proposal's parent block + ParentValidators(proposal Proposal) ValidatorSet + + // HasBadBlock returns whether the block with the hash is a bad block + HasBadProposal(hash common.Hash) bool } diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index b9345b211..2f763af92 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -18,6 +18,7 @@ package backend import ( "crypto/ecdsa" + "math/big" "sync" "time" @@ -35,6 +36,11 @@ import ( lru "github.com/hashicorp/golang-lru" ) +const ( + // fetcherID is the ID indicates the block is from Istanbul engine + fetcherID = "istanbul" +) + // New creates an Ethereum backend for Istanbul core engine. func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul { // Allocate the snapshot caches and create the engine @@ -70,14 +76,15 @@ type backend struct { logger log.Logger db ethdb.Database chain consensus.ChainReader - inserter func(types.Blocks) (int, error) + currentBlock func() *types.Block + hasBadBlock func(hash common.Hash) bool // the channels for istanbul engine notifications commitCh chan *types.Block proposedBlockHash common.Hash sealMu sync.Mutex coreStarted bool - coreMu sync.Mutex + coreMu sync.RWMutex // Current list of candidates we are pushing candidates map[common.Address]bool @@ -180,16 +187,11 @@ func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error { if sb.proposedBlockHash == block.Hash() { // feed block hash to Seal() and wait the Seal() result sb.commitCh <- block - // TODO: how do we check the block is inserted correctly? return nil } - // if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent - if _, err := sb.inserter(types.Blocks{block}); err != nil && err != core.ErrKnownBlock { - return err - } if sb.broadcaster != nil { - go sb.broadcaster.BroadcastBlock(block, false) + sb.broadcaster.Enqueue(fetcherID, block) } return nil } @@ -208,6 +210,22 @@ func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) { sb.logger.Error("Invalid proposal, %v", proposal) return 0, errInvalidProposal } + + // check bad block + if sb.HasBadProposal(block.Hash()) { + return 0, core.ErrBlacklistedHash + } + + // check block body + txnHash := types.DeriveSha(block.Transactions()) + uncleHash := types.CalcUncleHash(block.Uncles()) + if txnHash != block.Header().TxHash { + return 0, errMismatchTxhashes + } + if uncleHash != nilUncleHash { + return 0, errInvalidUncleHash + } + // verify the header of proposed block err := sb.VerifyHeader(sb.chain, block.Header(), false) // ignore errEmptyCommittedSeals error because we don't have the committed seals yet @@ -239,6 +257,11 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt return nil } +// HasPropsal implements istanbul.Backend.HashBlock +func (sb *backend) HasPropsal(hash common.Hash, number *big.Int) bool { + return sb.chain.GetHeader(hash, number.Uint64()) != nil +} + // GetProposer implements istanbul.Backend.GetProposer func (sb *backend) GetProposer(number uint64) common.Address { if h := sb.chain.GetHeaderByNumber(number); h != nil { @@ -248,6 +271,14 @@ func (sb *backend) GetProposer(number uint64) common.Address { return common.Address{} } +// ParentValidators implements istanbul.Backend.GetParentValidators +func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet { + if block, ok := proposal.(*types.Block); ok { + return sb.getValidators(block.Number().Uint64()-1, block.ParentHash()) + } + return validator.NewSet(nil, sb.config.ProposerPolicy) +} + func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet { snap, err := sb.snapshot(sb.chain, number, hash, nil) if err != nil { @@ -257,17 +288,12 @@ func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.Valid } func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) { - if sb.chain == nil { - sb.logger.Error("Failed to access blockchain") - return nil, common.Address{} - } - - h := sb.chain.CurrentHeader() + block := sb.currentBlock() var proposer common.Address - if h.Number.Cmp(common.Big0) > 0 { + if block.Number().Cmp(common.Big0) > 0 { var err error - proposer, err = sb.Author(h) + proposer, err = sb.Author(block.Header()) if err != nil { sb.logger.Error("Failed to get block proposer", "err", err) return nil, common.Address{} @@ -275,5 +301,12 @@ func (sb *backend) LastProposal() (istanbul.Proposal, common.Address) { } // Return header only block here since we don't need block body - return types.NewBlockWithHeader(h), proposer + return block, proposer +} + +func (sb *backend) HasBadProposal(hash common.Hash) bool { + if sb.hasBadBlock == nil { + return false + } + return sb.hasBadBlock(hash) } diff --git a/consensus/istanbul/backend/backend_test.go b/consensus/istanbul/backend/backend_test.go index 0699b72bb..5c9858209 100644 --- a/consensus/istanbul/backend/backend_test.go +++ b/consensus/istanbul/backend/backend_test.go @@ -29,11 +29,10 @@ import ( "github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" ) func TestSign(t *testing.T) { - b, _, _ := newBackend() + b := newBackend() data := []byte("Here is a string....") sig, err := b.Sign(data) if err != nil { @@ -54,7 +53,7 @@ func TestCheckSignature(t *testing.T) { data := []byte("Here is a string....") hashData := crypto.Keccak256([]byte(data)) sig, _ := crypto.Sign(hashData, key) - b, _, _ := newBackend() + b := newBackend() a := getAddress() err := b.CheckSignature(data, a, sig) if err != nil { @@ -68,7 +67,7 @@ func TestCheckSignature(t *testing.T) { } func TestCheckValidatorSignature(t *testing.T) { - _, keys, vset := newBackend() + vset, keys := newTestValidatorSet(5) // 1. Positive test: sign with validator's key should succeed data := []byte("dummy data") @@ -113,7 +112,7 @@ func TestCheckValidatorSignature(t *testing.T) { } func TestCommit(t *testing.T) { - backend, _, _ := newBackend() + backend := newBackend() commitCh := make(chan *types.Block) // Case: it's a proposer, so the backend.commit will receive channel result from backend.Commit function @@ -235,13 +234,9 @@ func (slice Keys) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } -func newBackend() (b *backend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) { +func newBackend() (b *backend) { + _, b = newBlockChain(4) key, _ := generatePrivateKey() - validatorSet, validatorKeys = newTestValidatorSet(5) - b = &backend{ - privateKey: key, - logger: log.New("backend", "simple"), - commitCh: make(chan *types.Block, 1), - } + b.privateKey = key return } diff --git a/consensus/istanbul/backend/engine.go b/consensus/istanbul/backend/engine.go index 0ace594e3..be047c57c 100644 --- a/consensus/istanbul/backend/engine.go +++ b/consensus/istanbul/backend/engine.go @@ -80,6 +80,8 @@ var ( errInvalidCommittedSeals = errors.New("invalid committed seals") // errEmptyCommittedSeals is returned if the field of committed seals is zero. errEmptyCommittedSeals = errors.New("zero committed seals") + // errMismatchTxhashes is returned if the TxHash in header is mismatch. + errMismatchTxhashes = errors.New("mismatch transcations hashes") ) var ( defaultDifficulty = big.NewInt(1) @@ -485,7 +487,7 @@ func (sb *backend) APIs(chain consensus.ChainReader) []rpc.API { } // Start implements consensus.Istanbul.Start -func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error { +func (sb *backend) Start(chain consensus.ChainReader, currentBlock func() *types.Block, hasBadBlock func(hash common.Hash) bool) error { sb.coreMu.Lock() defer sb.coreMu.Unlock() if sb.coreStarted { @@ -500,23 +502,10 @@ func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks sb.commitCh = make(chan *types.Block, 1) sb.chain = chain - sb.inserter = inserter + sb.currentBlock = currentBlock + sb.hasBadBlock = hasBadBlock - curHeader := chain.CurrentHeader() - lastSequence := new(big.Int).Set(curHeader.Number) - lastProposer := common.Address{} - // should get proposer if the block is not genesis - if lastSequence.Cmp(common.Big0) > 0 { - p, err := sb.Author(curHeader) - if err != nil { - return err - } - lastProposer = p - } - // We don't need block body so we create a header only block. - // The proposal is only for validator set calculation. - lastProposal := types.NewBlockWithHeader(curHeader) - if err := sb.core.Start(lastSequence, lastProposer, lastProposal); err != nil { + if err := sb.core.Start(); err != nil { return err } diff --git a/consensus/istanbul/backend/engine_test.go b/consensus/istanbul/backend/engine_test.go index 75a20c368..89a38cde6 100644 --- a/consensus/istanbul/backend/engine_test.go +++ b/consensus/istanbul/backend/engine_test.go @@ -51,7 +51,7 @@ func newBlockChain(n int) (*core.BlockChain, *backend) { if err != nil { panic(err) } - b.Start(blockchain, blockchain.InsertChain) + b.Start(blockchain, blockchain.CurrentBlock, blockchain.HasBadBlock) snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil) if err != nil { panic(err) @@ -138,7 +138,7 @@ func makeBlock(chain *core.BlockChain, engine *backend, parent *types.Block) *ty func makeBlockWithoutSeal(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block { header := makeHeader(parent, engine.config) engine.Prepare(chain, header) - state, _, _ := chain.StateAt(parent.Root()) + state, _,_ := chain.StateAt(parent.Root()) block, _ := engine.Finalize(chain, header, state, nil, nil, nil) return block } diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index c5111c5ef..a33800945 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -22,7 +22,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" lru "github.com/hashicorp/golang-lru" ) @@ -93,20 +92,12 @@ func (sb *backend) SetBroadcaster(broadcaster consensus.Broadcaster) { sb.broadcaster = broadcaster } -func (sb *backend) NewChainHead(block *types.Block) error { - sb.coreMu.Lock() - defer sb.coreMu.Unlock() +func (sb *backend) NewChainHead() error { + sb.coreMu.RLock() + defer sb.coreMu.RUnlock() if !sb.coreStarted { return istanbul.ErrStoppedEngine } - p, err := sb.Author(block.Header()) - if err != nil { - sb.logger.Error("Failed to get block proposer", "err", err) - return err - } - go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{ - Proposal: block, - Proposer: p, - }) + go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{}) return nil } diff --git a/consensus/istanbul/backend/snapshot.go b/consensus/istanbul/backend/snapshot.go index 08be3a251..4ab8d9425 100644 --- a/consensus/istanbul/backend/snapshot.go +++ b/consensus/istanbul/backend/snapshot.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" ) @@ -50,11 +51,11 @@ type Tally struct { type Snapshot struct { Epoch uint64 // The number of blocks after which to checkpoint and reset the pending votes - Number uint64 `json:"number"` // Block number where the snapshot was created - Hash common.Hash `json:"hash"` // Block hash where the snapshot was created - Votes []*Vote `json:"votes"` // List of votes cast in chronological order - Tally map[common.Address]Tally `json:"tally"` // Current vote tally to avoid recalculating - ValSet istanbul.ValidatorSet `json:"validators"` // Set of authorized validators at this moment + Number uint64 // Block number where the snapshot was created + Hash common.Hash // Block hash where the snapshot was created + Votes []*Vote // List of votes cast in chronological order + Tally map[common.Address]Tally // Current vote tally to avoid recalculating + ValSet istanbul.ValidatorSet // Set of authorized validators at this moment } // newSnapshot create a new snapshot with the specified startup parameters. This @@ -272,3 +273,49 @@ func (s *Snapshot) validators() []common.Address { } return validators } + +type snapshotJSON struct { + Epoch uint64 `json:"epoch"` + Number uint64 `json:"number"` + Hash common.Hash `json:"hash"` + Votes []*Vote `json:"votes"` + Tally map[common.Address]Tally `json:"tally"` + + // for validator set + Validators []common.Address `json:"validators"` + Policy istanbul.ProposerPolicy `json:"policy"` +} + +func (s *Snapshot) toJSONStruct() *snapshotJSON { + return &snapshotJSON{ + Epoch: s.Epoch, + Number: s.Number, + Hash: s.Hash, + Votes: s.Votes, + Tally: s.Tally, + Validators: s.validators(), + Policy: s.ValSet.Policy(), + } +} + +// Unmarshal from a json byte array +func (s *Snapshot) UnmarshalJSON(b []byte) error { + var j snapshotJSON + if err := json.Unmarshal(b, &j); err != nil { + return err + } + + s.Epoch = j.Epoch + s.Number = j.Number + s.Hash = j.Hash + s.Votes = j.Votes + s.Tally = j.Tally + s.ValSet = validator.NewSet(j.Validators, j.Policy) + return nil +} + +// Marshal to a json byte array +func (s *Snapshot) MarshalJSON() ([]byte, error) { + j := s.toJSONStruct() + return json.Marshal(j) +} diff --git a/consensus/istanbul/backend/snapshot_test.go b/consensus/istanbul/backend/snapshot_test.go index e00a05c97..829f333b2 100644 --- a/consensus/istanbul/backend/snapshot_test.go +++ b/consensus/istanbul/backend/snapshot_test.go @@ -20,10 +20,12 @@ import ( "bytes" "crypto/ecdsa" "math/big" + "reflect" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -400,3 +402,54 @@ func TestVoting(t *testing.T) { } } } + +func TestSaveAndLoad(t *testing.T) { + snap := &Snapshot{ + Epoch: 5, + Number: 10, + Hash: common.HexToHash("1234567890"), + Votes: []*Vote{ + { + Validator: common.StringToAddress("1234567891"), + Block: 15, + Address: common.StringToAddress("1234567892"), + Authorize: false, + }, + }, + Tally: map[common.Address]Tally{ + common.StringToAddress("1234567893"): Tally{ + Authorize: false, + Votes: 20, + }, + }, + ValSet: validator.NewSet([]common.Address{ + common.StringToAddress("1234567894"), + common.StringToAddress("1234567895"), + }, istanbul.RoundRobin), + } + db, _ := ethdb.NewMemDatabase() + err := snap.store(db) + if err != nil { + t.Errorf("store snapshot failed: %v", err) + } + + snap1, err := loadSnapshot(snap.Epoch, db, snap.Hash) + if err != nil { + t.Errorf("load snapshot failed: %v", err) + } + if snap.Epoch != snap1.Epoch { + t.Errorf("epoch mismatch: have %v, want %v", snap1.Epoch, snap.Epoch) + } + if snap.Hash != snap1.Hash { + t.Errorf("hash mismatch: have %v, want %v", snap1.Number, snap.Number) + } + if !reflect.DeepEqual(snap.Votes, snap.Votes) { + t.Errorf("votes mismatch: have %v, want %v", snap1.Votes, snap.Votes) + } + if !reflect.DeepEqual(snap.Tally, snap.Tally) { + t.Errorf("tally mismatch: have %v, want %v", snap1.Tally, snap.Tally) + } + if !reflect.DeepEqual(snap.ValSet, snap.ValSet) { + t.Errorf("validator set mismatch: have %v, want %v", snap1.ValSet, snap.ValSet) + } +} diff --git a/consensus/istanbul/core/backlog.go b/consensus/istanbul/core/backlog.go index a589a541b..73a9732b8 100644 --- a/consensus/istanbul/core/backlog.go +++ b/consensus/istanbul/core/backlog.go @@ -40,6 +40,15 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error { return errInvalidMessage } + if msgCode == msgRoundChange { + if view.Sequence.Cmp(c.currentView().Sequence) > 0 { + return errFutureMessage + } else if view.Cmp(c.currentView()) < 0 { + return errOldMessage + } + return nil + } + if view.Cmp(c.currentView()) > 0 { return errFutureMessage } @@ -90,7 +99,7 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) { if err == nil { backlog.Push(msg, toPriority(msg.Code, p.View)) } - // for istanbul.MsgPrepare and istanbul.MsgCommit cases + // for msgRoundChange, msgPrepare and msgCommit cases default: var p *istanbul.Subject err := msg.Decode(&p) @@ -127,7 +136,7 @@ func (c *core) processBacklog() { if err == nil { view = m.View } - // for istanbul.MsgPrepare and istanbul.MsgCommit cases + // for msgRoundChange, msgPrepare and msgCommit cases default: var sub *istanbul.Subject err := msg.Decode(&sub) @@ -162,6 +171,10 @@ func (c *core) processBacklog() { } func toPriority(msgCode uint64, view *istanbul.View) float32 { + if msgCode == msgRoundChange { + // For msgRoundChange, set the message priority based on its sequence + return -float32(view.Sequence.Uint64() * 1000) + } // FIXME: round will be reset as 0 while new sequence // 10 * Round limits the range of message code is from 0 to 9 // 1000 * Sequence limits the range of round is from 0 to 99 diff --git a/consensus/istanbul/core/backlog_test.go b/consensus/istanbul/core/backlog_test.go index 6e2c77f9a..bdef2c698 100644 --- a/consensus/istanbul/core/backlog_test.go +++ b/consensus/istanbul/core/backlog_test.go @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), common.Hash{}, nil, nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil, nil), } // invalid view format @@ -47,7 +47,7 @@ func TestCheckMessage(t *testing.T) { } testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted} - testCode := []uint64{msgPreprepare, msgPrepare, msgCommit} + testCode := []uint64{msgPreprepare, msgPrepare, msgCommit, msgRoundChange} // future sequence v := &istanbul.View{ @@ -73,7 +73,11 @@ func TestCheckMessage(t *testing.T) { c.state = testStates[i] for j := 0; j < len(testCode); j++ { err := c.checkMessage(testCode[j], v) - if err != errFutureMessage { + if testCode[j] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if err != errFutureMessage { t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage) } } @@ -89,7 +93,11 @@ func TestCheckMessage(t *testing.T) { c.state = testStates[i] for j := 0; j < len(testCode); j++ { err := c.checkMessage(testCode[j], v) - if err != errFutureMessage { + if testCode[j] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if err != errFutureMessage { t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage) } } @@ -101,7 +109,11 @@ func TestCheckMessage(t *testing.T) { c.state = StateAcceptRequest for i := 0; i < len(testCode); i++ { err = c.checkMessage(testCode[i], v) - if testCode[i] == msgPreprepare { + if testCode[i] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if testCode[i] == msgPreprepare { if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } @@ -116,7 +128,11 @@ func TestCheckMessage(t *testing.T) { c.state = StatePreprepared for i := 0; i < len(testCode); i++ { err = c.checkMessage(testCode[i], v) - if err != nil { + if testCode[i] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } } @@ -125,7 +141,11 @@ func TestCheckMessage(t *testing.T) { c.state = StatePrepared for i := 0; i < len(testCode); i++ { err = c.checkMessage(testCode[i], v) - if err != nil { + if testCode[i] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } } @@ -134,7 +154,11 @@ func TestCheckMessage(t *testing.T) { c.state = StateCommitted for i := 0; i < len(testCode); i++ { err = c.checkMessage(testCode[i], v) - if err != nil { + if testCode[i] == msgRoundChange { + if err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + } else if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } } @@ -195,6 +219,17 @@ func TestStoreBacklog(t *testing.T) { if !reflect.DeepEqual(msg, m) { t.Errorf("message mismatch: have %v, want %v", msg, m) } + + // push roundChange msg + m = &message{ + Code: msgRoundChange, + Msg: subjectPayload, + } + c.storeBacklog(m, p) + msg = c.backlogs[p].PopItem() + if !reflect.DeepEqual(msg, m) { + t.Errorf("message mismatch: have %v, want %v", msg, m) + } } func TestProcessFutureBacklog(t *testing.T) { @@ -209,7 +244,7 @@ func TestProcessFutureBacklog(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), common.Hash{}, nil, nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil, nil), state: StateAcceptRequest, } c.subscribeEvents() @@ -276,6 +311,10 @@ func TestProcessBacklog(t *testing.T) { Code: msgCommit, Msg: subjectPayload, }, + &message{ + Code: msgRoundChange, + Msg: subjectPayload, + }, } for i := 0; i < len(msgs); i++ { testProcessBacklog(t, msgs[i]) @@ -297,7 +336,7 @@ func testProcessBacklog(t *testing.T, msg *message) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), common.Hash{}, nil, nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil, nil), } c.subscribeEvents() defer c.unsubscribeEvents() diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index 942de4b9f..5818da976 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -39,6 +39,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine { config: config, address: backend.Address(), state: StateAcceptRequest, + handlerWg: new(sync.WaitGroup), logger: log.New("address", backend.Address()), backend: backend, backlogs: make(map[istanbul.Validator]*prque.Prque), @@ -68,8 +69,6 @@ type core struct { timeoutSub *event.TypeMuxSubscription futurePreprepareTimer *time.Timer - lastProposer common.Address - lastProposal istanbul.Proposal valSet istanbul.ValidatorSet waitingForRoundChange bool validateFn func([]byte, []byte) (common.Address, error) @@ -77,7 +76,8 @@ type core struct { backlogs map[istanbul.Validator]*prque.Prque backlogsMu *sync.Mutex - current *roundState + current *roundState + handlerWg *sync.WaitGroup roundChangeSet *roundChangeSet roundChangeTimer *time.Timer @@ -179,35 +179,65 @@ func (c *core) commit() { } } -func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Proposal, lastProposer common.Address, roundChange bool) { +// startNewRound starts a new round. if round equals to 0, it means to starts a new sequence +func (c *core) startNewRound(round *big.Int) { var logger log.Logger if c.current == nil { - logger = c.logger.New("old_round", -1, "old_seq", 0, "old_proposer", c.valSet.GetProposer()) + logger = c.logger.New("old_round", -1, "old_seq", 0) } else { - logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer()) + logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence()) } + roundChange := false // Try to get last proposal - if lastProposal == nil { - lastProposal, lastProposer = c.backend.LastProposal() - if lastProposal.Number().Cmp(newView.Sequence) > 0 { - newView = &istanbul.View{ - Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1), - Round: new(big.Int), - } - c.lastProposal = lastProposal - c.lastProposer = lastProposer - logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash()) + lastProposal, lastProposer := c.backend.LastProposal() + if c.current == nil { + logger.Trace("Start to the initial round") + } else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 { + diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence()) + c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64()) + + if !c.consensusTimestamp.IsZero() { + c.consensusTimer.UpdateSince(c.consensusTimestamp) + c.consensusTimestamp = time.Time{} } + logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash()) + } else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 { + if round.Cmp(common.Big0) == 0 { + // same seq and round, don't need to start new round + return + } else if round.Cmp(c.current.Round()) < 0 { + logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round()) + return + } + roundChange = true + } else { + logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64()) + return } - c.valSet = c.backend.Validators(c.lastProposal) + var newView *istanbul.View + if roundChange { + newView = &istanbul.View{ + Sequence: new(big.Int).Set(c.current.Sequence()), + Round: new(big.Int).Set(round), + } + } else { + newView = &istanbul.View{ + Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1), + Round: new(big.Int), + } + c.valSet = c.backend.Validators(lastProposal) + } + + // Update logger + logger = logger.New("old_proposer", c.valSet.GetProposer()) // Clear invalid ROUND CHANGE messages c.roundChangeSet = newRoundChangeSet(c.valSet) // New snapshot for new round c.updateRoundState(newView, c.valSet, roundChange) // Calculate new proposer - c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64()) + c.valSet.CalcProposer(lastProposer, newView.Round.Uint64()) c.waitingForRoundChange = false c.setState(StateAcceptRequest) if roundChange && c.isProposer() && c.current != nil { @@ -248,12 +278,12 @@ func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.Valid // Lock only if both roundChange is true and it is locked if roundChange && c.current != nil { if c.current.IsHashLocked() { - c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest) + c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal) } else { - c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest) + c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal) } } else { - c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil) + c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal) } } @@ -288,8 +318,12 @@ func (c *core) newRoundChangeTimer() { c.stopTimer() // set timeout based on the round number - t := uint64(math.Pow(2, float64(c.current.Round().Uint64()))) * c.config.RequestTimeout - timeout := time.Duration(t) * time.Millisecond + timeout := time.Duration(c.config.RequestTimeout) * time.Millisecond + round := c.current.Round().Uint64() + if round > 0 { + timeout += time.Duration(math.Pow(2, float64(round))) * time.Second + } + c.roundChangeTimer = time.AfterFunc(timeout, func() { c.sendEvent(timeoutEvent{}) }) diff --git a/consensus/istanbul/core/final_committed.go b/consensus/istanbul/core/final_committed.go index 05ed3b077..35e84d4f1 100644 --- a/consensus/istanbul/core/final_committed.go +++ b/consensus/istanbul/core/final_committed.go @@ -16,36 +16,11 @@ package core -import ( - "math/big" - "time" +import "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus/istanbul" -) - -func (c *core) handleFinalCommitted(proposal istanbul.Proposal, proposer common.Address) error { - logger := c.logger.New("state", c.state, "number", proposal.Number(), "hash", proposal.Hash()) +func (c *core) handleFinalCommitted() error { + logger := c.logger.New("state", c.state) logger.Trace("Received a final committed proposal") - - // Catch up the sequence number - if proposal.Number().Cmp(c.current.Sequence()) >= 0 { - // Remember to store the proposer since we've accpetted the proposal - diff := new(big.Int).Sub(proposal.Number(), c.current.Sequence()) - c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64()) - - if !c.consensusTimestamp.IsZero() { - c.consensusTimer.UpdateSince(c.consensusTimestamp) - c.consensusTimestamp = time.Time{} - } - - c.lastProposer = proposer - c.lastProposal = proposal - c.startNewRound(&istanbul.View{ - Sequence: new(big.Int).Add(proposal.Number(), common.Big1), - Round: new(big.Int).Set(common.Big0), - }, proposal, proposer, false) - } - + c.startNewRound(common.Big0) return nil } diff --git a/consensus/istanbul/core/handler.go b/consensus/istanbul/core/handler.go index b9556b743..9b76a3506 100644 --- a/consensus/istanbul/core/handler.go +++ b/consensus/istanbul/core/handler.go @@ -17,24 +17,14 @@ package core import ( - "math/big" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" ) // Start implements core.Engine.Start -func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error { - // Initialize last proposer - c.lastProposer = lastProposer - c.lastProposal = lastProposal - c.valSet = c.backend.Validators(c.lastProposal) - +func (c *core) Start() error { // Start a new round from last sequence + 1 - c.startNewRound(&istanbul.View{ - Sequence: new(big.Int).Add(lastSequence, common.Big1), - Round: common.Big0, - }, lastProposal, lastProposer, false) + c.startNewRound(common.Big0) // Tests will handle events itself, so we have to make subscribeEvents() // be able to call in test. @@ -48,6 +38,9 @@ func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastPro func (c *core) Stop() error { c.stopTimer() c.unsubscribeEvents() + + // Make sure the handler goroutine exits + c.handlerWg.Wait() return nil } @@ -78,6 +71,14 @@ func (c *core) unsubscribeEvents() { } func (c *core) handleEvents() { + // Clear state + defer func() { + c.current = nil + c.handlerWg.Done() + }() + + c.handlerWg.Add(1) + for { select { case event, ok := <-c.events.Chan(): @@ -118,9 +119,9 @@ func (c *core) handleEvents() { if !ok { return } - switch ev := event.Data.(type) { + switch event.Data.(type) { case istanbul.FinalCommittedEvent: - c.handleFinalCommitted(ev.Proposal, ev.Proposer) + c.handleFinalCommitted() } } } @@ -171,7 +172,7 @@ func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error { case msgCommit: return testBacklog(c.handleCommit(msg, src)) case msgRoundChange: - return c.handleRoundChange(msg, src) + return testBacklog(c.handleRoundChange(msg, src)) default: logger.Error("Invalid message", "msg", msg) } @@ -191,13 +192,10 @@ func (c *core) handleTimeoutMsg() { } } - lastProposal, lastProposer := c.backend.LastProposal() - if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) > 0 { + lastProposal, _ := c.backend.LastProposal() + if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 { c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64()) - c.startNewRound(&istanbul.View{ - Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1), - Round: new(big.Int), - }, lastProposal, lastProposer, false) + c.startNewRound(common.Big0) } else { c.sendNextRoundChange() } diff --git a/consensus/istanbul/core/message_set.go b/consensus/istanbul/core/message_set.go index 45c8091bd..82b1c0677 100644 --- a/consensus/istanbul/core/message_set.go +++ b/consensus/istanbul/core/message_set.go @@ -34,7 +34,7 @@ func newMessageSet(valSet istanbul.ValidatorSet) *messageSet { Sequence: new(big.Int), }, messagesMu: new(sync.Mutex), - messages: make(map[common.Hash]*message), + messages: make(map[common.Address]*message), valSet: valSet, } } @@ -45,7 +45,7 @@ type messageSet struct { view *istanbul.View valSet istanbul.ValidatorSet messagesMu *sync.Mutex - messages map[common.Hash]*message + messages map[common.Address]*message } func (ms *messageSet) View() *istanbul.View { @@ -80,6 +80,12 @@ func (ms *messageSet) Size() int { return len(ms.messages) } +func (ms *messageSet) Get(addr common.Address) *message { + ms.messagesMu.Lock() + defer ms.messagesMu.Unlock() + return ms.messages[addr] +} + // ---------------------------------------------------------------------------- func (ms *messageSet) verify(msg *message) error { @@ -94,7 +100,7 @@ func (ms *messageSet) verify(msg *message) error { } func (ms *messageSet) addVerifiedMessage(msg *message) error { - ms.messages[istanbul.RLPHash(msg)] = msg + ms.messages[msg.Address] = msg return nil } diff --git a/consensus/istanbul/core/prepare.go b/consensus/istanbul/core/prepare.go index ed1fc2815..f4ea25ae1 100644 --- a/consensus/istanbul/core/prepare.go +++ b/consensus/istanbul/core/prepare.go @@ -59,7 +59,8 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error { // Change to Prepared state if we've received enough PREPARE messages or it is locked // and we are in earlier state before Prepared state. - if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 { + if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) && + c.state.Cmp(StatePrepared) < 0 { c.current.LockHash() c.setState(StatePrepared) c.sendCommit() diff --git a/consensus/istanbul/core/preprepare.go b/consensus/istanbul/core/preprepare.go index a79bfc21f..a9e594967 100644 --- a/consensus/istanbul/core/preprepare.go +++ b/consensus/istanbul/core/preprepare.go @@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error { } // Ensure we have the same view with the PRE-PREPARE message + // If it is old message, see if we need to broadcast COMMIT if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil { + if err == errOldMessage { + // Get validator set for the given proposal + valSet := c.backend.ParentValidators(preprepare.Proposal).Copy() + previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1) + valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64()) + // Broadcast COMMIT if it is an existing block + // 1. The proposer needs to be a proposer matches the given (Sequence + Round) + // 2. The given block must exist + if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) { + c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash()) + return nil + } + } return err } @@ -87,8 +101,16 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error { // Here is about to accept the PRE-PREPARE if c.state == StateAcceptRequest { // Send ROUND CHANGE if the locked proposal and the received proposal are different - if c.current.IsHashLocked() && preprepare.Proposal.Hash() != c.current.GetLockedHash() { - c.sendNextRoundChange() + if c.current.IsHashLocked() { + if preprepare.Proposal.Hash() == c.current.GetLockedHash() { + // Broadcast COMMIT and enters Prepared state directly + c.acceptPreprepare(preprepare) + c.setState(StatePrepared) + c.sendCommit() + } else { + // Send round change + c.sendNextRoundChange() + } } else { // Either // 1. the locked proposal and the received proposal match diff --git a/consensus/istanbul/core/preprepare_test.go b/consensus/istanbul/core/preprepare_test.go index 5ac603991..1097cfeba 100644 --- a/consensus/istanbul/core/preprepare_test.go +++ b/consensus/istanbul/core/preprepare_test.go @@ -273,7 +273,7 @@ func TestHandlePreprepareWithLock(t *testing.T) { t.Errorf("error mismatch: have %v, want nil", err) } if test.proposal == test.lockProposal { - if c.state != StatePreprepared { + if c.state != StatePrepared { t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared) } if !reflect.DeepEqual(curView, c.currentView()) { diff --git a/consensus/istanbul/core/request.go b/consensus/istanbul/core/request.go index eaadc1d4f..426803086 100644 --- a/consensus/istanbul/core/request.go +++ b/consensus/istanbul/core/request.go @@ -60,7 +60,7 @@ func (c *core) checkRequestMsg(request *istanbul.Request) error { func (c *core) storeRequestMsg(request *istanbul.Request) { logger := c.logger.New("state", c.state) - logger.Trace("Store future request", "request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash()) + logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash()) c.pendingRequestsMu.Lock() defer c.pendingRequestsMu.Unlock() diff --git a/consensus/istanbul/core/request_test.go b/consensus/istanbul/core/request_test.go index cbb585b95..5ee179c1c 100644 --- a/consensus/istanbul/core/request_test.go +++ b/consensus/istanbul/core/request_test.go @@ -36,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), common.Hash{}, nil, nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil, nil), } // invalid request @@ -91,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(0), Round: big.NewInt(0), - }, newTestValidatorSet(4), common.Hash{}, nil, nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil, nil), pendingRequests: prque.New(), pendingRequestsMu: new(sync.Mutex), } diff --git a/consensus/istanbul/core/roundchange.go b/consensus/istanbul/core/roundchange.go index a434281f3..89ffc41be 100644 --- a/consensus/istanbul/core/roundchange.go +++ b/consensus/istanbul/core/roundchange.go @@ -48,10 +48,9 @@ func (c *core) sendRoundChange(round *big.Int) { // Now we have the new round number and sequence number cv = c.currentView() - rc := &roundChange{ - Round: new(big.Int).Set(cv.Round), - Sequence: new(big.Int).Set(cv.Sequence), - Digest: common.Hash{}, + rc := &istanbul.Subject{ + View: cv, + Digest: common.Hash{}, } payload, err := Encode(rc) @@ -70,29 +69,22 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error { logger := c.logger.New("state", c.state, "from", src.Address().Hex()) // Decode ROUND CHANGE message - var rc *roundChange + var rc *istanbul.Subject if err := msg.Decode(&rc); err != nil { logger.Error("Failed to decode ROUND CHANGE", "err", err) return errInvalidMessage } + if err := c.checkMessage(msgRoundChange, rc.View); err != nil { + return err + } + cv := c.currentView() - - // We never accept ROUND CHANGE message with different sequence number - if rc.Sequence.Cmp(cv.Sequence) != 0 { - logger.Warn("Inconsistent sequence number", "expected", cv.Sequence, "got", rc.Sequence) - return errInvalidMessage - } - - // We never accept ROUND CHANGE message with smaller round number - if rc.Round.Cmp(cv.Round) < 0 { - logger.Warn("Old round change", "from", src, "expected", cv.Round, "got", rc.Round) - return errOldMessage - } + roundView := rc.View // Add the ROUND CHANGE message to its message set and return how many // messages we've got with the same round number and sequence number. - num, err := c.roundChangeSet.Add(rc.Round, msg) + num, err := c.roundChangeSet.Add(roundView.Round, msg) if err != nil { logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err) return err @@ -102,21 +94,17 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error { // If our round number is smaller than the certificate's round number, we would // try to catch up the round number. if c.waitingForRoundChange && num == int(c.valSet.F()+1) { - if cv.Round.Cmp(rc.Round) < 0 { - c.sendRoundChange(rc.Round) + if cv.Round.Cmp(roundView.Round) < 0 { + c.sendRoundChange(roundView.Round) } return nil - } else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(rc.Round) < 0) { + } else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) { // We've received 2f+1 ROUND CHANGE messages, start a new round immediately. - c.startNewRound(&istanbul.View{ - Round: new(big.Int).Set(rc.Round), - Sequence: new(big.Int).Set(rc.Sequence), - }, nil, common.Address{}, true) + c.startNewRound(roundView.Round) return nil - } else if cv.Round.Cmp(rc.Round) < 0 { - // We consider the message with larger round as future messages and not - // gossip it to other validators. - return errFutureMessage + } else if cv.Round.Cmp(roundView.Round) < 0 { + // Only gossip the message with current round to other validators. + return errIgnored } return nil } diff --git a/consensus/istanbul/core/roundchange_test.go b/consensus/istanbul/core/roundchange_test.go index 3d249f734..835219ae8 100644 --- a/consensus/istanbul/core/roundchange_test.go +++ b/consensus/istanbul/core/roundchange_test.go @@ -33,10 +33,9 @@ func TestRoundChangeSet(t *testing.T) { Sequence: big.NewInt(1), Round: big.NewInt(1), } - r := &roundChange{ - Round: view.Round, - Sequence: view.Sequence, - Digest: common.Hash{}, + r := &istanbul.Subject{ + View: view, + Digest: common.Hash{}, } m, _ := Encode(r) diff --git a/consensus/istanbul/core/roundstate.go b/consensus/istanbul/core/roundstate.go index 18f8ace22..8f011bfea 100644 --- a/consensus/istanbul/core/roundstate.go +++ b/consensus/istanbul/core/roundstate.go @@ -29,7 +29,7 @@ import ( // newRoundState creates a new roundState instance with the given view and validatorSet // lockedHash and preprepare are for round change when lock exists, // we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer -func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request) *roundState { +func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request, hasBadProposal func(hash common.Hash) bool) *roundState { return &roundState{ round: view.Round, sequence: view.Sequence, @@ -39,6 +39,7 @@ func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lock lockedHash: lockedHash, mu: new(sync.RWMutex), pendingRequest: pendingRequest, + hasBadProposal: hasBadProposal, } } @@ -52,7 +53,23 @@ type roundState struct { lockedHash common.Hash pendingRequest *istanbul.Request - mu *sync.RWMutex + mu *sync.RWMutex + hasBadProposal func(hash common.Hash) bool +} + +func (s *roundState) GetPrepareOrCommitSize() int { + s.mu.RLock() + defer s.mu.RUnlock() + + result := s.Prepares.Size() + s.Commits.Size() + + // find duplicate one + for _, m := range s.Prepares.Values() { + if s.Commits.Get(m.Address) != nil { + result-- + } + } + return result } func (s *roundState) Subject() *istanbul.Subject { @@ -138,7 +155,10 @@ func (s *roundState) IsHashLocked() bool { s.mu.RLock() defer s.mu.RUnlock() - return s.lockedHash != common.Hash{} + if common.EmptyHash(s.lockedHash) { + return false + } + return !s.hasBadProposal(s.GetLockedHash()) } func (s *roundState) GetLockedHash() common.Hash { diff --git a/consensus/istanbul/core/roundstate_test.go b/consensus/istanbul/core/roundstate_test.go index cb8239ac2..7cf1979c7 100644 --- a/consensus/istanbul/core/roundstate_test.go +++ b/consensus/istanbul/core/roundstate_test.go @@ -33,6 +33,9 @@ func newTestRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) Prepares: newMessageSet(validatorSet), Commits: newMessageSet(validatorSet), mu: new(sync.RWMutex), + hasBadProposal: func(hash common.Hash) bool { + return false + }, } } diff --git a/consensus/istanbul/core/testbackend_test.go b/consensus/istanbul/core/testbackend_test.go index 69c6e46d2..0d6ad3574 100644 --- a/consensus/istanbul/core/testbackend_test.go +++ b/consensus/istanbul/core/testbackend_test.go @@ -100,9 +100,7 @@ func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte }) // fake new head events - go self.events.Post(istanbul.FinalCommittedEvent{ - Proposal: proposal, - }) + go self.events.Post(istanbul.FinalCommittedEvent{}) return nil } @@ -133,8 +131,29 @@ func (self *testSystemBackend) NewRequest(request istanbul.Proposal) { }) } +func (self *testSystemBackend) HasBadProposal(hash common.Hash) bool { + return false +} + func (self *testSystemBackend) LastProposal() (istanbul.Proposal, common.Address) { - return makeBlock(1), common.Address{} + l := len(self.committedMsgs) + if l > 0 { + return self.committedMsgs[l-1].commitProposal, common.Address{} + } + return makeBlock(0), common.Address{} +} + +// Only block height 5 will return true +func (self *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool { + return number.Cmp(big.NewInt(5)) == 0 +} + +func (self *testSystemBackend) GetProposer(number uint64) common.Address { + return common.Address{} +} + +func (self *testSystemBackend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet { + return self.peers } // ============================================== @@ -187,11 +206,13 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem { core := New(backend, config).(*core) core.state = StateAcceptRequest - core.lastProposer = common.Address{} core.current = newRoundState(&istanbul.View{ Round: big.NewInt(0), Sequence: big.NewInt(1), - }, vset, common.Hash{}, nil, nil) + }, vset, common.Hash{}, nil, nil, func(hash common.Hash) bool { + return false + }) + core.valSet = vset core.logger = testLogger core.validateFn = backend.CheckValidatorSignature @@ -223,7 +244,7 @@ func (t *testSystem) listen() { func (t *testSystem) Run(core bool) func() { for _, b := range t.backends { if core { - b.engine.Start(common.Big0, common.Address{}, nil) // start Istanbul core + b.engine.Start() // start Istanbul core } } diff --git a/consensus/istanbul/core/types.go b/consensus/istanbul/core/types.go index ebfba7614..74e1b2263 100644 --- a/consensus/istanbul/core/types.go +++ b/consensus/istanbul/core/types.go @@ -19,15 +19,13 @@ package core import ( "fmt" "io" - "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/rlp" ) type Engine interface { - Start(lastSequence *big.Int, lastProposer common.Address, lastProposal istanbul.Proposal) error + Start() error Stop() error } @@ -164,34 +162,3 @@ func (m *message) String() string { func Encode(val interface{}) ([]byte, error) { return rlp.EncodeToBytes(val) } - -// ---------------------------------------------------------------------------- - -type roundChange struct { - Round *big.Int - Sequence *big.Int - Digest common.Hash -} - -// EncodeRLP serializes rc into the Ethereum RLP format. -func (rc *roundChange) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{ - rc.Round, - rc.Sequence, - rc.Digest, - }) -} - -// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream. -func (rc *roundChange) DecodeRLP(s *rlp.Stream) error { - var rawRoundChange struct { - Round *big.Int - Sequence *big.Int - Digest common.Hash - } - if err := s.Decode(&rawRoundChange); err != nil { - return err - } - rc.Round, rc.Sequence, rc.Digest = rawRoundChange.Round, rawRoundChange.Sequence, rawRoundChange.Digest - return nil -} diff --git a/consensus/istanbul/core/types_test.go b/consensus/istanbul/core/types_test.go index 5faea3dea..a280fba54 100644 --- a/consensus/istanbul/core/types_test.go +++ b/consensus/istanbul/core/types_test.go @@ -172,47 +172,8 @@ func testSubjectWithSignature(t *testing.T) { } } -func testRoundChange(t *testing.T) { - rc := &roundChange{ - Round: big.NewInt(1), - Sequence: big.NewInt(2), - Digest: common.StringToHash("1234567890"), - } - RoundChangePayload, _ := Encode(rc) - - m := &message{ - Code: msgRoundChange, - Msg: RoundChangePayload, - Address: common.HexToAddress("0x1234567890"), - } - - msgPayload, err := m.Payload() - if err != nil { - t.Errorf("error mismatch: have %v, want nil", err) - } - - decodedMsg := new(message) - err = decodedMsg.FromPayload(msgPayload, nil) - if err != nil { - t.Errorf("error mismatch: have %v, want nil", err) - } - - var decodedRC *roundChange - err = decodedMsg.Decode(&decodedRC) - if err != nil { - t.Errorf("error mismatch: have %v, want nil", err) - } - - // if block is encoded/decoded by rlp, we cannot to compare interface data type using reflect.DeepEqual. (like istanbul.Proposal) - // so individual comparison here. - if !reflect.DeepEqual(rc, decodedRC) { - t.Errorf("round change mismatch: have %v, want %v", decodedRC, rc) - } -} - func TestMessageEncodeDecode(t *testing.T) { testPreprepare(t) testSubject(t) testSubjectWithSignature(t) - testRoundChange(t) } diff --git a/consensus/istanbul/events.go b/consensus/istanbul/events.go index e287b8fe8..fb6e5bd9c 100644 --- a/consensus/istanbul/events.go +++ b/consensus/istanbul/events.go @@ -16,8 +16,6 @@ package istanbul -import "github.com/ethereum/go-ethereum/common" - // RequestEvent is posted to propose a proposal type RequestEvent struct { Proposal Proposal @@ -30,6 +28,4 @@ type MessageEvent struct { // FinalCommittedEvent is posted when a proposal is committed type FinalCommittedEvent struct { - Proposal Proposal - Proposer common.Address } diff --git a/consensus/istanbul/validator.go b/consensus/istanbul/validator.go index a96487cf9..e0d142866 100644 --- a/consensus/istanbul/validator.go +++ b/consensus/istanbul/validator.go @@ -71,6 +71,8 @@ type ValidatorSet interface { Copy() ValidatorSet // Get the maximum number of faulty nodes F() int + // Get proposer policy + Policy() ProposerPolicy } // ---------------------------------------------------------------------------- diff --git a/consensus/istanbul/validator/default.go b/consensus/istanbul/validator/default.go index fd0fba404..17edda552 100644 --- a/consensus/istanbul/validator/default.go +++ b/consensus/istanbul/validator/default.go @@ -41,16 +41,18 @@ func (val *defaultValidator) String() string { // ---------------------------------------------------------------------------- type defaultSet struct { - validators istanbul.Validators + validators istanbul.Validators + policy istanbul.ProposerPolicy + proposer istanbul.Validator validatorMu sync.RWMutex - - selector istanbul.ProposalSelector + selector istanbul.ProposalSelector } -func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) *defaultSet { +func newDefaultSet(addrs []common.Address, policy istanbul.ProposerPolicy) *defaultSet { valSet := &defaultSet{} + valSet.policy = policy // init validators valSet.validators = make([]istanbul.Validator, len(addrs)) for i, addr := range addrs { @@ -62,8 +64,10 @@ func newDefaultSet(addrs []common.Address, selector istanbul.ProposalSelector) * if valSet.Size() > 0 { valSet.proposer = valSet.GetByIndex(0) } - //set proposal selector - valSet.selector = selector + valSet.selector = roundRobinProposer + if policy == istanbul.Sticky { + valSet.selector = stickyProposer + } return valSet } @@ -182,14 +186,16 @@ func (valSet *defaultSet) RemoveValidator(address common.Address) bool { } func (valSet *defaultSet) Copy() istanbul.ValidatorSet { - valSet.validatorMu.Lock() - defer valSet.validatorMu.Unlock() + valSet.validatorMu.RLock() + defer valSet.validatorMu.RUnlock() addresses := make([]common.Address, 0, len(valSet.validators)) for _, v := range valSet.validators { addresses = append(addresses, v.Address()) } - return newDefaultSet(addresses, valSet.selector) + return NewSet(addresses, valSet.policy) } func (valSet *defaultSet) F() int { return int(math.Ceil(float64(valSet.Size())/3)) - 1 } + +func (valSet *defaultSet) Policy() istanbul.ProposerPolicy { return valSet.policy } diff --git a/consensus/istanbul/validator/default_test.go b/consensus/istanbul/validator/default_test.go index dd8d63820..987ed12c8 100644 --- a/consensus/istanbul/validator/default_test.go +++ b/consensus/istanbul/validator/default_test.go @@ -78,7 +78,7 @@ func testNormalValSet(t *testing.T) { val1 := New(addr1) val2 := New(addr2) - valSet := newDefaultSet([]common.Address{addr1, addr2}, roundRobinProposer) + valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.RoundRobin) if valSet == nil { t.Errorf("the format of validator set is invalid") t.FailNow() @@ -182,7 +182,7 @@ func testStickyProposer(t *testing.T) { val1 := New(addr1) val2 := New(addr2) - valSet := newDefaultSet([]common.Address{addr1, addr2}, stickyProposer) + valSet := newDefaultSet([]common.Address{addr1, addr2}, istanbul.Sticky) // test get proposer if val := valSet.GetProposer(); !reflect.DeepEqual(val, val1) { diff --git a/consensus/istanbul/validator/validator.go b/consensus/istanbul/validator/validator.go index c4bdad61c..9a1e15c2d 100644 --- a/consensus/istanbul/validator/validator.go +++ b/consensus/istanbul/validator/validator.go @@ -28,15 +28,7 @@ func New(addr common.Address) istanbul.Validator { } func NewSet(addrs []common.Address, policy istanbul.ProposerPolicy) istanbul.ValidatorSet { - switch policy { - case istanbul.RoundRobin: - return newDefaultSet(addrs, roundRobinProposer) - case istanbul.Sticky: - return newDefaultSet(addrs, stickyProposer) - } - - // use round-robin policy as default proposal policy - return newDefaultSet(addrs, roundRobinProposer) + return newDefaultSet(addrs, policy) } func ExtractValidators(extraData []byte) []common.Address { diff --git a/consensus/protocol.go b/consensus/protocol.go index aad2d22e9..64181da69 100644 --- a/consensus/protocol.go +++ b/consensus/protocol.go @@ -46,10 +46,10 @@ type Protocol struct { Lengths []uint64 } -// Broadcaster defines the interface to broadcast blocks and find peer +// Broadcaster defines the interface to enqueue blocks to fetcher and find peer type Broadcaster interface { - // BroadcastBlock broadcasts blocks to peers - BroadcastBlock(block *types.Block, propagate bool) + // Enqueue add a block into fetcher queue + Enqueue(id string, block *types.Block) // FindPeers retrives peers by addresses FindPeers(map[common.Address]bool) map[common.Address]Peer } diff --git a/core/blockchain.go b/core/blockchain.go index 464d04481..51447f2c1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1272,6 +1272,11 @@ func (bc *BlockChain) BadBlocks() ([]BadBlockArgs, error) { return headers, nil } +// HasBadBlock returns whether the block with the hash is a bad block +func (bc *BlockChain) HasBadBlock(hash common.Hash) bool { + return bc.badBlocks.Contains(hash) +} + // addBadBlock adds a bad block to the bad-block LRU cache func (bc *BlockChain) addBadBlock(block *types.Block) { bc.badBlocks.Add(block.Header().Hash(), block.Header()) diff --git a/eth/backend.go b/eth/backend.go index 68ab88c6c..7fb48c9d0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -356,8 +356,6 @@ func (s *Ethereum) StartMining(local bool) error { return fmt.Errorf("signer missing: %v", err) } clique.Authorize(eb, wallet.SignHash) - } else if istanbul, ok := s.engine.(consensus.Istanbul); ok { - istanbul.Start(s.blockchain, s.blockchain.InsertChain) } if local { // If local (CPU) mining is started, we can disable the transaction rejection @@ -370,12 +368,7 @@ func (s *Ethereum) StartMining(local bool) error { return nil } -func (s *Ethereum) StopMining() { - s.miner.Stop() - if istanbul, ok := s.engine.(consensus.Istanbul); ok { - istanbul.Stop() - } -} +func (s *Ethereum) StopMining() { s.miner.Stop() } func (s *Ethereum) IsMining() bool { return s.miner.Mining() } func (s *Ethereum) Miner() *miner.Miner { return s.miner } diff --git a/eth/handler.go b/eth/handler.go index c8738ada8..4be5a5b87 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -125,7 +125,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne handler.SetBroadcaster(manager) } -// Figure out whether to allow fast sync or not + // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") mode = downloader.FullSync @@ -716,6 +716,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return nil } +func (pm *ProtocolManager) Enqueue(id string, block *types.Block) { + pm.fetcher.Enqueue(id, block) +} + // BroadcastBlock will either propagate a block to a subset of it's peers, or // will only announce it's availability (depending what's requested). func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { diff --git a/miner/worker.go b/miner/worker.go index e644a6aef..f433dfde5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -212,6 +212,9 @@ func (self *worker) start() { defer self.mu.Unlock() atomic.StoreInt32(&self.mining, 1) + if istanbul, ok := self.engine.(consensus.Istanbul); ok { + istanbul.Start(self.chain, self.chain.CurrentBlock, self.chain.HasBadBlock) + } // spin up agents for agent := range self.agents { @@ -229,6 +232,11 @@ func (self *worker) stop() { agent.Stop() } } + + if istanbul, ok := self.engine.(consensus.Istanbul); ok { + istanbul.Stop() + } + atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.atWork, 0) } @@ -256,11 +264,11 @@ func (self *worker) update() { // A real event arrived, process interesting content select { // Handle ChainHeadEvent - case ev := <-self.chainHeadCh: - self.commitNewWork() - if h, ok := self.engine.(consensus.Handler); ok && ev.Block != nil { - h.NewChainHead(ev.Block) + case <-self.chainHeadCh: + if h, ok := self.engine.(consensus.Handler); ok { + h.NewChainHead() } + self.commitNewWork() // Handle ChainSideEvent case ev := <-self.chainSideCh: diff --git a/params/config.go b/params/config.go index 7c2e1d5a0..8a0858c56 100644 --- a/params/config.go +++ b/params/config.go @@ -184,8 +184,8 @@ func (c *ChainConfig) String() string { engine = c.Ethash case c.Clique != nil: engine = c.Clique - //case c.Istanbul != nil: - // engine = c.Istanbul + case c.Istanbul != nil: + engine = c.Istanbul default: engine = "unknown" }