Chain stalls while scaling out from 1 to 4 nodes, changing quorum size fixes things (#796)

* Adding Ceil2Nby3Block genesis config option to specify the number of blocks required to transition from 2f+1 to Ceil(2n/3) in IBFT

* Add support for using ceil(2N/3) in IBFT
This commit is contained in:
Amit Panghal 2019-09-30 15:26:13 -04:00 committed by Samer Falah
parent a1490d7be6
commit e1278520d0
13 changed files with 82 additions and 28 deletions

View File

@ -27,8 +27,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/consensus/istanbul"
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/consensus/istanbul/validator"
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/crypto/sha3"
@ -290,7 +290,7 @@ func (sb *backend) verifyCommittedSeals(chain consensus.ChainReader, header *typ
} }
// The length of validSeal should be larger than number of faulty node + 1 // The length of validSeal should be larger than number of faulty node + 1
if validSeal <= 2*snap.ValSet.F() { if validSeal <= snap.ValSet.F() {
return errInvalidCommittedSeals return errInvalidCommittedSeals
} }
@ -616,7 +616,6 @@ func sigHash(header *types.Header) (hash common.Hash) {
return hash return hash
} }
// SealHash returns the hash of a block prior to it being sealed. // SealHash returns the hash of a block prior to it being sealed.
func (sb *backend) SealHash(header *types.Header) common.Hash { func (sb *backend) SealHash(header *types.Header) common.Hash {
return sigHash(header) return sigHash(header)

View File

@ -16,6 +16,8 @@
package istanbul package istanbul
import "math/big"
type ProposerPolicy uint64 type ProposerPolicy uint64
const ( const (
@ -28,6 +30,7 @@ type Config struct {
BlockPeriod uint64 `toml:",omitempty"` // Default minimum difference between two consecutive block's timestamps in second BlockPeriod uint64 `toml:",omitempty"` // Default minimum difference between two consecutive block's timestamps in second
ProposerPolicy ProposerPolicy `toml:",omitempty"` // The policy for proposer selection ProposerPolicy ProposerPolicy `toml:",omitempty"` // The policy for proposer selection
Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes
Ceil2Nby3Block *big.Int `toml:",omitempty"` // Number of confirmations required to move from one state to next [2F + 1 to Ceil(2N/3)]
} }
var DefaultConfig = &Config{ var DefaultConfig = &Config{
@ -35,4 +38,5 @@ var DefaultConfig = &Config{
BlockPeriod: 1, BlockPeriod: 1,
ProposerPolicy: RoundRobin, ProposerPolicy: RoundRobin,
Epoch: 30000, Epoch: 30000,
Ceil2Nby3Block: big.NewInt(0),
} }

View File

@ -72,7 +72,7 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// //
// If we already have a proposal, we may have chance to speed up the consensus process // If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without PREPARE messages. // by committing the proposal without PREPARE messages.
if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 { if c.current.Commits.Size() >= c.QuorumSize() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockHash here since state can skip Prepared state and jump directly to the Committed state. // Still need to call LockHash here since state can skip Prepared state and jump directly to the Committed state.
c.current.LockHash() c.current.LockHash()
c.commit() c.commit()

View File

@ -191,8 +191,8 @@ OUTER:
if r0.state != StatePrepared { if r0.state != StatePrepared {
t.Errorf("state mismatch: have %v, want %v", r0.state, StatePrepared) t.Errorf("state mismatch: have %v, want %v", r0.state, StatePrepared)
} }
if r0.current.Commits.Size() > 2*r0.valSet.F() { if r0.current.Commits.Size() >= r0.QuorumSize() {
t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1) t.Errorf("the size of commit messages should be less than %v", r0.QuorumSize())
} }
if r0.current.IsHashLocked() { if r0.current.IsHashLocked() {
t.Errorf("block should not be locked") t.Errorf("block should not be locked")
@ -200,12 +200,12 @@ OUTER:
continue continue
} }
// core should have 2F+1 prepare messages // core should have 2F+1 before Ceil2Nby3Block or Ceil(2N/3) prepare messages
if r0.current.Commits.Size() <= 2*r0.valSet.F() { if r0.current.Commits.Size() < r0.QuorumSize() {
t.Errorf("the size of commit messages should be larger than 2F+1: size %v", r0.current.Commits.Size()) t.Errorf("the size of commit messages should be larger than 2F+1 or Ceil(2N/3): size %v", r0.QuorumSize())
} }
// check signatures large than 2F+1 // check signatures large than F
signedCount := 0 signedCount := 0
committedSeals := v0.committedMsgs[0].committedSeals committedSeals := v0.committedMsgs[0].committedSeals
for _, validator := range r0.valSet.List() { for _, validator := range r0.valSet.List() {
@ -216,8 +216,8 @@ OUTER:
} }
} }
} }
if signedCount <= 2*r0.valSet.F() { if signedCount <= r0.valSet.F() {
t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount) t.Errorf("the expected signed count should be larger than %v, but got %v", r0.valSet.F(), signedCount)
} }
if !r0.current.IsHashLocked() { if !r0.current.IsHashLocked() {
t.Errorf("block should be locked") t.Errorf("block should be locked")

View File

@ -342,6 +342,15 @@ func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address,
return istanbul.CheckValidatorSignature(c.valSet, data, sig) return istanbul.CheckValidatorSignature(c.valSet, data, sig)
} }
func (c *core) QuorumSize() int {
if c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
c.logger.Trace("Confirmation Formula used 2F+ 1")
return (2 * c.valSet.F()) + 1
}
c.logger.Trace("Confirmation Formula used ceil(2N/3)")
return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
}
// PrepareCommittedSeal returns a committed seal for the given hash // PrepareCommittedSeal returns a committed seal for the given hash
func PrepareCommittedSeal(hash common.Hash) []byte { func PrepareCommittedSeal(hash common.Hash) []byte {
var buf bytes.Buffer var buf bytes.Buffer

View File

@ -17,6 +17,7 @@
package core package core
import ( import (
"github.com/ethereum/go-ethereum/common"
"math/big" "math/big"
"reflect" "reflect"
"testing" "testing"
@ -80,3 +81,20 @@ func TestNewRequest(t *testing.T) {
} }
} }
} }
func TestQuorumSize(t *testing.T) {
N := uint64(4)
F := uint64(1)
sys := NewTestSystemWithBackend(N, F)
backend := sys.backends[0]
c := backend.engine.(*core)
valSet := c.valSet
for i := 1; i <= 1000; i++ {
valSet.AddValidator(common.StringToAddress(string(i)))
if 2*c.QuorumSize() <= (valSet.Size()+valSet.F()) || 2*c.QuorumSize() > (valSet.Size()+valSet.F()+2) {
t.Errorf("quorumSize constraint failed, expected value (2*QuorumSize > Size+F && 2*QuorumSize <= Size+F+2) to be:%v, got: %v, for size: %v", true, false, valSet.Size())
}
}
}

View File

@ -59,7 +59,7 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
// Change to Prepared state if we've received enough PREPARE messages or it is locked // Change to Prepared state if we've received enough PREPARE messages or it is locked
// and we are in earlier state before Prepared state. // and we are in earlier state before Prepared state.
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) && if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() >= c.QuorumSize()) &&
c.state.Cmp(StatePrepared) < 0 { c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash() c.current.LockHash()
c.setState(StatePrepared) c.setState(StatePrepared)

View File

@ -17,6 +17,7 @@
package core package core
import ( import (
"math"
"math/big" "math/big"
"reflect" "reflect"
"testing" "testing"
@ -156,12 +157,11 @@ func TestHandlePrepare(t *testing.T) {
errInconsistentSubject, errInconsistentSubject,
}, },
{ {
// less than 2F+1
func() *testSystem { func() *testSystem {
sys := NewTestSystemWithBackend(N, F) sys := NewTestSystemWithBackend(N, F)
// save less than 2*F+1 replica // save less than Ceil(2*N/3) replica
sys.backends = sys.backends[2*int(F)+1:] sys.backends = sys.backends[int(math.Ceil(float64(2*N)/3)):]
for i, backend := range sys.backends { for i, backend := range sys.backends {
c := backend.engine.(*core) c := backend.engine.(*core)
@ -214,8 +214,8 @@ OUTER:
if r0.state != StatePreprepared { if r0.state != StatePreprepared {
t.Errorf("state mismatch: have %v, want %v", r0.state, StatePreprepared) t.Errorf("state mismatch: have %v, want %v", r0.state, StatePreprepared)
} }
if r0.current.Prepares.Size() > 2*r0.valSet.F() { if r0.current.Prepares.Size() >= r0.QuorumSize() {
t.Errorf("the size of PREPARE messages should be less than %v", 2*r0.valSet.F()+1) t.Errorf("the size of PREPARE messages should be less than %v", r0.QuorumSize())
} }
if r0.current.IsHashLocked() { if r0.current.IsHashLocked() {
t.Errorf("block should not be locked") t.Errorf("block should not be locked")
@ -224,12 +224,12 @@ OUTER:
continue continue
} }
// core should have 2F+1 PREPARE messages // core should have 2F+1 before Ceil2Nby3Block and Ceil(2N/3) after Ceil2Nby3Block PREPARE messages
if r0.current.Prepares.Size() <= 2*r0.valSet.F() { if r0.current.Prepares.Size() < r0.QuorumSize() {
t.Errorf("the size of PREPARE messages should be larger than 2F+1: size %v", r0.current.Commits.Size()) t.Errorf("the size of PREPARE messages should be larger than 2F+1 or ceil(2N/3): size %v", r0.current.Commits.Size())
} }
// a message will be delivered to backend if 2F+1 // a message will be delivered to backend if ceil(2N/3)
if int64(len(v0.sentMsgs)) != 1 { if int64(len(v0.sentMsgs)) != 1 {
t.Errorf("the Send() should be called once: times %v", len(test.system.backends[0].sentMsgs)) t.Errorf("the Send() should be called once: times %v", len(test.system.backends[0].sentMsgs))
} }

View File

@ -98,8 +98,8 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
c.sendRoundChange(roundView.Round) c.sendRoundChange(roundView.Round)
} }
return nil return nil
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) { } else if num == c.QuorumSize() && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately. // We've received 2f+1/Ceil(2N/3) ROUND CHANGE messages, start a new round immediately.
c.startNewRound(roundView.Round) c.startNewRound(roundView.Round)
return nil return nil
} else if cv.Round.Cmp(roundView.Round) < 0 { } else if cv.Round.Cmp(roundView.Round) < 0 {

View File

@ -257,6 +257,8 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo
config.Istanbul.Epoch = chainConfig.Istanbul.Epoch config.Istanbul.Epoch = chainConfig.Istanbul.Epoch
} }
config.Istanbul.ProposerPolicy = istanbul.ProposerPolicy(chainConfig.Istanbul.ProposerPolicy) config.Istanbul.ProposerPolicy = istanbul.ProposerPolicy(chainConfig.Istanbul.ProposerPolicy)
config.Istanbul.Ceil2Nby3Block = chainConfig.Istanbul.Ceil2Nby3Block
return istanbulBackend.New(&config.Istanbul, ctx.NodeKey(), db) return istanbulBackend.New(&config.Istanbul, ctx.NodeKey(), db)
} }

View File

@ -82,7 +82,7 @@ func TestNodeInfo(t *testing.T) {
}{ }{
{"ethash", nil, nil, false}, {"ethash", nil, nil, false},
{"raft", nil, nil, true}, {"raft", nil, nil, true},
{"istanbul", nil, &params.IstanbulConfig{1, 1}, false}, {"istanbul", nil, &params.IstanbulConfig{1, 1, big.NewInt(0)}, false},
{"clique", &params.CliqueConfig{1, 1}, nil, false}, {"clique", &params.CliqueConfig{1, 1}, nil, false},
} }

View File

@ -124,6 +124,7 @@ var (
Istanbul: &IstanbulConfig{ Istanbul: &IstanbulConfig{
Epoch: 30000, Epoch: 30000,
ProposerPolicy: 0, ProposerPolicy: 0,
Ceil2Nby3Block: big.NewInt(0),
}, },
} }
@ -214,8 +215,9 @@ func (c *CliqueConfig) String() string {
// IstanbulConfig is the consensus engine configs for Istanbul based sealing. // IstanbulConfig is the consensus engine configs for Istanbul based sealing.
type IstanbulConfig struct { type IstanbulConfig struct {
Epoch uint64 `json:"epoch"` // Epoch length to reset votes and checkpoint Epoch uint64 `json:"epoch"` // Epoch length to reset votes and checkpoint
ProposerPolicy uint64 `json:"policy"` // The policy for proposer selection ProposerPolicy uint64 `json:"policy"` // The policy for proposer selection
Ceil2Nby3Block *big.Int `json:"ceil2Nby3Block,omitempty"` // Number of confirmations required to move from one state to next [2F + 1 to Ceil(2N/3)]
} }
// String implements the stringer interface, returning the consensus engine details. // String implements the stringer interface, returning the consensus engine details.
@ -375,6 +377,9 @@ func (c *ChainConfig) checkCompatible(newcfg *ChainConfig, head *big.Int, isQuor
if isForkIncompatible(c.EWASMBlock, newcfg.EWASMBlock, head) { if isForkIncompatible(c.EWASMBlock, newcfg.EWASMBlock, head) {
return newCompatError("ewasm fork block", c.EWASMBlock, newcfg.EWASMBlock) return newCompatError("ewasm fork block", c.EWASMBlock, newcfg.EWASMBlock)
} }
if c.Istanbul != nil && newcfg.Istanbul != nil && isForkIncompatible(c.Istanbul.Ceil2Nby3Block, newcfg.Istanbul.Ceil2Nby3Block, head) {
return newCompatError("Ceil 2N/3 fork block", c.Istanbul.Ceil2Nby3Block, newcfg.Istanbul.Ceil2Nby3Block)
}
return nil return nil
} }

View File

@ -70,6 +70,23 @@ func TestCheckCompatible(t *testing.T) {
RewindTo: 9, RewindTo: 9,
}, },
}, },
{
stored: &ChainConfig{Istanbul: &IstanbulConfig{Ceil2Nby3Block: big.NewInt(10)}},
new: &ChainConfig{Istanbul: &IstanbulConfig{Ceil2Nby3Block: big.NewInt(20)}},
head: 4,
wantErr: nil,
},
{
stored: &ChainConfig{Istanbul: &IstanbulConfig{Ceil2Nby3Block: big.NewInt(10)}},
new: &ChainConfig{Istanbul: &IstanbulConfig{Ceil2Nby3Block: big.NewInt(20)}},
head: 30,
wantErr: &ConfigCompatError{
What: "Ceil 2N/3 fork block",
StoredConfig: big.NewInt(10),
NewConfig: big.NewInt(20),
RewindTo: 9,
},
},
} }
for _, test := range tests { for _, test := range tests {